¡@

Home 

OpenStack Study: test_policy.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2012 OpenStack Foundation

# Copyright 2013 IBM Corp.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import os.path

import mock

import oslo.config.cfg

import glance.api.policy

from glance.common import exception

import glance.context

from glance.tests.unit import base

import glance.tests.unit.utils as unit_test_utils

from glance.tests import utils as test_utils

UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'

**** CubicPower OpenStack Study ****

class ImageRepoStub(object):

**** CubicPower OpenStack Study ****

    def get(self, *args, **kwargs):

        return 'image_from_get'

**** CubicPower OpenStack Study ****

    def save(self, *args, **kwargs):

        return 'image_from_save'

**** CubicPower OpenStack Study ****

    def add(self, *args, **kwargs):

        return 'image_from_add'

**** CubicPower OpenStack Study ****

    def list(self, *args, **kwargs):

        return ['image_from_list_0', 'image_from_list_1']

**** CubicPower OpenStack Study ****

class ImageStub(object):

**** CubicPower OpenStack Study ****

    def __init__(self, image_id, visibility='private'):

        self.image_id = image_id

        self.visibility = visibility

        self.status = 'active'

**** CubicPower OpenStack Study ****

    def delete(self):

        self.status = 'deleted'

**** CubicPower OpenStack Study ****

class ImageFactoryStub(object):

**** CubicPower OpenStack Study ****

    def new_image(self, image_id=None, name=None, visibility='private',

                  min_disk=0, min_ram=0, protected=False, owner=None,

                  disk_format=None, container_format=None,

                  extra_properties=None, tags=None, **other_args):

        self.visibility = visibility

        return 'new_image'

**** CubicPower OpenStack Study ****

class MemberRepoStub(object):

**** CubicPower OpenStack Study ****

    def add(self, image_member):

        image_member.output = 'member_repo_add'

**** CubicPower OpenStack Study ****

    def get(self, *args, **kwargs):

        return 'member_repo_get'

**** CubicPower OpenStack Study ****

    def save(self, image_member):

        image_member.output = 'member_repo_save'

**** CubicPower OpenStack Study ****

    def list(self, *args, **kwargs):

        return 'member_repo_list'

**** CubicPower OpenStack Study ****

    def remove(self, image_member):

        image_member.output = 'member_repo_remove'

**** CubicPower OpenStack Study ****

class ImageMembershipStub(object):

**** CubicPower OpenStack Study ****

    def __init__(self, output=None):

        self.output = output

**** CubicPower OpenStack Study ****

class TaskRepoStub(object):

**** CubicPower OpenStack Study ****

    def get_task_and_details(self, *args, **kwargs):

        return 'task_from_get', 'task_details_from_get'

**** CubicPower OpenStack Study ****

    def add(self, *args, **kwargs):

        return 'task_from_add'

**** CubicPower OpenStack Study ****

    def list_tasks(self, *args, **kwargs):

        return ['task_from_list_0', 'task_from_list_1']

**** CubicPower OpenStack Study ****

class TaskStub(object):

**** CubicPower OpenStack Study ****

    def __init__(self, task_id):

        self.task_id = task_id

        self.status = 'pending'

**** CubicPower OpenStack Study ****

    def run(self, executor):

        self.status = 'processing'

**** CubicPower OpenStack Study ****

class TaskFactoryStub(object):

**** CubicPower OpenStack Study ****

    def new_task(self, *args):

        return 'new_task'

**** CubicPower OpenStack Study ****

class TestPolicyEnforcer(base.IsolatedUnitTest):

**** CubicPower OpenStack Study ****

    def test_policy_file_default_rules_default_location(self):

        enforcer = glance.api.policy.Enforcer()

        context = glance.context.RequestContext(roles=[])

        enforcer.enforce(context, 'get_image', {})

