_DEADLOCK_RE_DB = {
    "mysql": re.compile(r"^.*\(1213, 'Deadlock.*")
}
def _raise_if_deadlock_error(operational_error, engine_name):
    """
    Raise DBDeadlock exception if OperationalError contains a Deadlock
    condition.
    """
    pattern = _DEADLOCK_RE_DB.get(engine_name)
    if pattern is None:
        return
    # NOTE: match against the exception's string form, which embeds the
    # DBAPI error tuple; the Python 2-only .message attribute is avoided
    # for py3 compatibility.  Also avoid shadowing the re module.
    m = pattern.match(six.text_type(operational_error))
    if not m:
        return
    raise exception.DBDeadlock(operational_error)
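# Illustrative sketch (the sample message is an assumption of how SQLAlchemy
# renders a wrapped MySQLdb error): the 'mysql' pattern above matches the
# string form of the error, which embeds the (1213, 'Deadlock ...') tuple:
#
#     >>> msg = ("(OperationalError) (1213, 'Deadlock found when trying "
#     ...        "to get lock; try restarting transaction')")
#     >>> bool(_DEADLOCK_RE_DB['mysql'].match(msg))
#     True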
def _wrap_db_error(f):
    def _wrap(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except UnicodeEncodeError:
            raise exception.DBInvalidUnicodeParameter()
        except sqla_exc.OperationalError as e:
            _raise_if_deadlock_error(e, get_engine().name)
            # NOTE(comstud): A lot of code is checking for OperationalError
            # so let's not wrap it for now.
            raise
        # note(boris-42): We should catch unique constraint violation and
        # wrap it by our own DBDuplicateEntry exception. Unique constraint
        # violation is wrapped by IntegrityError.
        except sqla_exc.IntegrityError as e:
            # note(boris-42): SqlAlchemy doesn't unify errors from different
            # DBs so we must do this. Also in some tables (for example
            # instance_types) there are more than one unique constraint. This
            # means we should get names of columns, which values violate
            # unique constraint, from error message.
            _raise_if_duplicate_entry_error(e, get_engine().name)
            raise exception.DBError(e)
        except Exception as e:
            LOG.exception(_('DB exception wrapped.'))
            raise exception.DBError(e)
    # f.func_name is Python 2-only; __name__ works on both Python 2 and 3.
    _wrap.__name__ = f.__name__
    return _wrap
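# Usage sketch (instance_get and models.Instance are hypothetical, for
# illustration only): decorating a DB-access function routes its SQLAlchemy
# errors through the DBError hierarchy -- deadlocks raise DBDeadlock,
# unique-constraint violations raise DBDuplicateEntry, and anything
# unexpected is wrapped in DBError:
#
#     @_wrap_db_error
#     def instance_get(session, instance_id):
#         return session.query(models.Instance).get(instance_id)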
def get_engine(sqlite_fk=False):
    """Return a SQLAlchemy engine."""
    global _ENGINE
    if _ENGINE is None:
        _ENGINE = create_engine(CONF.database.connection,
                                sqlite_fk=sqlite_fk)
    return _ENGINE
def _synchronous_switch_listener(dbapi_conn, connection_rec):
    """Switch sqlite connections to non-synchronous mode."""
    dbapi_conn.execute("PRAGMA synchronous = OFF")
def _add_regexp_listener(dbapi_con, con_record):
    """Add REGEXP function to sqlite connections."""
    def regexp(expr, item):
        reg = re.compile(expr)
        return reg.search(six.text_type(item)) is not None
    dbapi_con.create_function('regexp', 2, regexp)
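# Illustrative sketch (table and column names are made up): once the listener
# is installed, SQLite's REGEXP operator delegates to the 'regexp' function
# registered above:
#
#     SELECT * FROM instances WHERE hostname REGEXP '^compute-[0-9]+$';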
def _greenthread_yield(dbapi_con, con_record):
    """
    Ensure other greenthreads get a chance to execute by forcing a context
    switch. With common database backends (e.g. MySQLdb and sqlite), there is
    no implicit yield caused by network I/O since they are implemented by
    C libraries that eventlet cannot monkey patch.
    """
    greenthread.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
    """
    Ensures that MySQL connections checked out of the
    pool are alive.
    Borrowed from:
    http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
    """
    try:
        dbapi_conn.cursor().execute('select 1')
    except dbapi_conn.OperationalError as ex:
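        # Client error codes that indicate a dead connection, e.g.
        # 2006 "MySQL server has gone away" and 2013 "Lost connection
        # to MySQL server during query".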
        if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
            LOG.warn(_("Got 'MySQL server has gone away' error: %s"), ex)
            raise sqla_exc.DisconnectionError("Database server went away")
        else:
            raise
def _is_db_connection_error(args):
    """Return True if the given DBAPI error message string indicates a
    failure to connect to the database."""
    # NOTE(adam_g): This is currently MySQL specific and needs to be extended
    #               to support Postgres and others.
    conn_err_codes = ('2002', '2003', '2006')
    for err_code in conn_err_codes:
        if args.find(err_code) != -1:
            return True
    return False
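# Illustrative sketch (the message format is an assumption of how MySQLdb
# errors are rendered): 2002/2003 are "can't connect" client errors and 2006
# is "server has gone away"; the substring search is crude but sufficient:
#
#     >>> _is_db_connection_error("(OperationalError) (2003, \"Can't "
#     ...                         "connect to MySQL server on 'db' (111)\")")
#     True
#     >>> _is_db_connection_error("(OperationalError) (1213, 'Deadlock')")
#     False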
def create_engine(sql_connection, sqlite_fk=False):
    """Return a new SQLAlchemy engine."""
    connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
    engine_args = {
        "pool_recycle": CONF.database.idle_timeout,
        "echo": False,
        'convert_unicode': True,
    }
    # Map our SQL debug level to SQLAlchemy's options
    if CONF.database.connection_debug >= 100:
        engine_args['echo'] = 'debug'
    elif CONF.database.connection_debug >= 50:
        engine_args['echo'] = True
    if "sqlite" in connection_dict.drivername:
        if sqlite_fk:
            engine_args["listeners"] = [SqliteForeignKeysListener()]
        engine_args["poolclass"] = NullPool
        if CONF.database.connection == "sqlite://":
            engine_args["poolclass"] = StaticPool
            engine_args["connect_args"] = {'check_same_thread': False}
    else:
        engine_args['pool_size'] = CONF.database.max_pool_size
        if CONF.database.max_overflow is not None:
            engine_args['max_overflow'] = CONF.database.max_overflow
    engine = sqlalchemy.create_engine(sql_connection, **engine_args)
    sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
    if 'mysql' in connection_dict.drivername:
        sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
    elif 'sqlite' in connection_dict.drivername:
        if not CONF.sqlite_synchronous:
            sqlalchemy.event.listen(engine, 'connect',
                                    _synchronous_switch_listener)
        sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
    if (CONF.database.connection_trace and
            engine.dialect.dbapi.__name__ == 'MySQLdb'):
        _patch_mysqldb_with_stacktrace_comments()
    try:
        engine.connect()
    except sqla_exc.OperationalError as e:
        if not _is_db_connection_error(e.args[0]):
            raise
        remaining = CONF.database.max_retries
        if remaining == -1:
            remaining = 'infinite'
        while True:
            msg = _('SQL connection failed. %s attempts left.')
            LOG.warn(msg % remaining)
            if remaining != 'infinite':
                remaining -= 1
            time.sleep(CONF.database.retry_interval)
            try:
                engine.connect()
                break
            except sqla_exc.OperationalError as e:
                if (remaining != 'infinite' and remaining == 0) or \
                        not _is_db_connection_error(e.args[0]):
                    raise
    return engine
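# Usage sketch (the connection URL is made up): callers normally go through
# get_engine() above, which builds one engine per process and caches it:
#
#     engine = create_engine('mysql://user:secret@db.local/cinder')
#     engine.execute('SELECT 1')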
class Query(sqlalchemy.orm.query.Query):
    """Subclass of sqlalchemy.query with soft_delete() method."""
    
**** CubicPower OpenStack Study ****
    def soft_delete(self, synchronize_session='evaluate'):
        return self.update({'deleted': literal_column('id'),
                            'updated_at': literal_column('updated_at'),
                            'deleted_at': timeutils.utcnow()},
                           synchronize_session=synchronize_session)
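# Illustrative sketch (models.Volume is hypothetical): soft_delete() marks
# matching rows deleted in place rather than removing them -- 'deleted' is
# set to the row's own id so unique constraints that include the deleted
# column keep working, and 'updated_at' is reassigned to itself (i.e. left
# unchanged):
#
#     session.query(models.Volume).\
#         filter_by(id=volume_id).\
#         soft_delete()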
class Session(sqlalchemy.orm.session.Session):
    """Custom Session class to avoid SqlAlchemy Session monkey patching."""
    @_wrap_db_error
    def query(self, *args, **kwargs):
        return super(Session, self).query(*args, **kwargs)
    @_wrap_db_error
    def flush(self, *args, **kwargs):
        return super(Session, self).flush(*args, **kwargs)
    @_wrap_db_error
    def execute(self, *args, **kwargs):
        return super(Session, self).execute(*args, **kwargs)
def get_maker(engine, autocommit=True, expire_on_commit=False):
    """Return a SQLAlchemy sessionmaker using the given engine."""
    return sqlalchemy.orm.sessionmaker(bind=engine,
                                       class_=Session,
                                       autocommit=autocommit,
                                       expire_on_commit=expire_on_commit,
                                       query_cls=Query)
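# Usage sketch (not part of the original module): the typical lifecycle of a
# session built from the helpers above:
#
#     session = get_maker(get_engine())()
#     try:
#         session.query(...)  # goes through the _wrap_db_error'd Session
#     finally:
#         session.close()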
def _patch_mysqldb_with_stacktrace_comments():
    """Adds current stack trace as a comment in queries by patching
    MySQLdb.cursors.BaseCursor._do_query.
    """
    import MySQLdb.cursors
    import traceback
    old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query
    def _do_query(self, q):
        stack = ''
        # extract_stack() yields (filename, line number, function name,
        # source text) tuples, outermost frame first; name them accordingly
        # instead of shadowing the file builtin.
        for filename, line, func_name, text in traceback.extract_stack():
            # exclude various common things from trace
            if filename.endswith('session.py') and func_name == '_do_query':
                continue
            if filename.endswith('api.py') and func_name == 'wrapper':
                continue
            if filename.endswith('utils.py') and func_name == '_inner':
                continue
            if filename.endswith('exception.py') and func_name == '_wrap':
                continue
            # db/api is just a wrapper around db/sqlalchemy/api
            if filename.endswith('db/api.py'):
                continue
            # only trace inside cinder
            index = filename.rfind('cinder')
            if index == -1:
                continue
            stack += "File:%s:%s Method:%s() Line:%s | " \
                     % (filename[index:], line, func_name, text)
        # strip trailing " | " from stack
        if stack:
            stack = stack[:-3]
            qq = "%s /* %s */" % (q, stack)
        else:
            qq = q
        old_mysql_do_query(self, qq)
    setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
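# Illustrative sketch (paths and line numbers are made up): after patching,
# a query issued from cinder code reaches MySQL with its originating stack
# appended as a SQL comment, outermost frame first, e.g.:
#
#     SELECT * FROM volumes /* File:cinder/volume/manager.py:456
#     Method:get_volume() Line:... | File:cinder/db/sqlalchemy/api.py:123
#     Method:volume_get() Line:... */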