¡@

Home 

OpenStack Study: test_v3_filters.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2012 OpenStack LLC

# Copyright 2013 IBM Corp.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import tempfile

import uuid

from keystone import config

from keystone.openstack.common import jsonutils

from keystone.policy.backends import rules

from keystone.tests import filtering

from keystone.tests import test_v3

CONF = config.CONF

**** CubicPower OpenStack Study ****

class IdentityTestFilteredCase(filtering.FilterTests, test_v3.RestfulTestCase):

"""Test filter enforcement on the v3 Identity API."""

content_type = 'json'

**** CubicPower OpenStack Study ****

    def setUp(self):

        """Setup for Identity Filter Test Cases."""

        super(IdentityTestFilteredCase, self).setUp()

        # Initialize the policy engine and allow us to write to a temp

        # file in each test to create the policies

        self.orig_policy_file = CONF.policy_file

        rules.reset()

        _unused, self.tmpfilename = tempfile.mkstemp()

        self.config_fixture.config(policy_file=self.tmpfilename)

        #drop the policy rules

        self.addCleanup(rules.reset)

**** CubicPower OpenStack Study ****

    def load_sample_data(self):

        """Create sample data for these tests.

        As well as the usual housekeeping, create a set of domains,

        users, roles and projects for the subsequent tests:

        - Three domains: A,B & C.  C is disabled.

        - DomainA has user1, DomainB has user2 and user3

        - DomainA has group1 and group2, DomainB has group3

        - User1 has a role on DomainA

        Remember that there will also be a fourth domain in existence,

        the default domain.

        """

        # Start by creating a few domains

        self.domainA = self.new_domain_ref()

        self.assignment_api.create_domain(self.domainA['id'], self.domainA)

        self.domainB = self.new_domain_ref()

        self.assignment_api.create_domain(self.domainB['id'], self.domainB)

        self.domainC = self.new_domain_ref()

        self.domainC['enabled'] = False

        self.assignment_api.create_domain(self.domainC['id'], self.domainC)

        # Now create some users, one in domainA and two of them in domainB

        self.user1 = self.new_user_ref(domain_id=self.domainA['id'])

        self.user1['password'] = uuid.uuid4().hex

        self.identity_api.create_user(self.user1['id'], self.user1)

        self.user2 = self.new_user_ref(domain_id=self.domainB['id'])

        self.user2['password'] = uuid.uuid4().hex

        self.identity_api.create_user(self.user2['id'], self.user2)

        self.user3 = self.new_user_ref(domain_id=self.domainB['id'])

        self.user3['password'] = uuid.uuid4().hex

        self.identity_api.create_user(self.user3['id'], self.user3)

        self.role = self.new_role_ref()

        self.assignment_api.create_role(self.role['id'], self.role)

        self.assignment_api.create_grant(self.role['id'],

                                         user_id=self.user1['id'],

                                         domain_id=self.domainA['id'])

        # A default auth request we can use - un-scoped user token

        self.auth = self.build_authentication_request(

            user_id=self.user1['id'],

            password=self.user1['password'])

**** CubicPower OpenStack Study ****

    def _get_id_list_from_ref_list(self, ref_list):

        result_list = []

        for x in ref_list:

            result_list.append(x['id'])

        return result_list

**** CubicPower OpenStack Study ****

    def _set_policy(self, new_policy):

        with open(self.tmpfilename, "w") as policyfile:

            policyfile.write(jsonutils.dumps(new_policy))

**** CubicPower OpenStack Study ****

    def test_list_users_filtered_by_domain(self):

        """GET /users?domain_id=mydomain (filtered)

        Test Plan:

        - Update policy so api is unprotected

        - Use an un-scoped token to make sure we can filter the

          users by domainB, getting back the 2 users in that domain

        """

        self._set_policy({"identity:list_users": []})

        url_by_name = '/users?domain_id=%s' % self.domainB['id']

        r = self.get(url_by_name, auth=self.auth)

        # We should  get back two users, those in DomainB

        id_list = self._get_id_list_from_ref_list(r.result.get('users'))

        self.assertIn(self.user2['id'], id_list)

        self.assertIn(self.user3['id'], id_list)