**** CubicPower OpenStack Study ****

    def test_policy_file_custom_rules_default_location(self):

        rules = {"get_image": '!'}

        self.set_policy_rules(rules)

        enforcer = glance.api.policy.Enforcer()

        context = glance.context.RequestContext(roles=[])

        self.assertRaises(exception.Forbidden,

                          enforcer.enforce, context, 'get_image', {})

**** CubicPower OpenStack Study ****

    def test_policy_file_custom_location(self):

        self.config(policy_file=os.path.join(self.test_dir, 'gobble.gobble'))

        rules = {"get_image": '!'}

        self.set_policy_rules(rules)

        enforcer = glance.api.policy.Enforcer()

        context = glance.context.RequestContext(roles=[])

        self.assertRaises(exception.Forbidden,

                          enforcer.enforce, context, 'get_image', {})

**** CubicPower OpenStack Study ****

    def test_policy_file_check(self):

        self.config(policy_file=os.path.join(self.test_dir, 'gobble.gobble'))

        rules = {"get_image": '!'}

        self.set_policy_rules(rules)

        enforcer = glance.api.policy.Enforcer()

        context = glance.context.RequestContext(roles=[])

        self.assertEqual(enforcer.check(context, 'get_image', {}), False)

**** CubicPower OpenStack Study ****

class TestPolicyEnforcerNoFile(base.IsolatedUnitTest):

**** CubicPower OpenStack Study ****

    def test_policy_file_specified_but_not_found(self):

        """Missing defined policy file should result in a default ruleset"""

        self.config(policy_file='gobble.gobble')

        enforcer = glance.api.policy.Enforcer()

        context = glance.context.RequestContext(roles=[])

        enforcer.enforce(context, 'get_image', {})

        self.assertRaises(exception.Forbidden,

                          enforcer.enforce, context, 'manage_image_cache', {})

        admin_context = glance.context.RequestContext(roles=['admin'])

        enforcer.enforce(admin_context, 'manage_image_cache', {})

**** CubicPower OpenStack Study ****

    def test_policy_file_default_not_found(self):

        """Missing default policy file should result in a default ruleset"""

        def fake_find_file(self, name):

            return None

        self.stubs.Set(oslo.config.cfg.ConfigOpts, 'find_file',

                       fake_find_file)

        enforcer = glance.api.policy.Enforcer()

        context = glance.context.RequestContext(roles=[])

        enforcer.enforce(context, 'get_image', {})

        self.assertRaises(exception.Forbidden,

                          enforcer.enforce, context, 'manage_image_cache', {})

        admin_context = glance.context.RequestContext(roles=['admin'])

        enforcer.enforce(admin_context, 'manage_image_cache', {})

**** CubicPower OpenStack Study ****

        def fake_find_file(self, name):

            return None

        self.stubs.Set(oslo.config.cfg.ConfigOpts, 'find_file',

                       fake_find_file)

        enforcer = glance.api.policy.Enforcer()

        context = glance.context.RequestContext(roles=[])

        enforcer.enforce(context, 'get_image', {})

        self.assertRaises(exception.Forbidden,

                          enforcer.enforce, context, 'manage_image_cache', {})

        admin_context = glance.context.RequestContext(roles=['admin'])

        enforcer.enforce(admin_context, 'manage_image_cache', {})

**** CubicPower OpenStack Study ****

