¡@

Home 

OpenStack Study: test_sql_upgrade.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2012 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""

To run these tests against a live database:

1. Modify the file ``keystone/tests/backend_sql.conf`` to use the connection

for your live database

2. Set up a blank, live database

3. Run the tests using::

tox keystone.tests.test_sql_upgrade

WARNING::

Your database will be wiped.

Do not do this against a database with valuable data as all data will be

lost.

"""

import copy

import json

import uuid

from migrate.versioning import api as versioning_api

import sqlalchemy

import sqlalchemy.exc

from keystone.assignment.backends import sql as assignment_sql

from keystone.common import sql

from keystone.common.sql import migration_helpers

from keystone.common import utils

from keystone import config

from keystone.contrib import federation

from keystone import credential

from keystone import exception

from keystone.openstack.common.db import exception as db_exception

from keystone.openstack.common.db.sqlalchemy import migration

from keystone.openstack.common.db.sqlalchemy import session as db_session

from keystone import tests

from keystone.tests import default_fixtures

CONF = config.CONF

DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id

**** CubicPower OpenStack Study ****

class SqlMigrateBase(tests.SQLDriverOverrides, tests.TestCase):

**** CubicPower OpenStack Study ****

    def initialize_sql(self):

        self.metadata = sqlalchemy.MetaData()

        self.metadata.bind = self.engine

**** CubicPower OpenStack Study ****

    def config_files(self):

        config_files = super(SqlMigrateBase, self).config_files()

        config_files.append(tests.dirs.tests_conf('backend_sql.conf'))

        return config_files

**** CubicPower OpenStack Study ****

    def repo_package(self):

        return sql

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(SqlMigrateBase, self).setUp()

        self.config(self.config_files())

        conn_str = CONF.database.connection

        if (conn_str.startswith('sqlite') and

                conn_str[10:] == tests.DEFAULT_TEST_DB_FILE):

            # Override the default with a DB that is specific to the migration

            # tests only if the DB Connection string is the same as the global

            # default. This is required so that no conflicts occur due to the

            # global default DB already being under migrate control.

            db_file = tests.dirs.tmp('keystone_migrate_test.db')

            self.config_fixture.config(

                group='database',

                connection='sqlite:///%s' % db_file)

        # create and share a single sqlalchemy engine for testing

        self.engine = sql.get_engine()

        self.Session = db_session.get_maker(self.engine, autocommit=False)

        self.initialize_sql()

        self.repo_path = migration_helpers.find_migrate_repo(

            self.repo_package())

        self.schema = versioning_api.ControlledSchema.create(

            self.engine,

            self.repo_path, 0)

        # auto-detect the highest available schema version in the migrate_repo

        self.max_version = self.schema.repository.version().version

**** CubicPower OpenStack Study ****

    def tearDown(self):

        sqlalchemy.orm.session.Session.close_all()

        table = sqlalchemy.Table("migrate_version", self.metadata,

                                 autoload=True)

        self.downgrade(0)

        table.drop(self.engine, checkfirst=True)

        sql.cleanup()

        super(SqlMigrateBase, self).tearDown()

**** CubicPower OpenStack Study ****

    def select_table(self, name):

        table = sqlalchemy.Table(name,

                                 self.metadata,

                                 autoload=True)

        s = sqlalchemy.select([table])

        return s

**** CubicPower OpenStack Study ****

    def assertTableExists(self, table_name):

        try:

            self.select_table(table_name)

        except sqlalchemy.exc.NoSuchTableError:

            raise AssertionError('Table "%s" does not exist' % table_name)

**** CubicPower OpenStack Study ****

    def assertTableDoesNotExist(self, table_name):

        """Asserts that a given table exists cannot be selected by name."""

        # Switch to a different metadata otherwise you might still

        # detect renamed or dropped tables

        try:

            temp_metadata = sqlalchemy.MetaData()

            temp_metadata.bind = self.engine

            sqlalchemy.Table(table_name, temp_metadata, autoload=True)

        except sqlalchemy.exc.NoSuchTableError:

            pass

        else:

            raise AssertionError('Table "%s" already exists' % table_name)

**** CubicPower OpenStack Study ****

    def upgrade(self, *args, **kwargs):

        self._migrate(*args, **kwargs)

**** CubicPower OpenStack Study ****

    def downgrade(self, *args, **kwargs):

        self._migrate(*args, downgrade=True, **kwargs)

**** CubicPower OpenStack Study ****

    def _migrate(self, version, repository=None, downgrade=False,

                 current_schema=None):

        repository = repository or self.repo_path

        err = ''

        version = versioning_api._migrate_version(self.schema,

                                                  version,

                                                  not downgrade,

                                                  err)

        if not current_schema:

            current_schema = self.schema

        changeset = current_schema.changeset(version)

        for ver, change in changeset:

            self.schema.runchange(ver, change, changeset.step)

        self.assertEqual(self.schema.version, version)

**** CubicPower OpenStack Study ****

    def assertTableColumns(self, table_name, expected_cols):

        """Asserts that the table contains the expected set of columns."""

        self.initialize_sql()

        table = self.select_table(table_name)

        actual_cols = [col.name for col in table.columns]

        self.assertEqual(expected_cols, actual_cols, '%s table' % table_name)

**** CubicPower OpenStack Study ****

class SqlUpgradeTests(SqlMigrateBase):

**** CubicPower OpenStack Study ****

    def test_blank_db_to_start(self):

        self.assertTableDoesNotExist('user')

**** CubicPower OpenStack Study ****

    def test_start_version_0(self):

        version = migration.db_version(sql.get_engine(), self.repo_path, 0)

        self.assertEqual(version, 0, "DB is not at version 0")

**** CubicPower OpenStack Study ****

    def test_two_steps_forward_one_step_back(self):

        """You should be able to cleanly undo and re-apply all upgrades.

        Upgrades are run in the following order::

            0 -> 1 -> 0 -> 1 -> 2 -> 1 -> 2 -> 3 -> 2 -> 3 ...

                 ^---------^    ^---------^    ^---------^

        """

        for x in range(1, self.max_version + 1):

            self.upgrade(x)

            self.downgrade(x - 1)

            self.upgrade(x)

**** CubicPower OpenStack Study ****

    def test_upgrade_add_initial_tables(self):

        self.upgrade(1)

        self.assertTableColumns("user", ["id", "name", "extra"])

        self.assertTableColumns("tenant", ["id", "name", "extra"])

        self.assertTableColumns("role", ["id", "name"])

        self.assertTableColumns("user_tenant_membership",

                                ["user_id", "tenant_id"])

        self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"])

        self.populate_user_table()

**** CubicPower OpenStack Study ****

    def test_upgrade_add_policy(self):

        self.upgrade(5)

        self.assertTableDoesNotExist('policy')

        self.upgrade(6)

        self.assertTableExists('policy')

        self.assertTableColumns('policy', ['id', 'type', 'blob', 'extra'])

**** CubicPower OpenStack Study ****

    def test_upgrade_normalize_identity(self):

        self.upgrade(8)

        self.populate_user_table()

        self.populate_tenant_table()

        self.upgrade(10)

        self.assertTableColumns("user",

                                ["id", "name", "extra",

                                 "password", "enabled"])

        self.assertTableColumns("tenant",

                                ["id", "name", "extra", "description",

                                 "enabled"])

        self.assertTableColumns("role", ["id", "name", "extra"])

        self.assertTableColumns("user_tenant_membership",

                                ["user_id", "tenant_id"])

        self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"])

        session = self.Session()

        user_table = sqlalchemy.Table("user",

                                      self.metadata,

                                      autoload=True)

        a_user = session.query(user_table).filter("id='foo'").one()

        self.assertTrue(a_user.enabled)

        a_user = session.query(user_table).filter("id='badguy'").one()

        self.assertFalse(a_user.enabled)

        tenant_table = sqlalchemy.Table("tenant",

                                        self.metadata,

                                        autoload=True)

        a_tenant = session.query(tenant_table).filter("id='baz'").one()

        self.assertEqual(a_tenant.description, 'description')

        session.commit()

        session.close()

**** CubicPower OpenStack Study ****

    def test_upgrade_user_tenant_membership_to_metadata(self):

        self.upgrade(16)

        self.assertTableColumns(

            'user_project_membership',

            ['user_id', 'tenant_id'])

        user = {

            'id': uuid.uuid4().hex,

            'name': uuid.uuid4().hex,

            'domain_id': 'default',

            'extra': json.dumps({}),

        }

        project = {

            'id': uuid.uuid4().hex,

            'name': uuid.uuid4().hex,

            'domain_id': 'default',

            'extra': json.dumps({}),

        }

        metadata = {

            'user_id': user['id'],

            'tenant_id': project['id'],

        }

        session = self.Session()

        self.insert_dict(session, 'user', user)

        self.insert_dict(session, 'project', project)

        self.insert_dict(session, 'user_project_membership', metadata)

        self.upgrade(17)

        user_project_metadata_table = sqlalchemy.Table(

            'user_project_metadata', self.metadata, autoload=True)

        result = session.query(user_project_metadata_table).one()

        self.assertEqual(result.user_id, user['id'])

        self.assertEqual(result.project_id, project['id'])

        self.assertEqual(

            json.loads(result.data),

            {'roles': [CONF.member_role_id]})

**** CubicPower OpenStack Study ****

    def test_normalized_enabled_states(self):

        self.upgrade(8)

        users = {

            'bool_enabled_user': {

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'password': uuid.uuid4().hex,

                'extra': json.dumps({'enabled': True})},

            'bool_disabled_user': {

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'password': uuid.uuid4().hex,

                'extra': json.dumps({'enabled': False})},

            'str_enabled_user': {

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'password': uuid.uuid4().hex,

                'extra': json.dumps({'enabled': 'True'})},

            'str_disabled_user': {

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'password': uuid.uuid4().hex,

                'extra': json.dumps({'enabled': 'False'})},

            'int_enabled_user': {

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'password': uuid.uuid4().hex,

                'extra': json.dumps({'enabled': 1})},

            'int_disabled_user': {

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'password': uuid.uuid4().hex,

                'extra': json.dumps({'enabled': 0})},

            'null_enabled_user': {

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'password': uuid.uuid4().hex,

                'extra': json.dumps({'enabled': None})},

            'unset_enabled_user': {

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'password': uuid.uuid4().hex,

                'extra': json.dumps({})}}

        session = self.Session()

        for user in users.values():

            self.insert_dict(session, 'user', user)

        session.commit()

        self.upgrade(10)

        user_table = sqlalchemy.Table('user', self.metadata, autoload=True)

        q = session.query(user_table, 'enabled')

        user = q.filter_by(id=users['bool_enabled_user']['id']).one()

        self.assertTrue(user.enabled)

        user = q.filter_by(id=users['bool_disabled_user']['id']).one()

        self.assertFalse(user.enabled)

        user = q.filter_by(id=users['str_enabled_user']['id']).one()

        self.assertTrue(user.enabled)

        user = q.filter_by(id=users['str_disabled_user']['id']).one()

        self.assertFalse(user.enabled)

        user = q.filter_by(id=users['int_enabled_user']['id']).one()

        self.assertTrue(user.enabled)

        user = q.filter_by(id=users['int_disabled_user']['id']).one()

        self.assertFalse(user.enabled)

        user = q.filter_by(id=users['null_enabled_user']['id']).one()

        self.assertTrue(user.enabled)

        user = q.filter_by(id=users['unset_enabled_user']['id']).one()

        self.assertTrue(user.enabled)

**** CubicPower OpenStack Study ****

    def test_downgrade_10_to_8(self):

        self.upgrade(10)

        self.populate_user_table(with_pass_enab=True)

        self.populate_tenant_table(with_desc_enab=True)

        self.downgrade(8)

        self.assertTableColumns('user',

                                ['id', 'name', 'extra'])

        self.assertTableColumns('tenant',

                                ['id', 'name', 'extra'])

        session = self.Session()

        user_table = sqlalchemy.Table("user",

                                      self.metadata,

                                      autoload=True)

        a_user = session.query(user_table).filter("id='badguy'").one()

        self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])

        tenant_table = sqlalchemy.Table("tenant",

                                        self.metadata,

                                        autoload=True)

        a_tenant = session.query(tenant_table).filter("id='baz'").one()

        self.assertEqual(a_tenant.name, default_fixtures.TENANTS[1]['name'])

        session.commit()

        session.close()

**** CubicPower OpenStack Study ****

    def test_upgrade_endpoints(self):

        self.upgrade(10)

        service_extra = {

            'name': uuid.uuid4().hex,

        }

        service = {

            'id': uuid.uuid4().hex,

            'type': uuid.uuid4().hex,

            'extra': json.dumps(service_extra),

        }

        endpoint_extra = {

            'publicurl': uuid.uuid4().hex,

            'internalurl': uuid.uuid4().hex,

            'adminurl': uuid.uuid4().hex,

        }

        endpoint = {

            'id': uuid.uuid4().hex,

            'region': uuid.uuid4().hex,

            'service_id': service['id'],

            'extra': json.dumps(endpoint_extra),

        }

        session = self.Session()

        self.insert_dict(session, 'service', service)

        self.insert_dict(session, 'endpoint', endpoint)

        session.commit()

        session.close()

        self.upgrade(13)

        self.assertTableColumns(

            'service',

            ['id', 'type', 'extra'])

        self.assertTableColumns(

            'endpoint',

            ['id', 'legacy_endpoint_id', 'interface', 'region', 'service_id',

             'url', 'extra'])

        endpoint_table = sqlalchemy.Table(

            'endpoint', self.metadata, autoload=True)

        session = self.Session()

        self.assertEqual(session.query(endpoint_table).count(), 3)

        for interface in ['public', 'internal', 'admin']:

            q = session.query(endpoint_table)

            q = q.filter_by(legacy_endpoint_id=endpoint['id'])

            q = q.filter_by(interface=interface)

            ref = q.one()

            self.assertNotEqual(ref.id, endpoint['id'])

            self.assertEqual(ref.legacy_endpoint_id, endpoint['id'])

            self.assertEqual(ref.interface, interface)

            self.assertEqual(ref.region, endpoint['region'])

            self.assertEqual(ref.service_id, endpoint['service_id'])

            self.assertEqual(ref.url, endpoint_extra['%surl' % interface])

            self.assertEqual(ref.extra, '{}')

        session.commit()

        session.close()

**** CubicPower OpenStack Study ****

    def assertTenantTables(self):

        self.assertTableExists('tenant')

        self.assertTableExists('user_tenant_membership')

        self.assertTableDoesNotExist('project')

        self.assertTableDoesNotExist('user_project_membership')

**** CubicPower OpenStack Study ****

    def assertProjectTables(self):

        self.assertTableExists('project')

        self.assertTableExists('user_project_membership')

        self.assertTableDoesNotExist('tenant')

        self.assertTableDoesNotExist('user_tenant_membership')

**** CubicPower OpenStack Study ****

    def test_upgrade_tenant_to_project(self):

        self.upgrade(14)

        self.assertTenantTables()

        self.upgrade(15)

        self.assertProjectTables()

**** CubicPower OpenStack Study ****

    def test_downgrade_project_to_tenant(self):

        # TODO(henry-nash): Debug why we need to re-load the tenant

        # or user_tenant_membership ahead of upgrading to project

        # in order for the assertProjectTables to work on sqlite

        # (MySQL is fine without it)

        self.upgrade(14)

        self.assertTenantTables()

        self.upgrade(15)

        self.assertProjectTables()

        self.downgrade(14)

        self.assertTenantTables()

**** CubicPower OpenStack Study ****

    def test_upgrade_add_group_tables(self):

        self.upgrade(13)

        self.upgrade(14)

        self.assertTableExists('group')

        self.assertTableExists('group_project_metadata')

        self.assertTableExists('group_domain_metadata')

        self.assertTableExists('user_group_membership')

**** CubicPower OpenStack Study ****

    def test_upgrade_14_to_16(self):

        self.upgrade(14)

        self.populate_user_table(with_pass_enab=True)

        self.populate_tenant_table(with_desc_enab=True)

        self.upgrade(16)

        self.assertTableColumns("user",

                                ["id", "name", "extra",

                                 "password", "enabled", "domain_id"])

        session = self.Session()

        user_table = sqlalchemy.Table("user",

                                      self.metadata,

                                      autoload=True)

        a_user = session.query(user_table).filter("id='foo'").one()

        self.assertTrue(a_user.enabled)

        self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID)

        a_user = session.query(user_table).filter("id='badguy'").one()

        self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])

        self.assertEqual(a_user.domain_id, DEFAULT_DOMAIN_ID)

        project_table = sqlalchemy.Table("project",

                                         self.metadata,

                                         autoload=True)

        a_project = session.query(project_table).filter("id='baz'").one()

        self.assertEqual(a_project.description,

                         default_fixtures.TENANTS[1]['description'])

        self.assertEqual(a_project.domain_id, DEFAULT_DOMAIN_ID)

        session.commit()

        session.close()

        self.check_uniqueness_constraints()

**** CubicPower OpenStack Study ****

    def test_downgrade_16_to_14(self):

        self.upgrade(16)

        self.populate_user_table(with_pass_enab_domain=True)

        self.populate_tenant_table(with_desc_enab_domain=True)

        self.downgrade(14)

        self.assertTableColumns("user",

                                ["id", "name", "extra",

                                 "password", "enabled"])

        session = self.Session()

        user_table = sqlalchemy.Table("user",

                                      self.metadata,

                                      autoload=True)

        a_user = session.query(user_table).filter("id='foo'").one()

        self.assertTrue(a_user.enabled)

        a_user = session.query(user_table).filter("id='badguy'").one()

        self.assertEqual(a_user.name, default_fixtures.USERS[2]['name'])

        tenant_table = sqlalchemy.Table("tenant",

                                        self.metadata,

                                        autoload=True)

        a_tenant = session.query(tenant_table).filter("id='baz'").one()

        self.assertEqual(a_tenant.description,

                         default_fixtures.TENANTS[1]['description'])

        session.commit()

        session.close()

**** CubicPower OpenStack Study ****

    def test_downgrade_remove_group_tables(self):

        self.upgrade(14)

        self.downgrade(13)

        self.assertTableDoesNotExist('group')

        self.assertTableDoesNotExist('group_project_metadata')

        self.assertTableDoesNotExist('group_domain_metadata')

        self.assertTableDoesNotExist('user_group_membership')

**** CubicPower OpenStack Study ****

    def test_downgrade_endpoints(self):

        self.upgrade(13)

        service_extra = {

            'name': uuid.uuid4().hex,

        }

        service = {

            'id': uuid.uuid4().hex,

            'type': uuid.uuid4().hex,

            'extra': json.dumps(service_extra),

        }

        common_endpoint_attrs = {

            'legacy_endpoint_id': uuid.uuid4().hex,

            'region': uuid.uuid4().hex,

            'service_id': service['id'],

            'extra': json.dumps({}),

        }

        endpoints = {

            'public': {

                'id': uuid.uuid4().hex,

                'interface': 'public',

                'url': uuid.uuid4().hex,

            },

            'internal': {

                'id': uuid.uuid4().hex,

                'interface': 'internal',

                'url': uuid.uuid4().hex,

            },

            'admin': {

                'id': uuid.uuid4().hex,

                'interface': 'admin',

                'url': uuid.uuid4().hex,

            },

        }

        session = self.Session()

        self.insert_dict(session, 'service', service)

        for endpoint in endpoints.values():

            endpoint.update(common_endpoint_attrs)

            self.insert_dict(session, 'endpoint', endpoint)

        session.commit()

        session.close()

        self.downgrade(9)

        self.assertTableColumns(

            'service',

            ['id', 'type', 'extra'])

        self.assertTableColumns(

            'endpoint',

            ['id', 'region', 'service_id', 'extra'])

        endpoint_table = sqlalchemy.Table(

            'endpoint', self.metadata, autoload=True)

        session = self.Session()

        self.assertEqual(session.query(endpoint_table).count(), 1)

        q = session.query(endpoint_table)

        q = q.filter_by(id=common_endpoint_attrs['legacy_endpoint_id'])

        ref = q.one()

        self.assertEqual(ref.id, common_endpoint_attrs['legacy_endpoint_id'])

        self.assertEqual(ref.region, endpoint['region'])

        self.assertEqual(ref.service_id, endpoint['service_id'])

        extra = json.loads(ref.extra)

        for interface in ['public', 'internal', 'admin']:

            expected_url = endpoints[interface]['url']

            self.assertEqual(extra['%surl' % interface], expected_url)

        session.commit()

        session.close()

**** CubicPower OpenStack Study ****

    def insert_dict(self, session, table_name, d, table=None):

        """Naively inserts key-value pairs into a table, given a dictionary."""

        if table is None:

            this_table = sqlalchemy.Table(table_name, self.metadata,

                                          autoload=True)

        else:

            this_table = table

        insert = this_table.insert()

        insert.execute(d)

        session.commit()

**** CubicPower OpenStack Study ****

    def test_upgrade_31_to_32(self):

        self.upgrade(32)

        user_table = self.select_table("user")

        self.assertEqual(user_table.c.name.type.length, 255)

**** CubicPower OpenStack Study ****

    def test_downgrade_32_to_31(self):

        self.upgrade(32)

        session = self.Session()

        # NOTE(aloga): we need a different metadata object

        user_table = sqlalchemy.Table('user',

                                      sqlalchemy.MetaData(),

                                      autoload=True,

                                      autoload_with=self.engine)

        user_id = uuid.uuid4().hex

        ins = user_table.insert().values(

            {'id': user_id,

             'name': 'a' * 255,

             'password': uuid.uuid4().hex,

             'enabled': True,

             'domain_id': DEFAULT_DOMAIN_ID,

             'extra': '{}'})

        session.execute(ins)

        session.commit()

        self.downgrade(31)

        # Check that username has been truncated

        q = session.query(user_table.c.name)

        q = q.filter(user_table.c.id == user_id)

        r = q.one()

        user_name = r[0]

        self.assertEqual(len(user_name), 64)

        user_table = self.select_table("user")

        self.assertEqual(user_table.c.name.type.length, 64)

**** CubicPower OpenStack Study ****

    def test_downgrade_to_0(self):

        self.upgrade(self.max_version)

        if self.engine.name == 'mysql':

            self._mysql_check_all_tables_innodb()

        self.downgrade(0)

        for table_name in ["user", "token", "role", "user_tenant_membership",

                           "metadata"]:

            self.assertTableDoesNotExist(table_name)

**** CubicPower OpenStack Study ****

    def test_upgrade_add_domain_tables(self):

        self.upgrade(6)

        self.assertTableDoesNotExist('credential')

        self.assertTableDoesNotExist('domain')

        self.assertTableDoesNotExist('user_domain_metadata')

        self.upgrade(7)

        self.assertTableExists('credential')

        self.assertTableColumns('credential', ['id', 'user_id', 'project_id',

                                               'blob', 'type', 'extra'])

        self.assertTableExists('domain')

        self.assertTableColumns('domain', ['id', 'name', 'enabled', 'extra'])

        self.assertTableExists('user_domain_metadata')

        self.assertTableColumns('user_domain_metadata',

                                ['user_id', 'domain_id', 'data'])

**** CubicPower OpenStack Study ****

    def test_metadata_table_migration(self):

        # Scaffolding

        session = self.Session()

        self.upgrade(16)

        domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)

        user_table = sqlalchemy.Table('user', self.metadata, autoload=True)

        role_table = sqlalchemy.Table('role', self.metadata, autoload=True)

        project_table = sqlalchemy.Table(

            'project', self.metadata, autoload=True)

        metadata_table = sqlalchemy.Table(

            'metadata', self.metadata, autoload=True)

        # Create a Domain

        domain = {'id': uuid.uuid4().hex,

                  'name': uuid.uuid4().hex,

                  'enabled': True}

        session.execute(domain_table.insert().values(domain))

        # Create a Project

        project = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'domain_id': domain['id'],

                   'extra': "{}"}

        session.execute(project_table.insert().values(project))

        # Create another Project

        project2 = {'id': uuid.uuid4().hex,

                    'name': uuid.uuid4().hex,

                    'domain_id': domain['id'],

                    'extra': "{}"}

        session.execute(project_table.insert().values(project2))

        # Create a User

        user = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'domain_id': domain['id'],

                'password': uuid.uuid4().hex,

                'enabled': True,

                'extra': json.dumps({})}

        session.execute(user_table.insert().values(user))

        # Create a Role

        role = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex}

        session.execute(role_table.insert().values(role))

        # And another role

        role2 = {'id': uuid.uuid4().hex,

                 'name': uuid.uuid4().hex}

        session.execute(role_table.insert().values(role2))

        # Grant Role to User

        role_grant = {'user_id': user['id'],

                      'tenant_id': project['id'],

                      'data': json.dumps({"roles": [role['id']]})}

        session.execute(metadata_table.insert().values(role_grant))

        role_grant = {'user_id': user['id'],

                      'tenant_id': project2['id'],

                      'data': json.dumps({"roles": [role2['id']]})}

        session.execute(metadata_table.insert().values(role_grant))

        # Create another user to test the case where member_role_id is already

        # assigned.

        user2 = {'id': uuid.uuid4().hex,

                 'name': uuid.uuid4().hex,

                 'domain_id': domain['id'],

                 'password': uuid.uuid4().hex,

                 'enabled': True,

                 'extra': json.dumps({})}

        session.execute(user_table.insert().values(user2))

        # Grant CONF.member_role_id to User2

        role_grant = {'user_id': user2['id'],

                      'tenant_id': project['id'],

                      'data': json.dumps({"roles": [CONF.member_role_id]})}

        session.execute(metadata_table.insert().values(role_grant))

        session.commit()

        self.upgrade(17)

        user_project_metadata_table = sqlalchemy.Table(

            'user_project_metadata', self.metadata, autoload=True)

        s = sqlalchemy.select([metadata_table.c.data]).where(

            (metadata_table.c.user_id == user['id']) &

            (metadata_table.c.tenant_id == project['id']))

        r = session.execute(s)

        test_project1 = json.loads(r.fetchone()['data'])

        self.assertEqual(len(test_project1['roles']), 1)

        self.assertIn(role['id'], test_project1['roles'])

        # Test user in project2 has role2

        s = sqlalchemy.select([metadata_table.c.data]).where(

            (metadata_table.c.user_id == user['id']) &

            (metadata_table.c.tenant_id == project2['id']))

        r = session.execute(s)

        test_project2 = json.loads(r.fetchone()['data'])

        self.assertEqual(len(test_project2['roles']), 1)

        self.assertIn(role2['id'], test_project2['roles'])

        # Test for user in project has role in user_project_metadata

        # Migration 17 does not properly migrate this data, so this should

        # be None.

        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(

            (user_project_metadata_table.c.user_id == user['id']) &

            (user_project_metadata_table.c.project_id == project['id']))

        r = session.execute(s)

        self.assertIsNone(r.fetchone())

        # Create a conflicting user-project in user_project_metadata with

        # a different role

        data = json.dumps({"roles": [role2['id']]})

        role_grant = {'user_id': user['id'],

                      'project_id': project['id'],

                      'data': data}

        cmd = user_project_metadata_table.insert().values(role_grant)

        self.engine.execute(cmd)

        # Create another conflicting user-project for User2

        data = json.dumps({"roles": [role2['id']]})

        role_grant = {'user_id': user2['id'],

                      'project_id': project['id'],

                      'data': data}

        cmd = user_project_metadata_table.insert().values(role_grant)

        self.engine.execute(cmd)

        # End Scaffolding

        session.commit()

        # Migrate to 20

        self.upgrade(20)

        # The user-project pairs should have all roles from the previous

        # metadata table in addition to any roles currently in

        # user_project_metadata

        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(

            (user_project_metadata_table.c.user_id == user['id']) &

            (user_project_metadata_table.c.project_id == project['id']))

        r = session.execute(s)

        role_ids = json.loads(r.fetchone()['data'])['roles']

        self.assertEqual(len(role_ids), 3)

        self.assertIn(CONF.member_role_id, role_ids)

        self.assertIn(role['id'], role_ids)

        self.assertIn(role2['id'], role_ids)

        # pairs that only existed in old metadata table should be in

        # user_project_metadata

        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(

            (user_project_metadata_table.c.user_id == user['id']) &

            (user_project_metadata_table.c.project_id == project2['id']))

        r = session.execute(s)

        role_ids = json.loads(r.fetchone()['data'])['roles']

        self.assertEqual(len(role_ids), 2)

        self.assertIn(CONF.member_role_id, role_ids)

        self.assertIn(role2['id'], role_ids)

        self.assertTableDoesNotExist('metadata')

**** CubicPower OpenStack Study ****

    def test_upgrade_default_roles(self):

        def count_member_roles():

            session = self.Session()

            role_table = sqlalchemy.Table("role", self.metadata, autoload=True)

            return session.query(role_table).filter_by(

                name=config.CONF.member_role_name).count()

        self.upgrade(16)

        self.assertEqual(0, count_member_roles())

        self.upgrade(17)

        self.assertEqual(1, count_member_roles())

        self.downgrade(16)

        self.assertEqual(0, count_member_roles())

**** CubicPower OpenStack Study ****

        def count_member_roles():

            session = self.Session()

            role_table = sqlalchemy.Table("role", self.metadata, autoload=True)

            return session.query(role_table).filter_by(

                name=config.CONF.member_role_name).count()

        self.upgrade(16)

        self.assertEqual(0, count_member_roles())

        self.upgrade(17)

        self.assertEqual(1, count_member_roles())

        self.downgrade(16)

        self.assertEqual(0, count_member_roles())

**** CubicPower OpenStack Study ****

    def check_uniqueness_constraints(self):

        # Check uniqueness constraints for User & Project tables are

        # correct following schema modification.  The Group table's

        # schema is never modified, so we don't bother to check that.

        domain_table = sqlalchemy.Table('domain',

                                        self.metadata,

                                        autoload=True)

        domain1 = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'enabled': True}

        domain2 = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'enabled': True}

        cmd = domain_table.insert().values(domain1)

        self.engine.execute(cmd)

        cmd = domain_table.insert().values(domain2)

        self.engine.execute(cmd)

        # First, the User table.

        this_table = sqlalchemy.Table('user',

                                      self.metadata,

                                      autoload=True)

        user = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'domain_id': domain1['id'],

                'password': uuid.uuid4().hex,

                'enabled': True,

                'extra': json.dumps({})}

        cmd = this_table.insert().values(user)

        self.engine.execute(cmd)

        # now insert a user with the same name into a different

        # domain - which should work.

        user['id'] = uuid.uuid4().hex

        user['domain_id'] = domain2['id']

        cmd = this_table.insert().values(user)

        self.engine.execute(cmd)

        # TODO(henry-nash): For now, as part of clean-up we delete one of these

        # users.  Although not part of this test, unless we do so the

        # downgrade(16->15) that is part of teardown with fail due to having

        # two uses with clashing name as we try to revert to a single global

        # name space.  This limitation is raised as Bug #1125046 and the delete

        # could be removed depending on how that bug is resolved.

        cmd = this_table.delete().where(this_table.c.id == user['id'])

        self.engine.execute(cmd)

        # Now, the Project table.

        this_table = sqlalchemy.Table('project',

                                      self.metadata,

                                      autoload=True)

        project = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'domain_id': domain1['id'],

                   'description': uuid.uuid4().hex,

                   'enabled': True,

                   'extra': json.dumps({})}

        cmd = this_table.insert().values(project)

        self.engine.execute(cmd)

        # now insert a project with the same name into a different

        # domain - which should work.

        project['id'] = uuid.uuid4().hex

        project['domain_id'] = domain2['id']

        cmd = this_table.insert().values(project)

        self.engine.execute(cmd)

        # TODO(henry-nash): For now, we delete one of the projects for the same

        # reason as we delete one of the users (Bug #1125046). This delete

        # could be removed depending on that bug resolution.

        cmd = this_table.delete().where(this_table.c.id == project['id'])

        self.engine.execute(cmd)

**** CubicPower OpenStack Study ****

    def test_upgrade_trusts(self):

        self.assertEqual(self.schema.version, 0, "DB is at version 0")

        self.upgrade(20)

        self.assertTableColumns("token",

                                ["id", "expires", "extra", "valid"])

        self.upgrade(21)

        self.assertTableColumns("trust",

                                ["id", "trustor_user_id",

                                 "trustee_user_id",

                                 "project_id", "impersonation",

                                 "deleted_at",

                                 "expires_at", "extra"])

        self.assertTableColumns("trust_role",

                                ["trust_id", "role_id"])

        self.assertTableColumns("token",

                                ["id", "expires", "extra", "valid",

                                 "trust_id", "user_id"])

**** CubicPower OpenStack Study ****

    def test_fixup_role(self):

        def count_role():

            session = self.Session()

            self.initialize_sql()

            role_table = sqlalchemy.Table("role", self.metadata, autoload=True)

            return session.query(role_table).filter_by(extra=None).count()

        session = self.Session()

        self.assertEqual(self.schema.version, 0, "DB is at version 0")

        self.upgrade(1)

        self.insert_dict(session, "role", {"id": "test", "name": "test"})

        self.upgrade(18)

        self.insert_dict(session, "role", {"id": "test2",

                                           "name": "test2",

                                           "extra": None})

        self.assertEqual(count_role(), 2)

        self.upgrade(19)

        self.assertEqual(count_role(), 0)

**** CubicPower OpenStack Study ****

        def count_role():

            session = self.Session()

            self.initialize_sql()

            role_table = sqlalchemy.Table("role", self.metadata, autoload=True)

            return session.query(role_table).filter_by(extra=None).count()

        session = self.Session()

        self.assertEqual(self.schema.version, 0, "DB is at version 0")

        self.upgrade(1)

        self.insert_dict(session, "role", {"id": "test", "name": "test"})

        self.upgrade(18)

        self.insert_dict(session, "role", {"id": "test2",

                                           "name": "test2",

                                           "extra": None})

        self.assertEqual(count_role(), 2)

        self.upgrade(19)

        self.assertEqual(count_role(), 0)

**** CubicPower OpenStack Study ****

    def test_legacy_endpoint_id(self):

        session = self.Session()

        self.upgrade(21)

        service = {

            'id': uuid.uuid4().hex,

            'name': 'keystone',

            'type': 'identity'}

        self.insert_dict(session, 'service', service)

        legacy_endpoint_id = uuid.uuid4().hex

        endpoint = {

            'id': uuid.uuid4().hex,

            'service_id': service['id'],

            'interface': uuid.uuid4().hex[:8],

            'url': uuid.uuid4().hex,

            'extra': json.dumps({

                'legacy_endpoint_id': legacy_endpoint_id})}

        self.insert_dict(session, 'endpoint', endpoint)

        session.commit()

        self.upgrade(22)

        endpoint_table = sqlalchemy.Table(

            'endpoint', self.metadata, autoload=True)

        self.assertEqual(session.query(endpoint_table).count(), 1)

        ref = session.query(endpoint_table).one()

        self.assertEqual(ref.id, endpoint['id'], ref)

        self.assertEqual(ref.service_id, endpoint['service_id'])

        self.assertEqual(ref.interface, endpoint['interface'])

        self.assertEqual(ref.url, endpoint['url'])

        self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id)

        self.assertEqual(ref.extra, '{}')

**** CubicPower OpenStack Study ****

    def test_group_project_FK_fixup(self):

        # To create test data we must start before we broke in the

        # group_project_metadata table in 015.

        self.upgrade(14)

        session = self.Session()

        domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)

        group_table = sqlalchemy.Table('group', self.metadata, autoload=True)

        tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True)

        role_table = sqlalchemy.Table('role', self.metadata, autoload=True)

        group_project_metadata_table = sqlalchemy.Table(

            'group_project_metadata', self.metadata, autoload=True)

        # Create a Domain

        domain = {'id': uuid.uuid4().hex,

                  'name': uuid.uuid4().hex,

                  'enabled': True}

        session.execute(domain_table.insert().values(domain))

        # Create two Tenants

        tenant = {'id': uuid.uuid4().hex,

                  'name': uuid.uuid4().hex,

                  'extra': "{}"}

        session.execute(tenant_table.insert().values(tenant))

        tenant1 = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'extra': "{}"}

        session.execute(tenant_table.insert().values(tenant1))

        # Create a Group

        group = {'id': uuid.uuid4().hex,

                 'name': uuid.uuid4().hex,

                 'domain_id': domain['id'],

                 'extra': json.dumps({})}

        session.execute(group_table.insert().values(group))

        # Create roles

        role_list = []

        for _ in range(2):

            role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

            session.execute(role_table.insert().values(role))

            role_list.append(role)

        # Grant Role to User on Project

        role_grant = {'group_id': group['id'],

                      'project_id': tenant['id'],

                      'data': json.dumps({'roles': [role_list[0]['id']]})}

        session.execute(

            group_project_metadata_table.insert().values(role_grant))

        role_grant = {'group_id': group['id'],

                      'project_id': tenant1['id'],

                      'data': json.dumps({'roles': [role_list[1]['id']]})}

        session.execute(

            group_project_metadata_table.insert().values(role_grant))

        session.commit()

        # Now upgrade and fix up the FKs

        self.upgrade(28)

        self.assertTableExists('group_project_metadata')

        self.assertTableExists('project')

        self.assertTableDoesNotExist('tenant')

        s = sqlalchemy.select([group_project_metadata_table.c.data]).where(

            (group_project_metadata_table.c.group_id == group['id']) &

            (group_project_metadata_table.c.project_id == tenant['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn(role_list[0]['id'], data['roles'])

        s = sqlalchemy.select([group_project_metadata_table.c.data]).where(

            (group_project_metadata_table.c.group_id == group['id']) &

            (group_project_metadata_table.c.project_id == tenant1['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn(role_list[1]['id'], data['roles'])

        self.downgrade(27)

        self.assertTableExists('group_project_metadata')

        self.assertTableExists('project')

        self.assertTableDoesNotExist('tenant')

**** CubicPower OpenStack Study ****

    def test_assignment_metadata_migration(self):

        self.upgrade(28)

        # Scaffolding

        session = self.Session()

        domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)

        user_table = sqlalchemy.Table('user', self.metadata, autoload=True)

        group_table = sqlalchemy.Table('group', self.metadata, autoload=True)

        role_table = sqlalchemy.Table('role', self.metadata, autoload=True)

        project_table = sqlalchemy.Table(

            'project', self.metadata, autoload=True)

        user_project_metadata_table = sqlalchemy.Table(

            'user_project_metadata', self.metadata, autoload=True)

        user_domain_metadata_table = sqlalchemy.Table(

            'user_domain_metadata', self.metadata, autoload=True)

        group_project_metadata_table = sqlalchemy.Table(

            'group_project_metadata', self.metadata, autoload=True)

        group_domain_metadata_table = sqlalchemy.Table(

            'group_domain_metadata', self.metadata, autoload=True)

        # Create a Domain

        domain = {'id': uuid.uuid4().hex,

                  'name': uuid.uuid4().hex,

                  'enabled': True}

        session.execute(domain_table.insert().values(domain))

        # Create anther Domain

        domain2 = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'enabled': True}

        session.execute(domain_table.insert().values(domain2))

        # Create a Project

        project = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'domain_id': domain['id'],

                   'extra': "{}"}

        session.execute(project_table.insert().values(project))

        # Create another Project

        project2 = {'id': uuid.uuid4().hex,

                    'name': uuid.uuid4().hex,

                    'domain_id': domain['id'],

                    'extra': "{}"}

        session.execute(project_table.insert().values(project2))

        # Create a User

        user = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'domain_id': domain['id'],

                'password': uuid.uuid4().hex,

                'enabled': True,

                'extra': json.dumps({})}

        session.execute(user_table.insert().values(user))

        # Create a Group

        group = {'id': uuid.uuid4().hex,

                 'name': uuid.uuid4().hex,

                 'domain_id': domain['id'],

                 'extra': json.dumps({})}

        session.execute(group_table.insert().values(group))

        # Create roles

        role_list = []

        for _ in range(7):

            role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

            session.execute(role_table.insert().values(role))

            role_list.append(role)

        # Grant Role to User on Project

        role_grant = {'user_id': user['id'],

                      'project_id': project['id'],

                      'data': json.dumps({'roles': [role_list[0]['id']]})}

        session.execute(

            user_project_metadata_table.insert().values(role_grant))

        role_grant = {'user_id': user['id'],

                      'project_id': project2['id'],

                      'data': json.dumps({'roles': [role_list[1]['id']]})}

        session.execute(

            user_project_metadata_table.insert().values(role_grant))

        # Grant Role to Group on different Project

        role_grant = {'group_id': group['id'],

                      'project_id': project2['id'],

                      'data': json.dumps({'roles': [role_list[2]['id']]})}

        session.execute(

            group_project_metadata_table.insert().values(role_grant))

        # Grant Role to User on Domain

        role_grant = {'user_id': user['id'],

                      'domain_id': domain['id'],

                      'data': json.dumps({'roles': [role_list[3]['id']]})}

        session.execute(user_domain_metadata_table.insert().values(role_grant))

        # Grant Role to Group on Domain

        role_grant = {'group_id': group['id'],

                      'domain_id': domain['id'],

                      'data': json.dumps(

                          {'roles': [role_list[4]['id']],

                           'other': 'somedata'})}

        session.execute(

            group_domain_metadata_table.insert().values(role_grant))

        session.commit()

        self.upgrade(29)

        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(

            (user_project_metadata_table.c.user_id == user['id']) &

            (user_project_metadata_table.c.project_id == project['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn({'id': role_list[0]['id']}, data['roles'])

        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(

            (user_project_metadata_table.c.user_id == user['id']) &

            (user_project_metadata_table.c.project_id == project2['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn({'id': role_list[1]['id']}, data['roles'])

        s = sqlalchemy.select([group_project_metadata_table.c.data]).where(

            (group_project_metadata_table.c.group_id == group['id']) &

            (group_project_metadata_table.c.project_id == project2['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn({'id': role_list[2]['id']}, data['roles'])

        s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(

            (user_domain_metadata_table.c.user_id == user['id']) &

            (user_domain_metadata_table.c.domain_id == domain['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn({'id': role_list[3]['id']}, data['roles'])

        s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(

            (group_domain_metadata_table.c.group_id == group['id']) &

            (group_domain_metadata_table.c.domain_id == domain['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn({'id': role_list[4]['id']}, data['roles'])

        self.assertIn('other', data)

        # Now add an entry that has one regular and one inherited role

        role_grant = {'user_id': user['id'],

                      'domain_id': domain2['id'],

                      'data': json.dumps(

                          {'roles': [{'id': role_list[5]['id']},

                                     {'id': role_list[6]['id'],

                                      'inherited_to': 'projects'}]})}

        session.execute(user_domain_metadata_table.insert().values(role_grant))

        session.commit()

        self.downgrade(28)

        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(

            (user_project_metadata_table.c.user_id == user['id']) &

            (user_project_metadata_table.c.project_id == project['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn(role_list[0]['id'], data['roles'])

        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(

            (user_project_metadata_table.c.user_id == user['id']) &

            (user_project_metadata_table.c.project_id == project2['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn(role_list[1]['id'], data['roles'])

        s = sqlalchemy.select([group_project_metadata_table.c.data]).where(

            (group_project_metadata_table.c.group_id == group['id']) &

            (group_project_metadata_table.c.project_id == project2['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn(role_list[2]['id'], data['roles'])

        s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(

            (user_domain_metadata_table.c.user_id == user['id']) &

            (user_domain_metadata_table.c.domain_id == domain['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn(role_list[3]['id'], data['roles'])

        s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(

            (group_domain_metadata_table.c.group_id == group['id']) &

            (group_domain_metadata_table.c.domain_id == domain['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn(role_list[4]['id'], data['roles'])

        self.assertIn('other', data)

        # For user-domain2, where we had one regular and one inherited role,

        # only the direct role should remain, the inherited role should

        # have been deleted during the downgrade

        s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(

            (user_domain_metadata_table.c.user_id == user['id']) &

            (user_domain_metadata_table.c.domain_id == domain2['id']))

        r = session.execute(s)

        data = json.loads(r.fetchone()['data'])

        self.assertEqual(len(data['roles']), 1)

        self.assertIn(role_list[5]['id'], data['roles'])

**** CubicPower OpenStack Study ****

    def test_drop_credential_constraint(self):

        ec2_credential = {

            'id': '100',

            'user_id': 'foo',

            'project_id': 'bar',

            'type': 'ec2',

            'blob': json.dumps({

                "access": "12345",

                "secret": "12345"

            })

        }

        user = {

            'id': 'foo',

            'name': 'FOO',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'enabled': True})

        }

        tenant = {

            'id': 'bar',

            'name': 'BAR',

            'description': 'description',

            'enabled': True,

            'extra': json.dumps({'enabled': True})

        }

        session = self.Session()

        self.upgrade(7)

        self.insert_dict(session, 'user', user)

        self.insert_dict(session, 'tenant', tenant)

        self.insert_dict(session, 'credential', ec2_credential)

        session.commit()

        self.upgrade(30)

        cred_table = sqlalchemy.Table('credential',

                                      self.metadata,

                                      autoload=True)

        cred = session.query(cred_table).filter("id='100'").one()

        self.assertEqual(cred.user_id,

                         ec2_credential['user_id'])

**** CubicPower OpenStack Study ****

    def test_drop_credential_indexes(self):

        self.upgrade(31)

        table = sqlalchemy.Table('credential', self.metadata, autoload=True)

        self.assertEqual(len(table.indexes), 0)

**** CubicPower OpenStack Study ****

    def test_downgrade_30(self):

        self.upgrade(31)

        self.downgrade(30)

        table = sqlalchemy.Table('credential', self.metadata, autoload=True)

        index_data = [(idx.name, idx.columns.keys())

                      for idx in table.indexes]

        if self.engine.name == 'mysql':

            self.assertIn(('user_id', ['user_id']), index_data)

            self.assertIn(('credential_project_id_fkey', ['project_id']),

                          index_data)

        else:

            self.assertEqual(len(index_data), 0)

**** CubicPower OpenStack Study ****

    def test_revoked_token_index(self):

        self.upgrade(35)

        table = sqlalchemy.Table('token', self.metadata, autoload=True)

        index_data = [(idx.name, idx.columns.keys())

                      for idx in table.indexes]

        self.assertIn(('ix_token_expires_valid', ['expires', 'valid']),

                      index_data)

**** CubicPower OpenStack Study ****

    def test_dropped_valid_index(self):

        self.upgrade(36)

        table = sqlalchemy.Table('token', self.metadata, autoload=True)

        index_data = [(idx.name, idx.columns.keys())

                      for idx in table.indexes]

        self.assertNotIn(('ix_token_valid', ['valid']), index_data)

**** CubicPower OpenStack Study ****

    def test_migrate_ec2_credential(self):

        user = {

            'id': 'foo',

            'name': 'FOO',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'enabled': True})

        }

        project = {

            'id': 'bar',

            'name': 'BAR',

            'description': 'description',

            'enabled': True,

            'extra': json.dumps({'enabled': True})

        }

        ec2_credential = {

            'access': uuid.uuid4().hex,

            'secret': uuid.uuid4().hex,

            'user_id': user['id'],

            'tenant_id': project['id'],

        }

        session = self.Session()

        self.upgrade(7)

        self.insert_dict(session, 'ec2_credential', ec2_credential)

        self.insert_dict(session, 'user', user)

        self.insert_dict(session, 'tenant', project)

        self.upgrade(33)

        self.assertTableDoesNotExist('ec2_credential')

        cred_table = sqlalchemy.Table('credential',

                                      self.metadata,

                                      autoload=True)

        expected_credential_id = utils.hash_access_key(

            ec2_credential['access'])

        cred = session.query(cred_table).filter_by(

            id=expected_credential_id).one()

        self.assertEqual(cred.user_id, ec2_credential['user_id'])

        self.assertEqual(cred.project_id, ec2_credential['tenant_id'])

        # test list credential using credential manager.

        credential_api = credential.Manager()

        self.assertNotEmpty(credential_api.

                            list_credentials(

                                user_id=ec2_credential['user_id']))

        self.downgrade(32)

        session.commit()

        self.assertTableExists('ec2_credential')

        ec2_cred_table = sqlalchemy.Table('ec2_credential',

                                          self.metadata,

                                          autoload=True)

        ec2_cred = session.query(ec2_cred_table).filter_by(

            access=ec2_credential['access']).one()

        self.assertEqual(ec2_cred.user_id, ec2_credential['user_id'])

**** CubicPower OpenStack Study ****

    def test_migrate_ec2_credential_with_conflict_project(self):

        user = {

            'id': 'foo',

            'name': 'FOO',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'enabled': True})

        }

        project_1 = {

            'id': 'bar',

            'name': 'BAR',

            'description': 'description',

            'enabled': True,

            'extra': json.dumps({'enabled': True})

        }

        project_2 = {

            'id': 'baz',

            'name': 'BAZ',

            'description': 'description',

            'enabled': True,

            'extra': json.dumps({'enabled': True})

        }

        ec2_credential = {

            'access': uuid.uuid4().hex,

            'secret': uuid.uuid4().hex,

            'user_id': user['id'],

            'tenant_id': project_1['id'],

        }

        blob = {'access': ec2_credential['access'],

                'secret': ec2_credential['secret']}

        v3_credential = {

            'id': utils.hash_access_key(ec2_credential['access']),

            'user_id': user['id'],

            # set the project id to simulate a conflict

            'project_id': project_2['id'],

            'blob': json.dumps(blob),

            'type': 'ec2',

            'extra': json.dumps({})

        }

        session = self.Session()

        self.upgrade(7)

        self.insert_dict(session, 'ec2_credential', ec2_credential)

        self.insert_dict(session, 'user', user)

        self.insert_dict(session, 'tenant', project_1)

        self.insert_dict(session, 'tenant', project_2)

        self.upgrade(32)

        self.insert_dict(session, 'credential', v3_credential)

        self.assertRaises(exception.Conflict, self.upgrade, 33)

**** CubicPower OpenStack Study ****

    def test_migrate_ec2_credential_with_conflict_secret(self):

        user = {

            'id': 'foo',

            'name': 'FOO',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'enabled': True})

        }

        project_1 = {

            'id': 'bar',

            'name': 'BAR',

            'description': 'description',

            'enabled': True,

            'extra': json.dumps({'enabled': True})

        }

        project_2 = {

            'id': 'baz',

            'name': 'BAZ',

            'description': 'description',

            'enabled': True,

            'extra': json.dumps({'enabled': True})

        }

        ec2_credential = {

            'access': uuid.uuid4().hex,

            'secret': uuid.uuid4().hex,

            'user_id': user['id'],

            'tenant_id': project_1['id'],

        }

        blob = {'access': ec2_credential['access'],

                'secret': 'different secret'}

        v3_cred_different_secret = {

            'id': utils.hash_access_key(ec2_credential['access']),

            'user_id': user['id'],

            'project_id': project_1['id'],

            'blob': json.dumps(blob),

            'type': 'ec2',

            'extra': json.dumps({})

        }

        session = self.Session()

        self.upgrade(7)

        self.insert_dict(session, 'ec2_credential', ec2_credential)

        self.insert_dict(session, 'user', user)

        self.insert_dict(session, 'tenant', project_1)

        self.insert_dict(session, 'tenant', project_2)

        self.upgrade(32)

        self.insert_dict(session, 'credential', v3_cred_different_secret)

        self.assertRaises(exception.Conflict, self.upgrade, 33)

**** CubicPower OpenStack Study ****

    def test_migrate_ec2_credential_with_invalid_blob(self):

        user = {

            'id': 'foo',

            'name': 'FOO',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'enabled': True})

        }

        project_1 = {

            'id': 'bar',

            'name': 'BAR',

            'description': 'description',

            'enabled': True,

            'extra': json.dumps({'enabled': True})

        }

        project_2 = {

            'id': 'baz',

            'name': 'BAZ',

            'description': 'description',

            'enabled': True,

            'extra': json.dumps({'enabled': True})

        }

        ec2_credential = {

            'access': uuid.uuid4().hex,

            'secret': uuid.uuid4().hex,

            'user_id': user['id'],

            'tenant_id': project_1['id'],

        }

        blob = '{"abc":"def"d}'

        v3_cred_invalid_blob = {

            'id': utils.hash_access_key(ec2_credential['access']),

            'user_id': user['id'],

            'project_id': project_1['id'],

            'blob': json.dumps(blob),

            'type': 'ec2',

            'extra': json.dumps({})

        }

        session = self.Session()

        self.upgrade(7)

        self.insert_dict(session, 'ec2_credential', ec2_credential)

        self.insert_dict(session, 'user', user)

        self.insert_dict(session, 'tenant', project_1)

        self.insert_dict(session, 'tenant', project_2)

        self.upgrade(32)

        self.insert_dict(session, 'credential', v3_cred_invalid_blob)

        self.assertRaises(exception.ValidationError, self.upgrade, 33)

**** CubicPower OpenStack Study ****

    def test_migrate_add_default_project_id_column_upgrade(self):

        user1 = {

            'id': 'foo1',

            'name': 'FOO1',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'tenantId': 'bar'}),

            'domain_id': DEFAULT_DOMAIN_ID

        }

        user2 = {

            'id': 'foo2',

            'name': 'FOO2',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'tenant_id': 'bar'}),

            'domain_id': DEFAULT_DOMAIN_ID

        }

        user3 = {

            'id': 'foo3',

            'name': 'FOO3',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'default_project_id': 'bar'}),

            'domain_id': DEFAULT_DOMAIN_ID

        }

        user4 = {

            'id': 'foo4',

            'name': 'FOO4',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({'tenantId': 'baz',

                                 'default_project_id': 'bar'}),

            'domain_id': DEFAULT_DOMAIN_ID

        }

        session = self.Session()

        self.upgrade(33)

        self.insert_dict(session, 'user', user1)

        self.insert_dict(session, 'user', user2)

        self.insert_dict(session, 'user', user3)

        self.insert_dict(session, 'user', user4)

        self.assertTableColumns('user',

                                ['id', 'name', 'extra', 'password',

                                 'enabled', 'domain_id'])

        session.commit()

        session.close()

        self.upgrade(34)

        session = self.Session()

        self.assertTableColumns('user',

                                ['id', 'name', 'extra', 'password',

                                 'enabled', 'domain_id', 'default_project_id'])

        user_table = sqlalchemy.Table('user', self.metadata, autoload=True)

        updated_user1 = session.query(user_table).filter_by(id='foo1').one()

        old_json_data = json.loads(user1['extra'])

        new_json_data = json.loads(updated_user1.extra)

        self.assertNotIn('tenantId', new_json_data)

        self.assertEqual(old_json_data['tenantId'],

                         updated_user1.default_project_id)

        updated_user2 = session.query(user_table).filter_by(id='foo2').one()

        old_json_data = json.loads(user2['extra'])

        new_json_data = json.loads(updated_user2.extra)

        self.assertNotIn('tenant_id', new_json_data)

        self.assertEqual(old_json_data['tenant_id'],

                         updated_user2.default_project_id)

        updated_user3 = session.query(user_table).filter_by(id='foo3').one()

        old_json_data = json.loads(user3['extra'])

        new_json_data = json.loads(updated_user3.extra)

        self.assertNotIn('default_project_id', new_json_data)

        self.assertEqual(old_json_data['default_project_id'],

                         updated_user3.default_project_id)

        updated_user4 = session.query(user_table).filter_by(id='foo4').one()

        old_json_data = json.loads(user4['extra'])

        new_json_data = json.loads(updated_user4.extra)

        self.assertNotIn('default_project_id', new_json_data)

        self.assertNotIn('tenantId', new_json_data)

        self.assertEqual(old_json_data['default_project_id'],

                         updated_user4.default_project_id)

**** CubicPower OpenStack Study ****

    def test_migrate_add_default_project_id_column_downgrade(self):

        user1 = {

            'id': 'foo1',

            'name': 'FOO1',

            'password': 'foo2',

            'enabled': True,

            'email': 'foo@bar.com',

            'extra': json.dumps({}),

            'default_project_id': 'bar',

            'domain_id': DEFAULT_DOMAIN_ID

        }

        self.upgrade(34)

        session = self.Session()

        self.insert_dict(session, 'user', user1)

        self.assertTableColumns('user',

                                ['id', 'name', 'extra', 'password',

                                 'enabled', 'domain_id', 'default_project_id'])

        session.commit()

        session.close()

        self.downgrade(33)

        session = self.Session()

        self.assertTableColumns('user',

                                ['id', 'name', 'extra', 'password',

                                 'enabled', 'domain_id'])

        user_table = sqlalchemy.Table('user', self.metadata, autoload=True)

        updated_user1 = session.query(user_table).filter_by(id='foo1').one()

        new_json_data = json.loads(updated_user1.extra)

        self.assertIn('tenantId', new_json_data)

        self.assertIn('default_project_id', new_json_data)

        self.assertEqual(user1['default_project_id'],

                         new_json_data['tenantId'])

        self.assertEqual(user1['default_project_id'],

                         new_json_data['default_project_id'])

        self.assertEqual(user1['default_project_id'],

                         new_json_data['tenant_id'])

**** CubicPower OpenStack Study ****

    def test_region_migration(self):

        self.upgrade(36)

        self.assertTableDoesNotExist('region')

        self.upgrade(37)

        self.assertTableExists('region')

        self.downgrade(36)

        self.assertTableDoesNotExist('region')

**** CubicPower OpenStack Study ****

    def test_assignment_table_migration(self):

        def create_base_data(session):

            domain_table = sqlalchemy.Table('domain', self.metadata,

                                            autoload=True)

            user_table = sqlalchemy.Table('user', self.metadata, autoload=True)

            group_table = sqlalchemy.Table('group', self.metadata,

                                           autoload=True)

            role_table = sqlalchemy.Table('role', self.metadata, autoload=True)

            project_table = sqlalchemy.Table(

                'project', self.metadata, autoload=True)

            base_data = {}

            # Create a Domain

            base_data['domain'] = {'id': uuid.uuid4().hex,

                                   'name': uuid.uuid4().hex,

                                   'enabled': True}

            session.execute(domain_table.insert().values(base_data['domain']))

            # Create another Domain

            base_data['domain2'] = {'id': uuid.uuid4().hex,

                                    'name': uuid.uuid4().hex,

                                    'enabled': True}

            session.execute(domain_table.insert().values(base_data['domain2']))

            # Create a Project

            base_data['project'] = {'id': uuid.uuid4().hex,

                                    'name': uuid.uuid4().hex,

                                    'domain_id': base_data['domain']['id'],

                                    'extra': "{}"}

            session.execute(

                project_table.insert().values(base_data['project']))

            # Create another Project

            base_data['project2'] = {'id': uuid.uuid4().hex,

                                     'name': uuid.uuid4().hex,

                                     'domain_id': base_data['domain']['id'],

                                     'extra': "{}"}

            session.execute(

                project_table.insert().values(base_data['project2']))

            # Create a User

            base_data['user'] = {'id': uuid.uuid4().hex,

                                 'name': uuid.uuid4().hex,

                                 'domain_id': base_data['domain']['id'],

                                 'password': uuid.uuid4().hex,

                                 'enabled': True,

                                 'extra': "{}"}

            session.execute(user_table.insert().values(base_data['user']))

            # Create a Group

            base_data['group'] = {'id': uuid.uuid4().hex,

                                  'name': uuid.uuid4().hex,

                                  'domain_id': base_data['domain']['id'],

                                  'extra': "{}"}

            session.execute(group_table.insert().values(base_data['group']))

            # Create roles

            base_data['roles'] = []

            for _ in range(9):

                role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

                session.execute(role_table.insert().values(role))

                base_data['roles'].append(role)

            return base_data

        def populate_grants(session, base_data):

            user_project_table = sqlalchemy.Table(

                'user_project_metadata', self.metadata, autoload=True)

            user_domain_table = sqlalchemy.Table(

                'user_domain_metadata', self.metadata, autoload=True)

            group_project_table = sqlalchemy.Table(

                'group_project_metadata', self.metadata, autoload=True)

            group_domain_table = sqlalchemy.Table(

                'group_domain_metadata', self.metadata, autoload=True)

            # Grant a role to user on project

            grant = {'user_id': base_data['user']['id'],

                     'project_id': base_data['project']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][0]['id']}]})}

            session.execute(user_project_table.insert().values(grant))

            # Grant two roles to user on project2

            grant = {'user_id': base_data['user']['id'],

                     'project_id': base_data['project2']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][1]['id']},

                                    {'id': base_data['roles'][2]['id']}]})}

            session.execute(user_project_table.insert().values(grant))

            # Grant role to group on project

            grant = {'group_id': base_data['group']['id'],

                     'project_id': base_data['project']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][3]['id']}]})}

            session.execute(group_project_table.insert().values(grant))

            # Grant two roles to group on project2

            grant = {'group_id': base_data['group']['id'],

                     'project_id': base_data['project2']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][4]['id']},

                                    {'id': base_data['roles'][5]['id']}]})}

            session.execute(group_project_table.insert().values(grant))

            # Grant two roles to group on domain, one inherited, one not

            grant = {'group_id': base_data['group']['id'],

                     'domain_id': base_data['domain']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][6]['id']},

                                    {'id': base_data['roles'][7]['id'],

                                     'inherited_to': 'projects'}]})}

            session.execute(group_domain_table.insert().values(grant))

            # Grant inherited role to user on domain

            grant = {'user_id': base_data['user']['id'],

                     'domain_id': base_data['domain']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][8]['id'],

                                     'inherited_to': 'projects'}]})}

            session.execute(user_domain_table.insert().values(grant))

            # Grant two non-inherited roles to user on domain2, using roles

            # that are also assigned to other actors/targets

            grant = {'user_id': base_data['user']['id'],

                     'domain_id': base_data['domain2']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][6]['id']},

                                    {'id': base_data['roles'][7]['id']}]})}

            session.execute(user_domain_table.insert().values(grant))

            session.commit()

        def check_grants(session, base_data):

            user_project_table = sqlalchemy.Table(

                'user_project_metadata', self.metadata, autoload=True)

            user_domain_table = sqlalchemy.Table(

                'user_domain_metadata', self.metadata, autoload=True)

            group_project_table = sqlalchemy.Table(

                'group_project_metadata', self.metadata, autoload=True)

            group_domain_table = sqlalchemy.Table(

                'group_domain_metadata', self.metadata, autoload=True)

            s = sqlalchemy.select([user_project_table.c.data]).where(

                (user_project_table.c.user_id == base_data['user']['id']) &

                (user_project_table.c.project_id ==

                 base_data['project']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 1)

            self.assertIn({'id': base_data['roles'][0]['id']}, data['roles'])

            s = sqlalchemy.select([user_project_table.c.data]).where(

                (user_project_table.c.user_id == base_data['user']['id']) &

                (user_project_table.c.project_id ==

                 base_data['project2']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 2)

            self.assertIn({'id': base_data['roles'][1]['id']}, data['roles'])

            self.assertIn({'id': base_data['roles'][2]['id']}, data['roles'])

            s = sqlalchemy.select([group_project_table.c.data]).where(

                (group_project_table.c.group_id == base_data['group']['id']) &

                (group_project_table.c.project_id ==

                 base_data['project']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 1)

            self.assertIn({'id': base_data['roles'][3]['id']}, data['roles'])

            s = sqlalchemy.select([group_project_table.c.data]).where(

                (group_project_table.c.group_id == base_data['group']['id']) &

                (group_project_table.c.project_id ==

                 base_data['project2']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 2)

            self.assertIn({'id': base_data['roles'][4]['id']}, data['roles'])

            self.assertIn({'id': base_data['roles'][5]['id']}, data['roles'])

            s = sqlalchemy.select([group_domain_table.c.data]).where(

                (group_domain_table.c.group_id == base_data['group']['id']) &

                (group_domain_table.c.domain_id == base_data['domain']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 2)

            self.assertIn({'id': base_data['roles'][6]['id']}, data['roles'])

            self.assertIn({'id': base_data['roles'][7]['id'],

                           'inherited_to': 'projects'}, data['roles'])

            s = sqlalchemy.select([user_domain_table.c.data]).where(

                (user_domain_table.c.user_id == base_data['user']['id']) &

                (user_domain_table.c.domain_id == base_data['domain']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 1)

            self.assertIn({'id': base_data['roles'][8]['id'],

                           'inherited_to': 'projects'}, data['roles'])

            s = sqlalchemy.select([user_domain_table.c.data]).where(

                (user_domain_table.c.user_id == base_data['user']['id']) &

                (user_domain_table.c.domain_id == base_data['domain2']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 2)

            self.assertIn({'id': base_data['roles'][6]['id']}, data['roles'])

            self.assertIn({'id': base_data['roles'][7]['id']}, data['roles'])

        def check_assignments(session, base_data):

            def check_assignment_type(refs, type):

                for ref in refs:

                    self.assertEqual(ref.type, type)

            assignment_table = sqlalchemy.Table(

                'assignment', self.metadata, autoload=True)

            refs = session.query(assignment_table).all()

            self.assertEqual(len(refs), 11)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['user']['id'])

            q = q.filter_by(target_id=base_data['project']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 1)

            self.assertEqual(refs[0].role_id, base_data['roles'][0]['id'])

            self.assertFalse(refs[0].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.USER_PROJECT)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['user']['id'])

            q = q.filter_by(target_id=base_data['project2']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 2)

            role_ids = [base_data['roles'][1]['id'],

                        base_data['roles'][2]['id']]

            self.assertIn(refs[0].role_id, role_ids)

            self.assertIn(refs[1].role_id, role_ids)

            self.assertFalse(refs[0].inherited)

            self.assertFalse(refs[1].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.USER_PROJECT)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['group']['id'])

            q = q.filter_by(target_id=base_data['project']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 1)

            self.assertEqual(refs[0].role_id, base_data['roles'][3]['id'])

            self.assertFalse(refs[0].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.GROUP_PROJECT)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['group']['id'])

            q = q.filter_by(target_id=base_data['project2']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 2)

            role_ids = [base_data['roles'][4]['id'],

                        base_data['roles'][5]['id']]

            self.assertIn(refs[0].role_id, role_ids)

            self.assertIn(refs[1].role_id, role_ids)

            self.assertFalse(refs[0].inherited)

            self.assertFalse(refs[1].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.GROUP_PROJECT)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['group']['id'])

            q = q.filter_by(target_id=base_data['domain']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 2)

            role_ids = [base_data['roles'][6]['id'],

                        base_data['roles'][7]['id']]

            self.assertIn(refs[0].role_id, role_ids)

            self.assertIn(refs[1].role_id, role_ids)

            if refs[0].role_id == base_data['roles'][7]['id']:

                self.assertTrue(refs[0].inherited)

                self.assertFalse(refs[1].inherited)

            else:

                self.assertTrue(refs[1].inherited)

                self.assertFalse(refs[0].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.GROUP_DOMAIN)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['user']['id'])

            q = q.filter_by(target_id=base_data['domain']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 1)

            self.assertEqual(refs[0].role_id, base_data['roles'][8]['id'])

            self.assertTrue(refs[0].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.USER_DOMAIN)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['user']['id'])

            q = q.filter_by(target_id=base_data['domain2']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 2)

            role_ids = [base_data['roles'][6]['id'],

                        base_data['roles'][7]['id']]

            self.assertIn(refs[0].role_id, role_ids)

            self.assertIn(refs[1].role_id, role_ids)

            self.assertFalse(refs[0].inherited)

            self.assertFalse(refs[1].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.USER_DOMAIN)

        self.upgrade(37)

        session = self.Session()

        self.assertTableDoesNotExist('assignment')

        base_data = create_base_data(session)

        populate_grants(session, base_data)

        check_grants(session, base_data)

        session.commit()

        session.close()

        self.upgrade(40)

        session = self.Session()

        self.assertTableExists('assignment')

        self.assertTableDoesNotExist('user_project_metadata')

        self.assertTableDoesNotExist('group_project_metadata')

        self.assertTableDoesNotExist('user_domain_metadata')

        self.assertTableDoesNotExist('group_domain_metadata')

        check_assignments(session, base_data)

        session.close()

        self.downgrade(37)

        session = self.Session()

        self.assertTableDoesNotExist('assignment')

        check_grants(session, base_data)

        session.close()

**** CubicPower OpenStack Study ****

        def create_base_data(session):

            domain_table = sqlalchemy.Table('domain', self.metadata,

                                            autoload=True)

            user_table = sqlalchemy.Table('user', self.metadata, autoload=True)

            group_table = sqlalchemy.Table('group', self.metadata,

                                           autoload=True)

            role_table = sqlalchemy.Table('role', self.metadata, autoload=True)

            project_table = sqlalchemy.Table(

                'project', self.metadata, autoload=True)

            base_data = {}

            # Create a Domain

            base_data['domain'] = {'id': uuid.uuid4().hex,

                                   'name': uuid.uuid4().hex,

                                   'enabled': True}

            session.execute(domain_table.insert().values(base_data['domain']))

            # Create another Domain

            base_data['domain2'] = {'id': uuid.uuid4().hex,

                                    'name': uuid.uuid4().hex,

                                    'enabled': True}

            session.execute(domain_table.insert().values(base_data['domain2']))

            # Create a Project

            base_data['project'] = {'id': uuid.uuid4().hex,

                                    'name': uuid.uuid4().hex,

                                    'domain_id': base_data['domain']['id'],

                                    'extra': "{}"}

            session.execute(

                project_table.insert().values(base_data['project']))

            # Create another Project

            base_data['project2'] = {'id': uuid.uuid4().hex,

                                     'name': uuid.uuid4().hex,

                                     'domain_id': base_data['domain']['id'],

                                     'extra': "{}"}

            session.execute(

                project_table.insert().values(base_data['project2']))

            # Create a User

            base_data['user'] = {'id': uuid.uuid4().hex,

                                 'name': uuid.uuid4().hex,

                                 'domain_id': base_data['domain']['id'],

                                 'password': uuid.uuid4().hex,

                                 'enabled': True,

                                 'extra': "{}"}

            session.execute(user_table.insert().values(base_data['user']))

            # Create a Group

            base_data['group'] = {'id': uuid.uuid4().hex,

                                  'name': uuid.uuid4().hex,

                                  'domain_id': base_data['domain']['id'],

                                  'extra': "{}"}

            session.execute(group_table.insert().values(base_data['group']))

            # Create roles

            base_data['roles'] = []

            for _ in range(9):

                role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

                session.execute(role_table.insert().values(role))

                base_data['roles'].append(role)

            return base_data

**** CubicPower OpenStack Study ****

        def populate_grants(session, base_data):

            user_project_table = sqlalchemy.Table(

                'user_project_metadata', self.metadata, autoload=True)

            user_domain_table = sqlalchemy.Table(

                'user_domain_metadata', self.metadata, autoload=True)

            group_project_table = sqlalchemy.Table(

                'group_project_metadata', self.metadata, autoload=True)

            group_domain_table = sqlalchemy.Table(

                'group_domain_metadata', self.metadata, autoload=True)

            # Grant a role to user on project

            grant = {'user_id': base_data['user']['id'],

                     'project_id': base_data['project']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][0]['id']}]})}

            session.execute(user_project_table.insert().values(grant))

            # Grant two roles to user on project2

            grant = {'user_id': base_data['user']['id'],

                     'project_id': base_data['project2']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][1]['id']},

                                    {'id': base_data['roles'][2]['id']}]})}

            session.execute(user_project_table.insert().values(grant))

            # Grant role to group on project

            grant = {'group_id': base_data['group']['id'],

                     'project_id': base_data['project']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][3]['id']}]})}

            session.execute(group_project_table.insert().values(grant))

            # Grant two roles to group on project2

            grant = {'group_id': base_data['group']['id'],

                     'project_id': base_data['project2']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][4]['id']},

                                    {'id': base_data['roles'][5]['id']}]})}

            session.execute(group_project_table.insert().values(grant))

            # Grant two roles to group on domain, one inherited, one not

            grant = {'group_id': base_data['group']['id'],

                     'domain_id': base_data['domain']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][6]['id']},

                                    {'id': base_data['roles'][7]['id'],

                                     'inherited_to': 'projects'}]})}

            session.execute(group_domain_table.insert().values(grant))

            # Grant inherited role to user on domain

            grant = {'user_id': base_data['user']['id'],

                     'domain_id': base_data['domain']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][8]['id'],

                                     'inherited_to': 'projects'}]})}

            session.execute(user_domain_table.insert().values(grant))

            # Grant two non-inherited roles to user on domain2, using roles

            # that are also assigned to other actors/targets

            grant = {'user_id': base_data['user']['id'],

                     'domain_id': base_data['domain2']['id'],

                     'data': json.dumps(

                         {'roles': [{'id': base_data['roles'][6]['id']},

                                    {'id': base_data['roles'][7]['id']}]})}

            session.execute(user_domain_table.insert().values(grant))

            session.commit()

**** CubicPower OpenStack Study ****

        def check_grants(session, base_data):

            user_project_table = sqlalchemy.Table(

                'user_project_metadata', self.metadata, autoload=True)

            user_domain_table = sqlalchemy.Table(

                'user_domain_metadata', self.metadata, autoload=True)

            group_project_table = sqlalchemy.Table(

                'group_project_metadata', self.metadata, autoload=True)

            group_domain_table = sqlalchemy.Table(

                'group_domain_metadata', self.metadata, autoload=True)

            s = sqlalchemy.select([user_project_table.c.data]).where(

                (user_project_table.c.user_id == base_data['user']['id']) &

                (user_project_table.c.project_id ==

                 base_data['project']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 1)

            self.assertIn({'id': base_data['roles'][0]['id']}, data['roles'])

            s = sqlalchemy.select([user_project_table.c.data]).where(

                (user_project_table.c.user_id == base_data['user']['id']) &

                (user_project_table.c.project_id ==

                 base_data['project2']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 2)

            self.assertIn({'id': base_data['roles'][1]['id']}, data['roles'])

            self.assertIn({'id': base_data['roles'][2]['id']}, data['roles'])

            s = sqlalchemy.select([group_project_table.c.data]).where(

                (group_project_table.c.group_id == base_data['group']['id']) &

                (group_project_table.c.project_id ==

                 base_data['project']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 1)

            self.assertIn({'id': base_data['roles'][3]['id']}, data['roles'])

            s = sqlalchemy.select([group_project_table.c.data]).where(

                (group_project_table.c.group_id == base_data['group']['id']) &

                (group_project_table.c.project_id ==

                 base_data['project2']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 2)

            self.assertIn({'id': base_data['roles'][4]['id']}, data['roles'])

            self.assertIn({'id': base_data['roles'][5]['id']}, data['roles'])

            s = sqlalchemy.select([group_domain_table.c.data]).where(

                (group_domain_table.c.group_id == base_data['group']['id']) &

                (group_domain_table.c.domain_id == base_data['domain']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 2)

            self.assertIn({'id': base_data['roles'][6]['id']}, data['roles'])

            self.assertIn({'id': base_data['roles'][7]['id'],

                           'inherited_to': 'projects'}, data['roles'])

            s = sqlalchemy.select([user_domain_table.c.data]).where(

                (user_domain_table.c.user_id == base_data['user']['id']) &

                (user_domain_table.c.domain_id == base_data['domain']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 1)

            self.assertIn({'id': base_data['roles'][8]['id'],

                           'inherited_to': 'projects'}, data['roles'])

            s = sqlalchemy.select([user_domain_table.c.data]).where(

                (user_domain_table.c.user_id == base_data['user']['id']) &

                (user_domain_table.c.domain_id == base_data['domain2']['id']))

            r = session.execute(s)

            data = json.loads(r.fetchone()['data'])

            self.assertEqual(len(data['roles']), 2)

            self.assertIn({'id': base_data['roles'][6]['id']}, data['roles'])

            self.assertIn({'id': base_data['roles'][7]['id']}, data['roles'])

**** CubicPower OpenStack Study ****

        def check_assignments(session, base_data):

            def check_assignment_type(refs, type):

                for ref in refs:

                    self.assertEqual(ref.type, type)

            assignment_table = sqlalchemy.Table(

                'assignment', self.metadata, autoload=True)

            refs = session.query(assignment_table).all()

            self.assertEqual(len(refs), 11)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['user']['id'])

            q = q.filter_by(target_id=base_data['project']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 1)

            self.assertEqual(refs[0].role_id, base_data['roles'][0]['id'])

            self.assertFalse(refs[0].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.USER_PROJECT)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['user']['id'])

            q = q.filter_by(target_id=base_data['project2']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 2)

            role_ids = [base_data['roles'][1]['id'],

                        base_data['roles'][2]['id']]

            self.assertIn(refs[0].role_id, role_ids)

            self.assertIn(refs[1].role_id, role_ids)

            self.assertFalse(refs[0].inherited)

            self.assertFalse(refs[1].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.USER_PROJECT)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['group']['id'])

            q = q.filter_by(target_id=base_data['project']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 1)

            self.assertEqual(refs[0].role_id, base_data['roles'][3]['id'])

            self.assertFalse(refs[0].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.GROUP_PROJECT)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['group']['id'])

            q = q.filter_by(target_id=base_data['project2']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 2)

            role_ids = [base_data['roles'][4]['id'],

                        base_data['roles'][5]['id']]

            self.assertIn(refs[0].role_id, role_ids)

            self.assertIn(refs[1].role_id, role_ids)

            self.assertFalse(refs[0].inherited)

            self.assertFalse(refs[1].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.GROUP_PROJECT)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['group']['id'])

            q = q.filter_by(target_id=base_data['domain']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 2)

            role_ids = [base_data['roles'][6]['id'],

                        base_data['roles'][7]['id']]

            self.assertIn(refs[0].role_id, role_ids)

            self.assertIn(refs[1].role_id, role_ids)

            if refs[0].role_id == base_data['roles'][7]['id']:

                self.assertTrue(refs[0].inherited)

                self.assertFalse(refs[1].inherited)

            else:

                self.assertTrue(refs[1].inherited)

                self.assertFalse(refs[0].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.GROUP_DOMAIN)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['user']['id'])

            q = q.filter_by(target_id=base_data['domain']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 1)

            self.assertEqual(refs[0].role_id, base_data['roles'][8]['id'])

            self.assertTrue(refs[0].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.USER_DOMAIN)

            q = session.query(assignment_table)

            q = q.filter_by(actor_id=base_data['user']['id'])

            q = q.filter_by(target_id=base_data['domain2']['id'])

            refs = q.all()

            self.assertEqual(len(refs), 2)

            role_ids = [base_data['roles'][6]['id'],

                        base_data['roles'][7]['id']]

            self.assertIn(refs[0].role_id, role_ids)

            self.assertIn(refs[1].role_id, role_ids)

            self.assertFalse(refs[0].inherited)

            self.assertFalse(refs[1].inherited)

            check_assignment_type(refs,

                                  assignment_sql.AssignmentType.USER_DOMAIN)

        self.upgrade(37)

        session = self.Session()

        self.assertTableDoesNotExist('assignment')

        base_data = create_base_data(session)

        populate_grants(session, base_data)

        check_grants(session, base_data)

        session.commit()

        session.close()

        self.upgrade(40)

        session = self.Session()

        self.assertTableExists('assignment')

        self.assertTableDoesNotExist('user_project_metadata')

        self.assertTableDoesNotExist('group_project_metadata')

        self.assertTableDoesNotExist('user_domain_metadata')

        self.assertTableDoesNotExist('group_domain_metadata')

        check_assignments(session, base_data)

        session.close()

        self.downgrade(37)

        session = self.Session()

        self.assertTableDoesNotExist('assignment')

        check_grants(session, base_data)

        session.close()

**** CubicPower OpenStack Study ****

    def test_limited_trusts_upgrade(self):

        # make sure that the remaining_uses column is created

        self.upgrade(41)

        self.assertTableColumns('trust',

                                ['id', 'trustor_user_id',

                                 'trustee_user_id',

                                 'project_id', 'impersonation',

                                 'deleted_at',

                                 'expires_at', 'extra',

                                 'remaining_uses'])

**** CubicPower OpenStack Study ****

    def test_limited_trusts_downgrade(self):

        # make sure that the remaining_uses column is removed

        self.upgrade(41)

        self.downgrade(40)

        self.assertTableColumns('trust',

                                ['id', 'trustor_user_id',

                                 'trustee_user_id',

                                 'project_id', 'impersonation',

                                 'deleted_at',

                                 'expires_at', 'extra'])

**** CubicPower OpenStack Study ****

    def test_limited_trusts_downgrade_trusts_cleanup(self):

        # make sure that only trusts with unlimited uses are kept in the

        # downgrade

        self.upgrade(41)

        session = self.Session()

        trust_table = sqlalchemy.Table(

            'trust', self.metadata, autoload=True)

        limited_trust = {

            'id': uuid.uuid4().hex,

            'trustor_user_id': uuid.uuid4().hex,

            'trustee_user_id': uuid.uuid4().hex,

            'project_id': uuid.uuid4().hex,

            'impersonation': True,

            'remaining_uses': 5

        }

        consumed_trust = {

            'id': uuid.uuid4().hex,

            'trustor_user_id': uuid.uuid4().hex,

            'trustee_user_id': uuid.uuid4().hex,

            'project_id': uuid.uuid4().hex,

            'impersonation': True,

            'remaining_uses': 0

        }

        unlimited_trust = {

            'id': uuid.uuid4().hex,

            'trustor_user_id': uuid.uuid4().hex,

            'trustee_user_id': uuid.uuid4().hex,

            'project_id': uuid.uuid4().hex,

            'impersonation': True,

            'remaining_uses': None

        }

        self.insert_dict(session, 'trust', limited_trust)

        self.insert_dict(session, 'trust', consumed_trust)

        self.insert_dict(session, 'trust', unlimited_trust)

        trust_table = sqlalchemy.Table(

            'trust', self.metadata, autoload=True)

        # we should have 3 trusts in base

        self.assertEqual(3, session.query(trust_table).count())

        self.downgrade(40)

        session = self.Session()

        trust_table = sqlalchemy.Table(

            'trust', self.metadata, autoload=True)

        # Now only one trust remains ...

        self.assertEqual(1, session.query(trust_table.columns.id).count())

        # ... and this trust is the one that was not limited in uses

        self.assertEqual(

            unlimited_trust['id'],

            session.query(trust_table.columns.id).one()[0])

**** CubicPower OpenStack Study ****

    def test_upgrade_service_enabled_cols(self):

        """Migration 44 added `enabled` column to `service` table."""

        self.upgrade(44)

        # Verify that there's an 'enabled' field.

        exp_cols = ['id', 'type', 'extra', 'enabled']

        self.assertTableColumns('service', exp_cols)

**** CubicPower OpenStack Study ****

    def test_downgrade_service_enabled_cols(self):

        """Check columns when downgrade to migration 43.

        The downgrade from migration 44 removes the `enabled` column from the

        `service` table.

        """

        self.upgrade(44)

        self.downgrade(43)

        exp_cols = ['id', 'type', 'extra']

        self.assertTableColumns('service', exp_cols)

**** CubicPower OpenStack Study ****

    def test_upgrade_service_enabled_data(self):

        """Migration 44 has to migrate data from `extra` to `enabled`."""

        session = self.Session()

        def add_service(**extra_data):

            service_id = uuid.uuid4().hex

            service = {

                'id': service_id,

                'type': uuid.uuid4().hex,

                'extra': json.dumps(extra_data),

            }

            self.insert_dict(session, 'service', service)

            return service_id

        self.upgrade(43)

        # Different services with expected enabled and extra values, and a

        # description.

        random_attr_name = uuid.uuid4().hex

        random_attr_value = uuid.uuid4().hex

        random_attr = {random_attr_name: random_attr_value}

        random_attr_str = "%s='%s'" % (random_attr_name, random_attr_value)

        random_attr_enabled_false = {random_attr_name: random_attr_value,

                                     'enabled': False}

        random_attr_enabled_false_str = 'enabled=False,%s' % random_attr_str

        services = [

            # Some values for True.

            (add_service(), (True, {}), 'no enabled'),

            (add_service(enabled=True), (True, {}), 'enabled=True'),

            (add_service(enabled='true'), (True, {}), "enabled='true'"),

            (add_service(**random_attr),

             (True, random_attr), random_attr_str),

            (add_service(enabled=None), (True, {}), 'enabled=None'),

            # Some values for False.

            (add_service(enabled=False), (False, {}), 'enabled=False'),

            (add_service(enabled='false'), (False, {}), "enabled='false'"),

            (add_service(enabled='0'), (False, {}), "enabled='0'"),

            (add_service(**random_attr_enabled_false),

             (False, random_attr), random_attr_enabled_false_str),

        ]

        self.upgrade(44)

        # Verify that the services have the expected values.

        self.metadata.clear()

        service_table = sqlalchemy.Table('service', self.metadata,

                                         autoload=True)

        def fetch_service(service_id):

            cols = [service_table.c.enabled, service_table.c.extra]

            f = service_table.c.id == service_id

            s = sqlalchemy.select(cols).where(f)

            ep = session.execute(s).fetchone()

            return ep.enabled, json.loads(ep.extra)

        for service_id, exp, msg in services:

            exp_enabled, exp_extra = exp

            enabled, extra = fetch_service(service_id)

            self.assertIs(exp_enabled, enabled, msg)

            self.assertEqual(exp_extra, extra, msg)

**** CubicPower OpenStack Study ****

        def add_service(**extra_data):

            service_id = uuid.uuid4().hex

            service = {

                'id': service_id,

                'type': uuid.uuid4().hex,

                'extra': json.dumps(extra_data),

            }

            self.insert_dict(session, 'service', service)

            return service_id

        self.upgrade(43)

        # Different services with expected enabled and extra values, and a

        # description.

        random_attr_name = uuid.uuid4().hex

        random_attr_value = uuid.uuid4().hex

        random_attr = {random_attr_name: random_attr_value}

        random_attr_str = "%s='%s'" % (random_attr_name, random_attr_value)

        random_attr_enabled_false = {random_attr_name: random_attr_value,

                                     'enabled': False}

        random_attr_enabled_false_str = 'enabled=False,%s' % random_attr_str

        services = [

            # Some values for True.

            (add_service(), (True, {}), 'no enabled'),

            (add_service(enabled=True), (True, {}), 'enabled=True'),

            (add_service(enabled='true'), (True, {}), "enabled='true'"),

            (add_service(**random_attr),

             (True, random_attr), random_attr_str),

            (add_service(enabled=None), (True, {}), 'enabled=None'),

            # Some values for False.

            (add_service(enabled=False), (False, {}), 'enabled=False'),

            (add_service(enabled='false'), (False, {}), "enabled='false'"),

            (add_service(enabled='0'), (False, {}), "enabled='0'"),

            (add_service(**random_attr_enabled_false),

             (False, random_attr), random_attr_enabled_false_str),

        ]

        self.upgrade(44)

        # Verify that the services have the expected values.

        self.metadata.clear()

        service_table = sqlalchemy.Table('service', self.metadata,

                                         autoload=True)

**** CubicPower OpenStack Study ****

        def fetch_service(service_id):

            cols = [service_table.c.enabled, service_table.c.extra]

            f = service_table.c.id == service_id

            s = sqlalchemy.select(cols).where(f)

            ep = session.execute(s).fetchone()

            return ep.enabled, json.loads(ep.extra)

        for service_id, exp, msg in services:

            exp_enabled, exp_extra = exp

            enabled, extra = fetch_service(service_id)

            self.assertIs(exp_enabled, enabled, msg)

            self.assertEqual(exp_extra, extra, msg)

**** CubicPower OpenStack Study ****

    def test_downgrade_service_enabled_data(self):

        """Downgrade from migration 44 migrates data.

        Downgrade from migration 44 migrates data from `enabled` to

        `extra`. Any disabled services have 'enabled': False put into 'extra'.

        """

        session = self.Session()

        def add_service(enabled=True, **extra_data):

            service_id = uuid.uuid4().hex

            service = {

                'id': service_id,

                'type': uuid.uuid4().hex,

                'extra': json.dumps(extra_data),

                'enabled': enabled

            }

            self.insert_dict(session, 'service', service)

            return service_id

        self.upgrade(44)

        # Insert some services using the new format.

        # We'll need a service entry since it's the foreign key for services.

        service_id = add_service(True)

        new_service = (lambda enabled, **extra_data:

                       add_service(enabled, **extra_data))

        # Different services with expected extra values, and a

        # description.

        services = [

            # True tests

            (new_service(True), {}, 'enabled'),

            (new_service(True, something='whatever'),

             {'something': 'whatever'},

             "something='whatever'"),

            # False tests

            (new_service(False), {'enabled': False}, 'enabled=False'),

            (new_service(False, something='whatever'),

             {'enabled': False, 'something': 'whatever'},

             "enabled=False, something='whatever'"),

        ]

        self.downgrade(43)

        # Verify that the services have the expected values.

        self.metadata.clear()

        service_table = sqlalchemy.Table('service', self.metadata,

                                         autoload=True)

        def fetch_service(service_id):

            cols = [service_table.c.extra]

            f = service_table.c.id == service_id

            s = sqlalchemy.select(cols).where(f)

            ep = session.execute(s).fetchone()

            return json.loads(ep.extra)

        for service_id, exp_extra, msg in services:

            extra = fetch_service(service_id)

            self.assertEqual(exp_extra, extra, msg)

**** CubicPower OpenStack Study ****

        def add_service(enabled=True, **extra_data):

            service_id = uuid.uuid4().hex

            service = {

                'id': service_id,

                'type': uuid.uuid4().hex,

                'extra': json.dumps(extra_data),

                'enabled': enabled

            }

            self.insert_dict(session, 'service', service)

            return service_id

        self.upgrade(44)

        # Insert some services using the new format.

        # We'll need a service entry since it's the foreign key for services.

        service_id = add_service(True)

        new_service = (lambda enabled, **extra_data:

                       add_service(enabled, **extra_data))

        # Different services with expected extra values, and a

        # description.

        services = [

            # True tests

            (new_service(True), {}, 'enabled'),

            (new_service(True, something='whatever'),

             {'something': 'whatever'},

             "something='whatever'"),

            # False tests

            (new_service(False), {'enabled': False}, 'enabled=False'),

            (new_service(False, something='whatever'),

             {'enabled': False, 'something': 'whatever'},

             "enabled=False, something='whatever'"),

        ]

        self.downgrade(43)

        # Verify that the services have the expected values.

        self.metadata.clear()

        service_table = sqlalchemy.Table('service', self.metadata,

                                         autoload=True)

**** CubicPower OpenStack Study ****

        def fetch_service(service_id):

            cols = [service_table.c.extra]

            f = service_table.c.id == service_id

            s = sqlalchemy.select(cols).where(f)

            ep = session.execute(s).fetchone()

            return json.loads(ep.extra)

        for service_id, exp_extra, msg in services:

            extra = fetch_service(service_id)

            self.assertEqual(exp_extra, extra, msg)

**** CubicPower OpenStack Study ****

    def test_upgrade_endpoint_enabled_cols(self):

        """Migration 42 added `enabled` column to `endpoint` table."""

        self.upgrade(42)

        # Verify that there's an 'enabled' field.

        exp_cols = ['id', 'legacy_endpoint_id', 'interface', 'region',

                    'service_id', 'url', 'extra', 'enabled']

        self.assertTableColumns('endpoint', exp_cols)

**** CubicPower OpenStack Study ****

    def test_downgrade_endpoint_enabled_cols(self):

        """Check columns when downgrade from migration 41.

        The downgrade from migration 42 removes the `enabled` column from the

        `endpoint` table.

        """

        self.upgrade(42)

        self.downgrade(41)

        exp_cols = ['id', 'legacy_endpoint_id', 'interface', 'region',

                    'service_id', 'url', 'extra']

        self.assertTableColumns('endpoint', exp_cols)

**** CubicPower OpenStack Study ****

    def test_upgrade_endpoint_enabled_data(self):

        """Migration 42 has to migrate data from `extra` to `enabled`."""

        session = self.Session()

        def add_service():

            service_id = uuid.uuid4().hex

            service = {

                'id': service_id,

                'type': uuid.uuid4().hex

            }

            self.insert_dict(session, 'service', service)

            return service_id

        def add_endpoint(service_id, **extra_data):

            endpoint_id = uuid.uuid4().hex

            endpoint = {

                'id': endpoint_id,

                'interface': uuid.uuid4().hex[:8],

                'service_id': service_id,

                'url': uuid.uuid4().hex,

                'extra': json.dumps(extra_data)

            }

            self.insert_dict(session, 'endpoint', endpoint)

            return endpoint_id

        self.upgrade(41)

        # Insert some endpoints using the old format where `enabled` is in

        # `extra` JSON.

        # We'll need a service entry since it's the foreign key for endpoints.

        service_id = add_service()

        new_ep = lambda **extra_data: add_endpoint(service_id, **extra_data)

        # Different endpoints with expected enabled and extra values, and a

        # description.

        random_attr_name = uuid.uuid4().hex

        random_attr_value = uuid.uuid4().hex

        random_attr = {random_attr_name: random_attr_value}

        random_attr_str = "%s='%s'" % (random_attr_name, random_attr_value)

        random_attr_enabled_false = {random_attr_name: random_attr_value,

                                     'enabled': False}

        random_attr_enabled_false_str = 'enabled=False,%s' % random_attr_str

        endpoints = [

            # Some values for True.

            (new_ep(), (True, {}), 'no enabled'),

            (new_ep(enabled=True), (True, {}), 'enabled=True'),

            (new_ep(enabled='true'), (True, {}), "enabled='true'"),

            (new_ep(**random_attr),

             (True, random_attr), random_attr_str),

            (new_ep(enabled=None), (True, {}), 'enabled=None'),

            # Some values for False.

            (new_ep(enabled=False), (False, {}), 'enabled=False'),

            (new_ep(enabled='false'), (False, {}), "enabled='false'"),

            (new_ep(enabled='0'), (False, {}), "enabled='0'"),

            (new_ep(**random_attr_enabled_false),

             (False, random_attr), random_attr_enabled_false_str),

        ]

        self.upgrade(42)

        # Verify that the endpoints have the expected values.

        self.metadata.clear()

        endpoint_table = sqlalchemy.Table('endpoint', self.metadata,

                                          autoload=True)

        def fetch_endpoint(endpoint_id):

            cols = [endpoint_table.c.enabled, endpoint_table.c.extra]

            f = endpoint_table.c.id == endpoint_id

            s = sqlalchemy.select(cols).where(f)

            ep = session.execute(s).fetchone()

            return ep.enabled, json.loads(ep.extra)

        for endpoint_id, exp, msg in endpoints:

            exp_enabled, exp_extra = exp

            enabled, extra = fetch_endpoint(endpoint_id)

            self.assertIs(exp_enabled, enabled, msg)

            self.assertEqual(exp_extra, extra, msg)

**** CubicPower OpenStack Study ****

        def add_service():

            service_id = uuid.uuid4().hex

            service = {

                'id': service_id,

                'type': uuid.uuid4().hex

            }

            self.insert_dict(session, 'service', service)

            return service_id

**** CubicPower OpenStack Study ****

        def add_endpoint(service_id, **extra_data):

            endpoint_id = uuid.uuid4().hex

            endpoint = {

                'id': endpoint_id,

                'interface': uuid.uuid4().hex[:8],

                'service_id': service_id,

                'url': uuid.uuid4().hex,

                'extra': json.dumps(extra_data)

            }

            self.insert_dict(session, 'endpoint', endpoint)

            return endpoint_id

        self.upgrade(41)

        # Insert some endpoints using the old format where `enabled` is in

        # `extra` JSON.

        # We'll need a service entry since it's the foreign key for endpoints.

        service_id = add_service()

        new_ep = lambda **extra_data: add_endpoint(service_id, **extra_data)

        # Different endpoints with expected enabled and extra values, and a

        # description.

        random_attr_name = uuid.uuid4().hex

        random_attr_value = uuid.uuid4().hex

        random_attr = {random_attr_name: random_attr_value}

        random_attr_str = "%s='%s'" % (random_attr_name, random_attr_value)

        random_attr_enabled_false = {random_attr_name: random_attr_value,

                                     'enabled': False}

        random_attr_enabled_false_str = 'enabled=False,%s' % random_attr_str

        endpoints = [

            # Some values for True.

            (new_ep(), (True, {}), 'no enabled'),

            (new_ep(enabled=True), (True, {}), 'enabled=True'),

            (new_ep(enabled='true'), (True, {}), "enabled='true'"),

            (new_ep(**random_attr),

             (True, random_attr), random_attr_str),

            (new_ep(enabled=None), (True, {}), 'enabled=None'),

            # Some values for False.

            (new_ep(enabled=False), (False, {}), 'enabled=False'),

            (new_ep(enabled='false'), (False, {}), "enabled='false'"),

            (new_ep(enabled='0'), (False, {}), "enabled='0'"),

            (new_ep(**random_attr_enabled_false),

             (False, random_attr), random_attr_enabled_false_str),

        ]

        self.upgrade(42)

        # Verify that the endpoints have the expected values.

        self.metadata.clear()

        endpoint_table = sqlalchemy.Table('endpoint', self.metadata,

                                          autoload=True)

**** CubicPower OpenStack Study ****

        def fetch_endpoint(endpoint_id):

            cols = [endpoint_table.c.enabled, endpoint_table.c.extra]

            f = endpoint_table.c.id == endpoint_id

            s = sqlalchemy.select(cols).where(f)

            ep = session.execute(s).fetchone()

            return ep.enabled, json.loads(ep.extra)

        for endpoint_id, exp, msg in endpoints:

            exp_enabled, exp_extra = exp

            enabled, extra = fetch_endpoint(endpoint_id)

            self.assertIs(exp_enabled, enabled, msg)

            self.assertEqual(exp_extra, extra, msg)

**** CubicPower OpenStack Study ****

    def test_downgrade_endpoint_enabled_data(self):

        """Downgrade from migration 42 migrates data.

        Downgrade from migration 42 migrates data from `enabled` to

        `extra`. Any disabled endpoints have 'enabled': False put into 'extra'.

        """

        session = self.Session()

        def add_service():

            service_id = uuid.uuid4().hex

            service = {

                'id': service_id,

                'type': uuid.uuid4().hex

            }

            self.insert_dict(session, 'service', service)

            return service_id

        def add_endpoint(service_id, enabled, **extra_data):

            endpoint_id = uuid.uuid4().hex

            endpoint = {

                'id': endpoint_id,

                'interface': uuid.uuid4().hex[:8],

                'service_id': service_id,

                'url': uuid.uuid4().hex,

                'extra': json.dumps(extra_data),

                'enabled': enabled

            }

            self.insert_dict(session, 'endpoint', endpoint)

            return endpoint_id

        self.upgrade(42)

        # Insert some endpoints using the new format.

        # We'll need a service entry since it's the foreign key for endpoints.

        service_id = add_service()

        new_ep = (lambda enabled, **extra_data:

                  add_endpoint(service_id, enabled, **extra_data))

        # Different endpoints with expected extra values, and a

        # description.

        endpoints = [

            # True tests

            (new_ep(True), {}, 'enabled'),

            (new_ep(True, something='whatever'), {'something': 'whatever'},

             "something='whatever'"),

            # False tests

            (new_ep(False), {'enabled': False}, 'enabled=False'),

            (new_ep(False, something='whatever'),

             {'enabled': False, 'something': 'whatever'},

             "enabled=False, something='whatever'"),

        ]

        self.downgrade(41)

        # Verify that the endpoints have the expected values.

        self.metadata.clear()

        endpoint_table = sqlalchemy.Table('endpoint', self.metadata,

                                          autoload=True)

        def fetch_endpoint(endpoint_id):

            cols = [endpoint_table.c.extra]

            f = endpoint_table.c.id == endpoint_id

            s = sqlalchemy.select(cols).where(f)

            ep = session.execute(s).fetchone()

            return json.loads(ep.extra)

        for endpoint_id, exp_extra, msg in endpoints:

            extra = fetch_endpoint(endpoint_id)

            self.assertEqual(exp_extra, extra, msg)

**** CubicPower OpenStack Study ****

        def add_service():

            service_id = uuid.uuid4().hex

            service = {

                'id': service_id,

                'type': uuid.uuid4().hex

            }

            self.insert_dict(session, 'service', service)

            return service_id

**** CubicPower OpenStack Study ****

        def add_endpoint(service_id, enabled, **extra_data):

            endpoint_id = uuid.uuid4().hex

            endpoint = {

                'id': endpoint_id,

                'interface': uuid.uuid4().hex[:8],

                'service_id': service_id,

                'url': uuid.uuid4().hex,

                'extra': json.dumps(extra_data),

                'enabled': enabled

            }

            self.insert_dict(session, 'endpoint', endpoint)

            return endpoint_id

        self.upgrade(42)

        # Insert some endpoints using the new format.

        # We'll need a service entry since it's the foreign key for endpoints.

        service_id = add_service()

        new_ep = (lambda enabled, **extra_data:

                  add_endpoint(service_id, enabled, **extra_data))

        # Different endpoints with expected extra values, and a

        # description.

        endpoints = [

            # True tests

            (new_ep(True), {}, 'enabled'),

            (new_ep(True, something='whatever'), {'something': 'whatever'},

             "something='whatever'"),

            # False tests

            (new_ep(False), {'enabled': False}, 'enabled=False'),

            (new_ep(False, something='whatever'),

             {'enabled': False, 'something': 'whatever'},

             "enabled=False, something='whatever'"),

        ]

        self.downgrade(41)

        # Verify that the endpoints have the expected values.

        self.metadata.clear()

        endpoint_table = sqlalchemy.Table('endpoint', self.metadata,

                                          autoload=True)

**** CubicPower OpenStack Study ****

        def fetch_endpoint(endpoint_id):

            cols = [endpoint_table.c.extra]

            f = endpoint_table.c.id == endpoint_id

            s = sqlalchemy.select(cols).where(f)

            ep = session.execute(s).fetchone()

            return json.loads(ep.extra)

        for endpoint_id, exp_extra, msg in endpoints:

            extra = fetch_endpoint(endpoint_id)

            self.assertEqual(exp_extra, extra, msg)

**** CubicPower OpenStack Study ****

    def test_upgrade_region_non_unique_description(self):

        """Test upgrade to migration 43.

        This migration should occur with no unique constraint on the region

        description column.

        Create two regions with the same description.

        """

        session = self.Session()

        def add_region():

            region_uuid = uuid.uuid4().hex

            region = {

                'id': region_uuid,

                'description': ''

            }

            self.insert_dict(session, 'region', region)

            return region_uuid

        self.upgrade(43)

        # Write one region to the database

        add_region()

        # Write another region to the database with the same description

        add_region()

**** CubicPower OpenStack Study ****

        def add_region():

            region_uuid = uuid.uuid4().hex

            region = {

                'id': region_uuid,

                'description': ''

            }

            self.insert_dict(session, 'region', region)

            return region_uuid

        self.upgrade(43)

        # Write one region to the database

        add_region()

        # Write another region to the database with the same description

        add_region()

**** CubicPower OpenStack Study ****

    def test_upgrade_region_unique_description(self):

        """Test upgrade to migration 43.

        This test models a migration where there is a unique constraint on the

        description column.

        Create two regions with the same description.

        """

        session = self.Session()

        def add_region(table):

            region_uuid = uuid.uuid4().hex

            region = {

                'id': region_uuid,

                'description': ''

            }

            self.insert_dict(session, 'region', region, table=table)

            return region_uuid

        def get_metadata():

            meta = sqlalchemy.MetaData()

            meta.bind = self.engine

            return meta

        # Migrate to version 42

        self.upgrade(42)

        region_table = sqlalchemy.Table('region',

                                        get_metadata(),

                                        autoload=True)

        # create the unique constraint and load the new version of the

        # reflection cache

        idx = sqlalchemy.Index('description', region_table.c.description,

                               unique=True)

        idx.create(self.engine)

        region_unique_table = sqlalchemy.Table('region',

                                               get_metadata(),

                                               autoload=True)

        add_region(region_unique_table)

        self.assertEqual(1, session.query(region_unique_table).count())

        # verify the unique constraint is enforced

        self.assertRaises(sqlalchemy.exc.IntegrityError,

                          add_region,

                          table=region_unique_table)

        # migrate to 43, unique constraint should be dropped

        self.upgrade(43)

        # reload the region table from the schema

        region_nonunique = sqlalchemy.Table('region',

                                            get_metadata(),

                                            autoload=True)

        self.assertEqual(1, session.query(region_nonunique).count())

        # Write a second region to the database with the same description

        add_region(region_nonunique)

        self.assertEqual(2, session.query(region_nonunique).count())

**** CubicPower OpenStack Study ****

        def add_region(table):

            region_uuid = uuid.uuid4().hex

            region = {

                'id': region_uuid,

                'description': ''

            }

            self.insert_dict(session, 'region', region, table=table)

            return region_uuid

**** CubicPower OpenStack Study ****

        def get_metadata():

            meta = sqlalchemy.MetaData()

            meta.bind = self.engine

            return meta

        # Migrate to version 42

        self.upgrade(42)

        region_table = sqlalchemy.Table('region',

                                        get_metadata(),

                                        autoload=True)

        # create the unique constraint and load the new version of the

        # reflection cache

        idx = sqlalchemy.Index('description', region_table.c.description,

                               unique=True)

        idx.create(self.engine)

        region_unique_table = sqlalchemy.Table('region',

                                               get_metadata(),

                                               autoload=True)

        add_region(region_unique_table)

        self.assertEqual(1, session.query(region_unique_table).count())

        # verify the unique constraint is enforced

        self.assertRaises(sqlalchemy.exc.IntegrityError,

                          add_region,

                          table=region_unique_table)

        # migrate to 43, unique constraint should be dropped

        self.upgrade(43)

        # reload the region table from the schema

        region_nonunique = sqlalchemy.Table('region',

                                            get_metadata(),

                                            autoload=True)

        self.assertEqual(1, session.query(region_nonunique).count())

        # Write a second region to the database with the same description

        add_region(region_nonunique)

        self.assertEqual(2, session.query(region_nonunique).count())

**** CubicPower OpenStack Study ****

    def populate_user_table(self, with_pass_enab=False,

                            with_pass_enab_domain=False):

        # Populate the appropriate fields in the user

        # table, depending on the parameters:

        #

        # Default: id, name, extra

        # pass_enab: Add password, enabled as well

        # pass_enab_domain: Add password, enabled and domain as well

        #

        this_table = sqlalchemy.Table("user",

                                      self.metadata,

                                      autoload=True)

        for user in default_fixtures.USERS:

            extra = copy.deepcopy(user)

            extra.pop('id')

            extra.pop('name')

            if with_pass_enab:

                password = extra.pop('password', None)

                enabled = extra.pop('enabled', True)

                ins = this_table.insert().values(

                    {'id': user['id'],

                     'name': user['name'],

                     'password': password,

                     'enabled': bool(enabled),

                     'extra': json.dumps(extra)})

            else:

                if with_pass_enab_domain:

                    password = extra.pop('password', None)

                    enabled = extra.pop('enabled', True)

                    extra.pop('domain_id')

                    ins = this_table.insert().values(

                        {'id': user['id'],

                         'name': user['name'],

                         'domain_id': user['domain_id'],

                         'password': password,

                         'enabled': bool(enabled),

                         'extra': json.dumps(extra)})

                else:

                    ins = this_table.insert().values(

                        {'id': user['id'],

                         'name': user['name'],

                         'extra': json.dumps(extra)})

            self.engine.execute(ins)

**** CubicPower OpenStack Study ****

    def populate_tenant_table(self, with_desc_enab=False,

                              with_desc_enab_domain=False):

        # Populate the appropriate fields in the tenant or

        # project table, depending on the parameters

        #

        # Default: id, name, extra

        # desc_enab: Add description, enabled as well

        # desc_enab_domain: Add description, enabled and domain as well,

        #                   plus use project instead of tenant

        #

        if with_desc_enab_domain:

            # By this time tenants are now projects

            this_table = sqlalchemy.Table("project",

                                          self.metadata,

                                          autoload=True)

        else:

            this_table = sqlalchemy.Table("tenant",

                                          self.metadata,

                                          autoload=True)

        for tenant in default_fixtures.TENANTS:

            extra = copy.deepcopy(tenant)

            extra.pop('id')

            extra.pop('name')

            if with_desc_enab:

                desc = extra.pop('description', None)

                enabled = extra.pop('enabled', True)

                ins = this_table.insert().values(

                    {'id': tenant['id'],

                     'name': tenant['name'],

                     'description': desc,

                     'enabled': bool(enabled),

                     'extra': json.dumps(extra)})

            else:

                if with_desc_enab_domain:

                    desc = extra.pop('description', None)

                    enabled = extra.pop('enabled', True)

                    extra.pop('domain_id')

                    ins = this_table.insert().values(

                        {'id': tenant['id'],

                         'name': tenant['name'],

                         'domain_id': tenant['domain_id'],

                         'description': desc,

                         'enabled': bool(enabled),

                         'extra': json.dumps(extra)})

                else:

                    ins = this_table.insert().values(

                        {'id': tenant['id'],

                         'name': tenant['name'],

                         'extra': json.dumps(extra)})

            self.engine.execute(ins)

**** CubicPower OpenStack Study ****

    def _mysql_check_all_tables_innodb(self):

        database = self.engine.url.database

        connection = self.engine.connect()

        # sanity check

        total = connection.execute("SELECT count(*) "

                                   "from information_schema.TABLES "

                                   "where TABLE_SCHEMA='%(database)s'" %

                                   dict(database=database))

        self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")

        noninnodb = connection.execute("SELECT table_name "

                                       "from information_schema.TABLES "

                                       "where TABLE_SCHEMA='%(database)s' "

                                       "and ENGINE!='InnoDB' "

                                       "and TABLE_NAME!='migrate_version'" %

                                       dict(database=database))

        names = [x[0] for x in noninnodb]

        self.assertEqual(names, [],

                         "Non-InnoDB tables exist")

        connection.close()

**** CubicPower OpenStack Study ****

class VersionTests(SqlMigrateBase):

**** CubicPower OpenStack Study ****

    def test_core_initial(self):

        """When get the version before migrated, it's 0."""

        version = migration_helpers.get_db_version()

        self.assertEqual(0, version)