**** CubicPower OpenStack Study ****

    def test_list_filtered_domains(self):

        """GET /domains?enabled=0

        Test Plan:

        - Update policy for no protection on api

        - Filter by the 'enabled' boolean to get disabled domains, which

          should return just domainC

        - Try the filter using different ways of specifying True/False

          to test that our handling of booleans in filter matching is

          correct

        """

        new_policy = {"identity:list_domains": []}

        self._set_policy(new_policy)

        r = self.get('/domains?enabled=0', auth=self.auth)

        id_list = self._get_id_list_from_ref_list(r.result.get('domains'))

        self.assertEqual(len(id_list), 1)

        self.assertIn(self.domainC['id'], id_list)

        # Try a few ways of specifying 'false'

        for val in ('0', 'false', 'False', 'FALSE', 'n', 'no', 'off'):

            r = self.get('/domains?enabled=%s' % val, auth=self.auth)

            id_list = self._get_id_list_from_ref_list(r.result.get('domains'))

            self.assertEqual([self.domainC['id']], id_list)

        # Now try a few ways of specifying 'true' when we should get back

        # the other two domains, plus the default domain

        for val in ('1', 'true', 'True', 'TRUE', 'y', 'yes', 'on'):

            r = self.get('/domains?enabled=%s' % val, auth=self.auth)

            id_list = self._get_id_list_from_ref_list(r.result.get('domains'))

            self.assertEqual(len(id_list), 3)

            self.assertIn(self.domainA['id'], id_list)

            self.assertIn(self.domainB['id'], id_list)

            self.assertIn(CONF.identity.default_domain_id, id_list)

        r = self.get('/domains?enabled', auth=self.auth)

        id_list = self._get_id_list_from_ref_list(r.result.get('domains'))

        self.assertEqual(len(id_list), 3)

        self.assertIn(self.domainA['id'], id_list)

        self.assertIn(self.domainB['id'], id_list)

        self.assertIn(CONF.identity.default_domain_id, id_list)

**** CubicPower OpenStack Study ****

    def test_multiple_filters(self):

        """GET /domains?enabled&name=myname

        Test Plan:

        - Update policy for no protection on api

        - Filter by the 'enabled' boolean and name - this should

          return a single domain

        """

        new_policy = {"identity:list_domains": []}

        self._set_policy(new_policy)

        my_url = '/domains?enableds&name=%s' % self.domainA['name']

        r = self.get(my_url, auth=self.auth)

        id_list = self._get_id_list_from_ref_list(r.result.get('domains'))

        self.assertEqual(len(id_list), 1)

        self.assertIn(self.domainA['id'], id_list)

**** CubicPower OpenStack Study ****

    def test_list_users_filtered_by_funny_name(self):

        """GET /users?name=%myname%

        Test Plan:

        - Update policy so api is unprotected

        - Update a user with name that has filter escape characters

        - Ensure we can filter on it

        """

        self._set_policy({"identity:list_users": []})

        user = self.user1

        user['name'] = '%my%name%'

        self.identity_api.update_user(user['id'], user)

        url_by_name = '/users?name=%my%name%'

        r = self.get(url_by_name, auth=self.auth)

        self.assertEqual(len(r.result.get('users')), 1)

        self.assertEqual(r.result.get('users')[0]['id'], user['id'])