class TestImagePolicy(test_utils.BaseTestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        self.image_stub = ImageStub(UUID1)

        self.image_repo_stub = ImageRepoStub()

        self.image_factory_stub = ImageFactoryStub()

        self.policy = mock.Mock()

        self.policy.enforce = mock.Mock()

        super(TestImagePolicy, self).setUp()

**** CubicPower OpenStack Study ****

    def test_publicize_image_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        self.assertRaises(exception.Forbidden,

                          setattr, image, 'visibility', 'public')

        self.assertEqual(image.visibility, 'private')

        self.policy.enforce.assert_called_once_with({}, "publicize_image", {})

**** CubicPower OpenStack Study ****

    def test_publicize_image_allowed(self):

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        image.visibility = 'public'

        self.assertEqual(image.visibility, 'public')

        self.policy.enforce.assert_called_once_with({}, "publicize_image", {})

**** CubicPower OpenStack Study ****

    def test_delete_image_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        self.assertRaises(exception.Forbidden, image.delete)

        self.assertEqual(image.status, 'active')

        self.policy.enforce.assert_called_once_with({}, "delete_image", {})

**** CubicPower OpenStack Study ****

    def test_delete_image_allowed(self):

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        image.delete()

        self.assertEqual(image.status, 'deleted')

        self.policy.enforce.assert_called_once_with({}, "delete_image", {})

**** CubicPower OpenStack Study ****

    def test_get_image_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,

                                                      {}, self.policy)

        self.assertRaises(exception.Forbidden, image_repo.get, UUID1)

        self.policy.enforce.assert_called_once_with({}, "get_image", {})

**** CubicPower OpenStack Study ****

    def test_get_image_allowed(self):

        image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,

                                                      {}, self.policy)

        output = image_repo.get(UUID1)

        self.assertIsInstance(output, glance.api.policy.ImageProxy)

        self.assertEqual(output.image, 'image_from_get')

        self.policy.enforce.assert_called_once_with({}, "get_image", {})

**** CubicPower OpenStack Study ****

    def test_get_images_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,

                                                      {}, self.policy)

        self.assertRaises(exception.Forbidden, image_repo.list)

        self.policy.enforce.assert_called_once_with({}, "get_images", {})

**** CubicPower OpenStack Study ****

    def test_get_images_allowed(self):

        image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,

                                                      {}, self.policy)

        images = image_repo.list()

        for i, image in enumerate(images):

            self.assertIsInstance(image, glance.api.policy.ImageProxy)

            self.assertEqual(image.image, 'image_from_list_%d' % i)

            self.policy.enforce.assert_called_once_with({}, "get_images", {})

**** CubicPower OpenStack Study ****

    def test_modify_image_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,

                                                      {}, self.policy)

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        self.assertRaises(exception.Forbidden, image_repo.save, image)

        self.policy.enforce.assert_called_once_with({}, "modify_image", {})

**** CubicPower OpenStack Study ****

    def test_modify_image_allowed(self):

        image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,

                                                      {}, self.policy)

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        image_repo.save(image)

        self.policy.enforce.assert_called_once_with({}, "modify_image", {})

**** CubicPower OpenStack Study ****

    def test_add_image_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,

                                                      {}, self.policy)

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        self.assertRaises(exception.Forbidden, image_repo.add, image)

        self.policy.enforce.assert_called_once_with({}, "add_image", {})

**** CubicPower OpenStack Study ****

    def test_add_image_allowed(self):

        image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,

                                                      {}, self.policy)

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        image_repo.add(image)

        self.policy.enforce.assert_called_once_with({}, "add_image", {})

