OpenStack Study: session.py

# Copyright 2010 United States Government as represented by the

# Administrator of the National Aeronautics and Space Administration.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""Session Handling for SQLAlchemy backend.

Initializing:

* Call `set_defaults()` with the minimal of the following kwargs:

``sql_connection``, ``sqlite_db``

Example:

.. code:: python

session.set_defaults(

sql_connection="sqlite:///var/lib/neutron/sqlite.db",

sqlite_db="/var/lib/neutron/sqlite.db")

Recommended ways to use sessions within this framework:

* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
  `model_query()` will implicitly use a session when called without one
  supplied. This is the ideal situation because it will allow queries
  to be automatically retried if the database connection is interrupted.

  .. note:: Automatic retry will be enabled in a future patch.

It is generally fine to issue several queries in a row like this. Even though
they may be run in separate transactions and/or separate sessions, each one
will see the data from the prior calls. If needed, undo- or rollback-like
functionality should be handled at a logical level. For an example, look at
the code around quotas and `reservation_rollback()`.

"""
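A minimal initialization sketch, assuming this module is importable as
``session`` and the usual oslo ``database`` configuration group is
registered; the connection URI is illustrative:

.. code:: python

    session.set_defaults(sql_connection="sqlite://", sqlite_db="")

    sess = session.get_session()   # lazily builds engine and sessionmaker
    sess.execute("SELECT 1")       # runs through the _wrap_db_error guard
    session.cleanup()              # dispose the engine, reset module state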

def set_defaults(sql_connection, sqlite_db, max_pool_size=None,

                 max_overflow=None, pool_timeout=None):

    """Set defaults for configuration variables."""

    cfg.set_defaults(database_opts,

                     connection=sql_connection)

    cfg.set_defaults(sqlite_db_opts,

                     sqlite_db=sqlite_db)

    # Update the QueuePool defaults

    if max_pool_size is not None:

        cfg.set_defaults(database_opts,

                         max_pool_size=max_pool_size)

    if max_overflow is not None:

        cfg.set_defaults(database_opts,

                         max_overflow=max_overflow)

    if pool_timeout is not None:

        cfg.set_defaults(database_opts,

                         pool_timeout=pool_timeout)


def cleanup():

    global _ENGINE, _MAKER

    global _SLAVE_ENGINE, _SLAVE_MAKER

    if _MAKER:

        _MAKER.close_all()

        _MAKER = None

    if _ENGINE:

        _ENGINE.dispose()

        _ENGINE = None

    if _SLAVE_MAKER:

        _SLAVE_MAKER.close_all()

        _SLAVE_MAKER = None

    if _SLAVE_ENGINE:

        _SLAVE_ENGINE.dispose()

        _SLAVE_ENGINE = None
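``cleanup()`` drops both sessionmakers and disposes both engines, so the
next ``get_session()``/``get_engine()`` call rebuilds them from the current
configuration. A hedged sketch of a test-suite teardown (``DbTestCase``
and ``base.TestCase`` are illustrative names):

.. code:: python

    class DbTestCase(base.TestCase):
        def tearDown(self):
            session.cleanup()   # the next test gets a fresh engine
            super(DbTestCase, self).tearDown()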

class SqliteForeignKeysListener(PoolListener):
    """Ensures that the foreign key constraints are enforced in SQLite.

    The foreign key constraints are disabled by default in SQLite,
    so they must be explicitly enabled for each database connection.
    """

    def connect(self, dbapi_con, con_record):
        dbapi_con.execute('pragma foreign_keys=ON')
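A self-contained demonstration, using only the stdlib ``sqlite3`` module, of
why this listener is needed: foreign key enforcement is off on every new
SQLite connection until the pragma is issued.

.. code:: python

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
    con.execute("CREATE TABLE child "
                "(parent_id INTEGER REFERENCES parent(id))")
    con.execute("INSERT INTO child VALUES (1)")   # accepted: FKs are off
    con.execute("PRAGMA foreign_keys=ON")
    try:
        con.execute("INSERT INTO child VALUES (2)")
    except sqlite3.IntegrityError as e:
        print(e)   # FOREIGN KEY constraint failed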

def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,

                slave_session=False, mysql_traditional_mode=False):

    """Return a SQLAlchemy session."""

    global _MAKER

    global _SLAVE_MAKER

    maker = _MAKER

    if slave_session:

        maker = _SLAVE_MAKER

    if maker is None:

        engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session,

                            mysql_traditional_mode=mysql_traditional_mode)

        maker = get_maker(engine, autocommit, expire_on_commit)

    if slave_session:

        _SLAVE_MAKER = maker

    else:

        _MAKER = maker

    session = maker()

    return session
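A hedged usage sketch (``FooModel`` and ``values`` are illustrative).
Because sessions are created with ``autocommit=True``, grouping writes
under ``session.begin()`` gives them a single transaction:

.. code:: python

    sess = get_session()
    with sess.begin():
        foo = FooModel(**values)
        sess.add(foo)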

# note(boris-42): In current versions of DB backends unique constraint

# violation messages follow the structure:

#

# sqlite:

# 1 column - (IntegrityError) column c1 is not unique

# N columns - (IntegrityError) column c1, c2, ..., N are not unique

#

# sqlite since 3.7.16:

# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1

#

# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2

#

# postgres:

# 1 column - (IntegrityError) duplicate key value violates unique

#               constraint "users_c1_key"

# N columns - (IntegrityError) duplicate key value violates unique

#               constraint "name_of_our_constraint"

#

# mysql:

# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key

#               'c1'")

# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined

#               with -' for key 'name_of_our_constraint'")

_DUP_KEY_RE_DB = {

    "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),

               re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),

    "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),

    "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),)

}
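A quick self-contained check of one of the patterns above against an
illustrative error message:

.. code:: python

    import re

    msg = "(IntegrityError) UNIQUE constraint failed: users.name"
    pattern = re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")
    print(pattern.match(msg).group(1))   # -> users.name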

def _raise_if_duplicate_entry_error(integrity_error, engine_name):
    """Raise an exception if two entries are duplicated.

    Raises a DBDuplicateEntry exception if the given IntegrityError
    wraps a unique constraint violation.
    """


    def get_columns_from_uniq_cons_or_name(columns):

        # note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2"
        #                  where `t` is the table name and `c1`, `c2` are
        #                  the columns in the UniqueConstraint.

        uniqbase = "uniq_"

        if not columns.startswith(uniqbase):

            if engine_name == "postgresql":

                return [columns[columns.index("_") + 1:columns.rindex("_")]]

            return [columns]

        return columns[len(uniqbase):].split("0")[1:]

    if engine_name not in ["mysql", "sqlite", "postgresql"]:

        return

    # FIXME(johannes): The usage of the .message attribute has been

    # deprecated since Python 2.6. However, the exceptions raised by

    # SQLAlchemy can differ when using unicode() and accessing .message.

    # An audit across all three supported engines will be necessary to

    # ensure there are no regressions.

    for pattern in _DUP_KEY_RE_DB[engine_name]:

        match = pattern.match(integrity_error.message)

        if match:

            break

    else:

        return

    columns = match.group(1)

    if engine_name == "sqlite":

        columns = [c.split('.')[-1] for c in columns.strip().split(", ")]

    else:

        columns = get_columns_from_uniq_cons_or_name(columns)

    raise exception.DBDuplicateEntry(columns, integrity_error)
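An illustrative walk-through of the "uniq_&lt;table&gt;0&lt;col&gt;0&lt;col&gt;" parsing
performed by ``get_columns_from_uniq_cons_or_name()`` (the constraint name
below is made up):

.. code:: python

    name = "uniq_instances0project_id0hostname"
    columns = name[len("uniq_"):].split("0")[1:]
    print(columns)   # -> ['project_id', 'hostname']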

# NOTE(comstud): In current versions of DB backends, Deadlock violation

# messages follow the structure:

#

# mysql:

# (OperationalError) (1213, 'Deadlock found when trying to get lock; try '

#                     'restarting transaction')  

_DEADLOCK_RE_DB = {

    "mysql": re.compile(r"^.*\(1213, 'Deadlock.*")

}

def _raise_if_deadlock_error(operational_error, engine_name):
    """Raise exception on deadlock condition.

    Raise DBDeadlock exception if OperationalError contains a Deadlock
    condition.
    """
    # Use a local name that does not shadow the `re` module.
    pattern = _DEADLOCK_RE_DB.get(engine_name)
    if pattern is None:
        return

    # FIXME(johannes): The usage of the .message attribute has been
    # deprecated since Python 2.6. However, the exceptions raised by
    # SQLAlchemy can differ when using unicode() and accessing .message.
    # An audit across all three supported engines will be necessary to
    # ensure there are no regressions.
    m = pattern.match(operational_error.message)
    if not m:
        return

    raise exception.DBDeadlock(operational_error)
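Callers are expected to catch ``DBDeadlock`` and retry the whole
transaction. A hedged sketch (``run_transaction`` is an illustrative
placeholder):

.. code:: python

    for attempt in range(5):
        try:
            run_transaction()
            break
        except exception.DBDeadlock:
            time.sleep(2 ** attempt * 0.1)   # simple backoff, then retry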

def _wrap_db_error(f):

    @functools.wraps(f)


    def _wrap(*args, **kwargs):

        try:

            return f(*args, **kwargs)

        except UnicodeEncodeError:

            raise exception.DBInvalidUnicodeParameter()

        except sqla_exc.OperationalError as e:

            _raise_if_db_connection_lost(e, get_engine())

            _raise_if_deadlock_error(e, get_engine().name)

            # NOTE(comstud): A lot of code is checking for OperationalError

            # so let's not wrap it for now.

            raise

        # note(boris-42): We should catch unique constraint violation and

        # wrap it by our own DBDuplicateEntry exception. Unique constraint

        # violation is wrapped by IntegrityError.

        except sqla_exc.IntegrityError as e:

            # note(boris-42): SqlAlchemy doesn't unify errors from different

            # DBs so we must do this. Also in some tables (for example

            # instance_types) there are more than one unique constraint. This

            # means we should get names of columns, which values violate

            # unique constraint, from error message.

            _raise_if_duplicate_entry_error(e, get_engine().name)

            raise exception.DBError(e)

        except Exception as e:

            LOG.exception(_('DB exception wrapped.'))

            raise exception.DBError(e)

    return _wrap
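A hedged sketch of what the decorator buys callers: any DB-facing function
wrapped with ``_wrap_db_error`` raises this library's
``DBDuplicateEntry``/``DBDeadlock``/``DBError`` family instead of raw
SQLAlchemy exceptions (``instance_create`` and ``Instance`` are
illustrative):

.. code:: python

    @_wrap_db_error
    def instance_create(values):
        sess = get_session()
        with sess.begin():
            sess.add(Instance(**values))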

def get_engine(sqlite_fk=False, slave_engine=False,

               mysql_traditional_mode=False):

    """Return a SQLAlchemy engine."""

    global _ENGINE

    global _SLAVE_ENGINE

    engine = _ENGINE

    db_uri = CONF.database.connection

    if slave_engine:

        engine = _SLAVE_ENGINE

        db_uri = CONF.database.slave_connection

    if engine is None:

        engine = create_engine(db_uri, sqlite_fk=sqlite_fk,

                               mysql_traditional_mode=mysql_traditional_mode)

    if slave_engine:

        _SLAVE_ENGINE = engine

    else:

        _ENGINE = engine

    return engine

def _synchronous_switch_listener(dbapi_conn, connection_rec):

    """Switch sqlite connections to non-synchronous mode."""

    dbapi_conn.execute("PRAGMA synchronous = OFF")

def _add_regexp_listener(dbapi_con, con_record):

    """Add REGEXP function to sqlite connections."""


    def regexp(expr, item):

        reg = re.compile(expr)

        return reg.search(six.text_type(item)) is not None

    dbapi_con.create_function('regexp', 2, regexp)
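A self-contained ``sqlite3`` demo of the REGEXP hook registered above. Note
that for ``value REGEXP pattern`` SQLite calls the function as
``regexp(pattern, value)``, which matches the ``regexp(expr, item)``
signature:

.. code:: python

    import re
    import sqlite3

    def regexp(expr, item):
        return re.search(expr, str(item)) is not None

    con = sqlite3.connect(":memory:")
    con.create_function("regexp", 2, regexp)
    con.execute("CREATE TABLE t (name TEXT)")
    con.execute("INSERT INTO t VALUES ('compute-01')")
    print(con.execute(
        "SELECT name FROM t WHERE name REGEXP 'compute-\\d+'").fetchall())
    # -> [('compute-01',)]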

def _thread_yield(dbapi_con, con_record):

    """Ensure other greenthreads get a chance to be executed.

    If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will

    execute instead of time.sleep(0).

    Force a context switch. With common database backends (eg MySQLdb and

    sqlite), there is no implicit yield caused by network I/O since they are

    implemented by C libraries that eventlet cannot monkey patch.

    """

    time.sleep(0)

def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):

    """Ensures that MySQL and DB2 connections are alive.

    Borrowed from:

    http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f

    """

    cursor = dbapi_conn.cursor()

    try:

        ping_sql = 'select 1'

        if engine.name == 'ibm_db_sa':

            # DB2 requires a table expression

            ping_sql = 'select 1 from (values (1)) AS t1'

        cursor.execute(ping_sql)

    except Exception as ex:

        if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):

            msg = _('Database server has gone away: %s') % ex

            LOG.warning(msg)

            raise sqla_exc.DisconnectionError(msg)

        else:

            raise

def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
    """Set engine mode to 'traditional'.

    Required to prevent silent truncation on MySQL INSERT and UPDATE
    operations. By default MySQL truncates an inserted string that is
    longer than its declared column and raises only a warning, which
    invites silent data corruption.
    """
    dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")

def _is_db_connection_error(args):

    """Return True if error in connecting to db."""

    # NOTE(adam_g): This is currently MySQL specific and needs to be extended

    #               to support Postgres and others.

    # For DB2, the error code is -30081, returned while the database is
    # not yet ready.

    conn_err_codes = ('2002', '2003', '2006', '2013', '-30081')

    for err_code in conn_err_codes:

        if args.find(err_code) != -1:

            return True

    return False
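A quick illustration with a made-up MySQL "server has gone away" message:

.. code:: python

    print(_is_db_connection_error(
        "(OperationalError) (2006, 'MySQL server has gone away')"))
    # -> True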

def _raise_if_db_connection_lost(error, engine):

    # NOTE(vsergeyev): is_disconnect(e, connection, cursor) expects a
    #                  connection and a cursor as arguments, but when the
    #                  DB is unavailable we cannot create either, so a
    #                  reconnect would fail anyway. Since is_disconnect()
    #                  ignores both parameters, None is passed as a
    #                  placeholder for each.

    if engine.dialect.is_disconnect(error, None, None):

        raise exception.DBConnectionError(error)

def create_engine(sql_connection, sqlite_fk=False,

                  mysql_traditional_mode=False):

    """Return a new SQLAlchemy engine."""

    # NOTE(geekinutah): At this point we could be connecting to the normal

    #                   db handle or the slave db handle. Things like

    #                   _wrap_db_error aren't going to work well if their

    #                   backends don't match. Let's check.

    _assert_matching_drivers()

    connection_dict = sqlalchemy.engine.url.make_url(sql_connection)

    engine_args = {

        "pool_recycle": CONF.database.idle_timeout,

        "echo": False,

        'convert_unicode': True,

    }

    # Map our SQL debug level to SQLAlchemy's options

    if CONF.database.connection_debug >= 100:

        engine_args['echo'] = 'debug'

    elif CONF.database.connection_debug >= 50:

        engine_args['echo'] = True

    if "sqlite" in connection_dict.drivername:

        if sqlite_fk:

            engine_args["listeners"] = [SqliteForeignKeysListener()]

        engine_args["poolclass"] = NullPool

        if CONF.database.connection == "sqlite://":

            engine_args["poolclass"] = StaticPool

            engine_args["connect_args"] = {'check_same_thread': False}

    else:

        if CONF.database.max_pool_size is not None:

            engine_args['pool_size'] = CONF.database.max_pool_size

        if CONF.database.max_overflow is not None:

            engine_args['max_overflow'] = CONF.database.max_overflow

        if CONF.database.pool_timeout is not None:

            engine_args['pool_timeout'] = CONF.database.pool_timeout

    engine = sqlalchemy.create_engine(sql_connection, **engine_args)

    sqlalchemy.event.listen(engine, 'checkin', _thread_yield)

    if engine.name in ['mysql', 'ibm_db_sa']:

        callback = functools.partial(_ping_listener, engine)

        sqlalchemy.event.listen(engine, 'checkout', callback)

        if engine.name == 'mysql':

            if mysql_traditional_mode:

                sqlalchemy.event.listen(engine, 'checkout',

                                        _set_mode_traditional)

            else:

                LOG.warning(_("This application has not enabled MySQL "

                              "traditional mode, which means silent "

                              "data corruption may occur. "

                              "Please encourage the application "

                              "developers to enable this mode."))

    elif 'sqlite' in connection_dict.drivername:

        if not CONF.sqlite_synchronous:

            sqlalchemy.event.listen(engine, 'connect',

                                    _synchronous_switch_listener)

        sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)

    if (CONF.database.connection_trace and

            engine.dialect.dbapi.__name__ == 'MySQLdb'):

        _patch_mysqldb_with_stacktrace_comments()

    try:

        engine.connect()

    except sqla_exc.OperationalError as e:

        if not _is_db_connection_error(e.args[0]):

            raise

        remaining = CONF.database.max_retries

        if remaining == -1:

            remaining = 'infinite'

        while True:

            msg = _('SQL connection failed. %s attempts left.')

            LOG.warning(msg % remaining)

            if remaining != 'infinite':

                remaining -= 1

            time.sleep(CONF.database.retry_interval)

            try:

                engine.connect()

                break

            except sqla_exc.OperationalError as e:

                if (remaining != 'infinite' and remaining == 0) or \

                        not _is_db_connection_error(e.args[0]):

                    raise

    return engine


class Query(sqlalchemy.orm.query.Query):
    """Subclass of sqlalchemy.orm.query.Query with a soft_delete() method."""


    def soft_delete(self, synchronize_session='evaluate'):
        # Mark rows deleted by setting `deleted` to the row's own id
        # (keeping unique constraints that include `deleted` usable),
        # stamp deleted_at, and leave updated_at untouched.
        return self.update({'deleted': literal_column('id'),
                            'updated_at': literal_column('updated_at'),
                            'deleted_at': timeutils.utcnow()},
                           synchronize_session=synchronize_session)
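A hedged usage sketch: because ``get_maker()`` installs this class via
``query_cls=Query``, ``soft_delete()`` is available on any query built from
the session (``FooModel`` and ``project_id`` are illustrative):

.. code:: python

    sess = get_session()
    count = sess.query(FooModel).\
        filter_by(project_id=project_id).\
        soft_delete()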

class Session(sqlalchemy.orm.session.Session):
    """Custom Session class to avoid SqlAlchemy Session monkey patching."""

    @_wrap_db_error

    def query(self, *args, **kwargs):

        return super(Session, self).query(*args, **kwargs)

    @_wrap_db_error


    def flush(self, *args, **kwargs):

        return super(Session, self).flush(*args, **kwargs)

    @_wrap_db_error


    def execute(self, *args, **kwargs):

        return super(Session, self).execute(*args, **kwargs)

def get_maker(engine, autocommit=True, expire_on_commit=False):

    """Return a SQLAlchemy sessionmaker using the given engine."""

    return sqlalchemy.orm.sessionmaker(bind=engine,

                                       class_=Session,

                                       autocommit=autocommit,

                                       expire_on_commit=expire_on_commit,

                                       query_cls=Query)

def _patch_mysqldb_with_stacktrace_comments():

    """Adds current stack trace as a comment in queries.

    Patches MySQLdb.cursors.BaseCursor._do_query.

    """

    import MySQLdb.cursors

    import traceback

    old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query


    def _do_query(self, q):
        stack = ''
        # traceback.extract_stack() yields (filename, line number,
        # function name, source line) tuples, so `method` below receives
        # the function name and `function` the source line text.
        for filename, line, method, function in traceback.extract_stack():

            # exclude various common things from trace

            if filename.endswith('session.py') and method == '_do_query':

                continue

            if filename.endswith('api.py') and method == 'wrapper':

                continue

            if filename.endswith('utils.py') and method == '_inner':

                continue

            if filename.endswith('exception.py') and method == '_wrap':

                continue

            # db/api is just a wrapper around db/sqlalchemy/api

            if filename.endswith('db/api.py'):

                continue

            # only trace inside neutron

            index = filename.rfind('neutron')

            if index == -1:

                continue

            stack += "File:%s:%s Method:%s() Line:%s | " \

                     % (filename[index:], line, method, function)

        # strip trailing " | " from stack

        if stack:

            stack = stack[:-3]

            qq = "%s /* %s */" % (q, stack)

        else:

            qq = q

        old_mysql_do_query(self, qq)

    setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
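After patching, each query carries its call path as a trailing SQL comment,
along these (entirely illustrative) lines:

.. code:: python

    # SELECT id FROM instances
    #     /* File:neutron/db/api.py:52 Method:get_all() Line:return q.all() */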

def _assert_matching_drivers():

    """Make sure slave handle and normal handle have the same driver."""

    # NOTE(geekinutah): There's no use case for writing to one backend and

    #                 reading from another. Who knows what the future holds?

    if CONF.database.slave_connection == '':

        return

    normal = sqlalchemy.engine.url.make_url(CONF.database.connection)

    slave = sqlalchemy.engine.url.make_url(CONF.database.slave_connection)

    assert normal.drivername == slave.drivername