**** CubicPower OpenStack Study ****

    def test_inexact_filters(self):

        # Create 20 users

        user_list = self._create_test_data('user', 20)

        # Set up some names that we can filter on

        user = user_list[5]

        user['name'] = 'The'

        self.identity_api.update_user(user['id'], user)

        user = user_list[6]

        user['name'] = 'The Ministry'

        self.identity_api.update_user(user['id'], user)

        user = user_list[7]

        user['name'] = 'The Ministry of'

        self.identity_api.update_user(user['id'], user)

        user = user_list[8]

        user['name'] = 'The Ministry of Silly'

        self.identity_api.update_user(user['id'], user)

        user = user_list[9]

        user['name'] = 'The Ministry of Silly Walks'

        self.identity_api.update_user(user['id'], user)

        # ...and one for useful case insensitivity testing

        user = user_list[10]

        user['name'] = 'the ministry of silly walks OF'

        self.identity_api.update_user(user['id'], user)

        self._set_policy({"identity:list_users": []})

        url_by_name = '/users?name__contains=Ministry'

        r = self.get(url_by_name, auth=self.auth)

        self.assertEqual(len(r.result.get('users')), 4)

        self._match_with_list(r.result.get('users'), user_list,

                              list_start=6, list_end=10)

        url_by_name = '/users?name__icontains=miNIstry'

        r = self.get(url_by_name, auth=self.auth)

        self.assertEqual(len(r.result.get('users')), 5)

        self._match_with_list(r.result.get('users'), user_list,

                              list_start=6, list_end=11)

        url_by_name = '/users?name__startswith=The'

        r = self.get(url_by_name, auth=self.auth)

        self.assertEqual(len(r.result.get('users')), 5)

        self._match_with_list(r.result.get('users'), user_list,

                              list_start=5, list_end=10)

        url_by_name = '/users?name__istartswith=the'

        r = self.get(url_by_name, auth=self.auth)

        self.assertEqual(len(r.result.get('users')), 6)

        self._match_with_list(r.result.get('users'), user_list,

                              list_start=5, list_end=11)

        url_by_name = '/users?name__endswith=of'

        r = self.get(url_by_name, auth=self.auth)

        self.assertEqual(len(r.result.get('users')), 1)

        self.assertEqual(r.result.get('users')[0]['id'], user_list[7]['id'])

        url_by_name = '/users?name__iendswith=OF'

        r = self.get(url_by_name, auth=self.auth)

        self.assertEqual(len(r.result.get('users')), 2)

        self.assertEqual(r.result.get('users')[0]['id'], user_list[7]['id'])

        self.assertEqual(r.result.get('users')[1]['id'], user_list[10]['id'])

        self._delete_test_data('user', user_list)

**** CubicPower OpenStack Study ****

    def test_filter_sql_injection_attack(self):

        """GET /users?name=

        Test Plan:

        - Attempt to get all entities back by passing a two-term attribute

        - Attempt to piggyback filter to damage DB (e.g. drop table)

        """

        self._set_policy({"identity:list_users": [],

                          "identity:list_groups": [],

                          "identity:create_group": []})

        url_by_name = "/users?name=anything' or 'x'='x"

        r = self.get(url_by_name, auth=self.auth)

        self.assertEqual(len(r.result.get('users')), 0)

        # See if we can add a SQL command...use the group table instead of the

        # user table since 'user' is reserved word for SQLAlchemy.

        group = self.new_group_ref(domain_id=self.domainB['id'])

        self.identity_api.create_group(group['id'], group)

        url_by_name = "/users?name=x'; drop table group"

        r = self.get(url_by_name, auth=self.auth)

        # Check group table is still there...

        url_by_name = "/groups"

        r = self.get(url_by_name, auth=self.auth)

        self.assertTrue(len(r.result.get('groups')) > 0)

**** CubicPower OpenStack Study ****

class IdentityTestListLimitCase(IdentityTestFilteredCase):

"""Test list limiting enforcement on the v3 Identity API."""

content_type = 'json'