**** CubicPower OpenStack Study ****

    def test_core_max(self):

        """When get the version after upgrading, it's the new version."""

        self.upgrade(self.max_version)

        version = migration_helpers.get_db_version()

        self.assertEqual(self.max_version, version)

**** CubicPower OpenStack Study ****

    def test_extension_not_controlled(self):

        """When get the version before controlling, raises DbMigrationError."""

        self.assertRaises(db_exception.DbMigrationError,

                          migration_helpers.get_db_version,

                          extension='federation')

**** CubicPower OpenStack Study ****

    def test_extension_initial(self):

        """When get the initial version of an extension, it's 0."""

        abs_path = migration_helpers.find_migrate_repo(federation)

        migration.db_version_control(sql.get_engine(), abs_path)

        version = migration_helpers.get_db_version(extension='federation')

        self.assertEqual(0, version)

**** CubicPower OpenStack Study ****

    def test_extension_migrated(self):

        """When get the version after migrating an extension, it's not 0."""

        abs_path = migration_helpers.find_migrate_repo(federation)

        migration.db_version_control(sql.get_engine(), abs_path)

        migration.db_sync(sql.get_engine(), abs_path)

        version = migration_helpers.get_db_version(extension='federation')

        self.assertTrue(version > 0, "Version didn't change after migrated?")

**** CubicPower OpenStack Study ****

    def test_unexpected_extension(self):

        """The version for an extension that doesn't exist raises ImportError.

        """

        extension_name = uuid.uuid4().hex

        self.assertRaises(ImportError,

                          migration_helpers.get_db_version,

                          extension=extension_name)

**** CubicPower OpenStack Study ****

    def test_unversioned_extension(self):

        """The version for extensions without migrations raise an exception.

        """

        self.assertRaises(exception.MigrationNotProvided,

                          migration_helpers.get_db_version,

                          extension='access')