**** CubicPower OpenStack Study ****

    def test_new_image_visibility(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image_factory = glance.api.policy.ImageFactoryProxy(

            self.image_factory_stub, {}, self.policy)

        self.assertRaises(exception.Forbidden, image_factory.new_image,

                          visibility='public')

        self.policy.enforce.assert_called_once_with({}, "publicize_image", {})

**** CubicPower OpenStack Study ****

    def test_new_image_visibility_public_allowed(self):

        image_factory = glance.api.policy.ImageFactoryProxy(

            self.image_factory_stub, {}, self.policy)

        image_factory.new_image(visibility='public')

        self.policy.enforce.assert_called_once_with({}, "publicize_image", {})

**** CubicPower OpenStack Study ****

    def test_image_get_data(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        self.assertRaises(exception.Forbidden, image.get_data)

        self.policy.enforce.assert_called_once_with({}, "download_image", {})

**** CubicPower OpenStack Study ****

    def test_image_set_data(self):

        self.policy.enforce.side_effect = exception.Forbidden

        image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)

        self.assertRaises(exception.Forbidden, image.set_data)

        self.policy.enforce.assert_called_once_with({}, "upload_image", {})

**** CubicPower OpenStack Study ****

class TestMemberPolicy(test_utils.BaseTestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        self.policy = mock.Mock()

        self.policy.enforce = mock.Mock()

        self.member_repo = glance.api.policy.ImageMemberRepoProxy(

            MemberRepoStub(), {}, self.policy)

        super(TestMemberPolicy, self).setUp()

**** CubicPower OpenStack Study ****

    def test_add_member_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        self.assertRaises(exception.Forbidden, self.member_repo.add, '')

        self.policy.enforce.assert_called_once_with({}, "add_member", {})

**** CubicPower OpenStack Study ****

    def test_add_member_allowed(self):

        image_member = ImageMembershipStub()

        self.member_repo.add(image_member)

        self.assertEqual(image_member.output, 'member_repo_add')

        self.policy.enforce.assert_called_once_with({}, "add_member", {})

**** CubicPower OpenStack Study ****

    def test_get_member_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        self.assertRaises(exception.Forbidden, self.member_repo.get, '')

        self.policy.enforce.assert_called_once_with({}, "get_member", {})

**** CubicPower OpenStack Study ****

    def test_get_member_allowed(self):

        output = self.member_repo.get('')

        self.assertEqual(output, 'member_repo_get')

        self.policy.enforce.assert_called_once_with({}, "get_member", {})

**** CubicPower OpenStack Study ****

    def test_modify_member_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        self.assertRaises(exception.Forbidden, self.member_repo.save, '')

        self.policy.enforce.assert_called_once_with({}, "modify_member", {})

**** CubicPower OpenStack Study ****

    def test_modify_member_allowed(self):

        image_member = ImageMembershipStub()

        self.member_repo.save(image_member)

        self.assertEqual(image_member.output, 'member_repo_save')

        self.policy.enforce.assert_called_once_with({}, "modify_member", {})

**** CubicPower OpenStack Study ****

    def test_get_members_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        self.assertRaises(exception.Forbidden, self.member_repo.list, '')

        self.policy.enforce.assert_called_once_with({}, "get_members", {})

**** CubicPower OpenStack Study ****

    def test_get_members_allowed(self):

        output = self.member_repo.list('')

        self.assertEqual(output, 'member_repo_list')

        self.policy.enforce.assert_called_once_with({}, "get_members", {})

**** CubicPower OpenStack Study ****

    def test_delete_member_not_allowed(self):

        self.policy.enforce.side_effect = exception.Forbidden

        self.assertRaises(exception.Forbidden, self.member_repo.remove, '')

        self.policy.enforce.assert_called_once_with({}, "delete_member", {})

**** CubicPower OpenStack Study ****

    def test_delete_member_allowed(self):

        image_member = ImageMembershipStub()

        self.member_repo.remove(image_member)

        self.assertEqual(image_member.output, 'member_repo_remove')

        self.policy.enforce.assert_called_once_with({}, "delete_member", {})

**** CubicPower OpenStack Study ****

class TestTaskPolicy(test_utils.BaseTestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        self.task_stub = TaskStub(UUID1)

        self.task_repo_stub = TaskRepoStub()

        self.task_factory_stub = TaskFactoryStub()

        self.policy = unit_test_utils.FakePolicyEnforcer()

        super(TestTaskPolicy, self).setUp()

**** CubicPower OpenStack Study ****

    def test_get_task_not_allowed(self):

        rules = {"get_task": False}

        self.policy.set_rules(rules)

        task_repo = glance.api.policy.TaskRepoProxy(

            self.task_repo_stub,

            {},

            self.policy

        )

        self.assertRaises(exception.Forbidden,

                          task_repo.get_task_and_details,

                          UUID1)

**** CubicPower OpenStack Study ****

    def test_get_task_allowed(self):

        rules = {"get_task": True}

        self.policy.set_rules(rules)

        task_repo = glance.api.policy.TaskRepoProxy(

            self.task_repo_stub,

            {},

            self.policy

        )

        task, task_details = task_repo.get_task_and_details(UUID1)

        self.assertIsInstance(task, glance.api.policy.TaskProxy)

        self.assertEqual(task.task, 'task_from_get')

**** CubicPower OpenStack Study ****

    def test_get_tasks_not_allowed(self):

        rules = {"get_tasks": False}

        self.policy.set_rules(rules)

        task_repo = glance.api.policy.TaskRepoProxy(

            self.task_repo_stub,

            {},

            self.policy

        )

        self.assertRaises(exception.Forbidden, task_repo.list_tasks)

**** CubicPower OpenStack Study ****

    def test_get_tasks_allowed(self):

        rules = {"get_task": True}

        self.policy.set_rules(rules)

        task_repo = glance.api.policy.TaskRepoProxy(

            self.task_repo_stub,

            {},

            self.policy

        )

        tasks = task_repo.list_tasks()

        for i, task in enumerate(tasks):

            self.assertIsInstance(task, glance.api.policy.TaskProxy)

            self.assertEqual(task.task, 'task_from_list_%d' % i)

**** CubicPower OpenStack Study ****

    def test_add_task_not_allowed(self):

        rules = {"add_task": False}

        self.policy.set_rules(rules)

        task_repo = glance.api.policy.TaskRepoProxy(

            self.task_repo_stub,

            {},

            self.policy

        )

        task = glance.api.policy.TaskProxy(self.task_stub, {}, self.policy)

        self.assertRaises(exception.Forbidden, task_repo.add, task)

**** CubicPower OpenStack Study ****

    def test_add_task_allowed(self):

        rules = {"add_task": True}

        self.policy.set_rules(rules)

        task_repo = glance.api.policy.TaskRepoProxy(

            self.task_repo_stub,

            {},

            self.policy

        )

        task = glance.api.policy.TaskProxy(self.task_stub, {}, self.policy)

        task_repo.add(task)

**** CubicPower OpenStack Study ****

class TestContextPolicyEnforcer(base.IsolatedUnitTest):

**** CubicPower OpenStack Study ****

    def _do_test_policy_influence_context_admin(self,

                                                policy_admin_role,

                                                context_role,

                                                context_is_admin,

                                                admin_expected):

        self.config(policy_file=os.path.join(self.test_dir, 'gobble.gobble'))

        rules = {'context_is_admin': 'role:%s' % policy_admin_role}

        self.set_policy_rules(rules)

        enforcer = glance.api.policy.Enforcer()

        context = glance.context.RequestContext(roles=[context_role],

                                                is_admin=context_is_admin,

                                                policy_enforcer=enforcer)

        self.assertEqual(context.is_admin, admin_expected)

**** CubicPower OpenStack Study ****

    def test_context_admin_policy_admin(self):

        self._do_test_policy_influence_context_admin('test_admin',

                                                     'test_admin',

                                                     True,

                                                     True)

**** CubicPower OpenStack Study ****

    def test_context_nonadmin_policy_admin(self):

        self._do_test_policy_influence_context_admin('test_admin',

                                                     'test_admin',

                                                     False,

                                                     True)

**** CubicPower OpenStack Study ****

    def test_context_admin_policy_nonadmin(self):

        self._do_test_policy_influence_context_admin('test_admin',

                                                     'demo',

                                                     True,

                                                     True)

**** CubicPower OpenStack Study ****

    def test_context_nonadmin_policy_nonadmin(self):

        self._do_test_policy_influence_context_admin('test_admin',

                                                     'demo',

                                                     False,

                                                     False)