**** CubicPower OpenStack Study ****

    def setUp(self):

        """Setup for Identity Limit Test Cases."""

        super(IdentityTestListLimitCase, self).setUp()

        self._set_policy({"identity:list_users": [],

                          "identity:list_groups": [],

                          "identity:list_projects": [],

                          "identity:list_services": [],

                          "identity:list_policies": []})

        # Create 10 entries for each of the entities we are going to test

        self.ENTITY_TYPES = ['user', 'group', 'project']

        self.entity_lists = {}

        for entity in self.ENTITY_TYPES:

            self.entity_lists[entity] = self._create_test_data(entity, 10)

            # Make sure we clean up when finished

            self.addCleanup(self.clean_up_entity, entity)

        self.service_list = []

        self.addCleanup(self.clean_up_service)

        for _ in range(10):

            new_entity = {'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex}

            service = self.catalog_api.create_service(new_entity['id'],

                                                      new_entity)

            self.service_list.append(service)

        self.policy_list = []

        self.addCleanup(self.clean_up_policy)

        for _ in range(10):

            new_entity = {'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex,

                          'blob': uuid.uuid4().hex}

            policy = self.policy_api.create_policy(new_entity['id'],

                                                   new_entity)

            self.policy_list.append(policy)

**** CubicPower OpenStack Study ****

    def clean_up_entity(self, entity):

        """Clean up entity test data from Identity Limit Test Cases."""

        self._delete_test_data(entity, self.entity_lists[entity])

**** CubicPower OpenStack Study ****

    def clean_up_service(self):

        """Clean up service test data from Identity Limit Test Cases."""

        for service in self.service_list:

            self.catalog_api.delete_service(service['id'])

**** CubicPower OpenStack Study ****

    def clean_up_policy(self):

        """Clean up policy test data from Identity Limit Test Cases."""

        for policy in self.policy_list:

            self.policy_api.delete_policy(policy['id'])

**** CubicPower OpenStack Study ****

    def _test_entity_list_limit(self, entity, driver):

        """GET / (limited)

        Test Plan:

        - For the specified type of entity:

            - Update policy for no protection on api

            - Add a bunch of entities

            - Set the global list limit to 5, and check that getting all

            - entities only returns 5

            - Set the driver list_limit to 4, and check that now only 4 are

            - returned

        """

        if entity == 'policy':

            plural = 'policies'

        else:

            plural = '%ss' % entity

        self.config_fixture.config(list_limit=5)

        self.config_fixture.config(group=driver, list_limit=None)

        r = self.get('/%s' % plural, auth=self.auth)

        self.assertEqual(len(r.result.get(plural)), 5)

        self.assertIs(r.result.get('truncated'), True)

        self.config_fixture.config(group=driver, list_limit=4)

        r = self.get('/%s' % plural, auth=self.auth)

        self.assertEqual(len(r.result.get(plural)), 4)

        self.assertIs(r.result.get('truncated'), True)

**** CubicPower OpenStack Study ****

    def test_users_list_limit(self):

        self._test_entity_list_limit('user', 'identity')

**** CubicPower OpenStack Study ****

    def test_groups_list_limit(self):

        self._test_entity_list_limit('group', 'identity')

**** CubicPower OpenStack Study ****

    def test_projects_list_limit(self):

        self._test_entity_list_limit('project', 'assignment')

**** CubicPower OpenStack Study ****

    def test_services_list_limit(self):

        self._test_entity_list_limit('service', 'catalog')

**** CubicPower OpenStack Study ****

    def test_non_driver_list_limit(self):

        """Check list can be limited without driver level support.

        Policy limiting is not done at the driver level (since it

        really isn't worth doing it there).  So use this as a test

        for ensuring the controller level will successfully limit

        in this case.

        """

        self._test_entity_list_limit('policy', 'policy')

**** CubicPower OpenStack Study ****

    def test_no_limit(self):

        """Check truncated attribute not set when list not limited."""

        r = self.get('/services', auth=self.auth)

        self.assertEqual(len(r.result.get('services')), 10)

        self.assertIsNone(r.result.get('truncated'))

**** CubicPower OpenStack Study ****

    def test_at_limit(self):

        """Check truncated attribute not set when list at max size."""

        # Test this by overriding the general limit with a higher

        # driver-specific limit (allowing all entities to be returned

        # in the collection), which should result in a non truncated list

        self.config_fixture.config(list_limit=5)

        self.config_fixture.config(group='catalog', list_limit=10)

        r = self.get('/services', auth=self.auth)

        self.assertEqual(len(r.result.get('services')), 10)

        self.assertIsNone(r.result.get('truncated'))

**** CubicPower OpenStack Study ****

class IdentityTestFilteredCaseXML(IdentityTestFilteredCase):

content_type = 'xml'

**** CubicPower OpenStack Study ****

class IdentityTestListLimitCaseXML(IdentityTestListLimitCase):

content_type = 'xml'