_DEADLOCK_RE_DB = {
"mysql": re.compile(r"^.*\(1213, 'Deadlock.*")
}
def _raise_if_deadlock_error(operational_error, engine_name):
    """Raise DBDeadlock if an OperationalError reports a deadlock.

    The error text is matched against the pattern registered for
    ``engine_name`` in ``_DEADLOCK_RE_DB``.  The function returns ``None``
    when no pattern is registered for the engine or when the text does not
    match, so the caller can go on to handle the original error.

    :param operational_error: the caught OperationalError instance
    :param engine_name: name of the database engine in use, e.g. "mysql"
    :raises exception.DBDeadlock: wrapping ``operational_error`` when its
        text matches the engine's deadlock pattern
    """
    # Deliberately not called ``re`` so the module is not shadowed here.
    deadlock_re = _DEADLOCK_RE_DB.get(engine_name)
    if deadlock_re is None:
        return
    # ``BaseException.message`` only exists on Python 2; fall back to the
    # string form so this check cannot itself fail with AttributeError.
    message = getattr(operational_error, 'message', None)
    if message is None:
        message = str(operational_error)
    if deadlock_re.match(message):
        raise exception.DBDeadlock(operational_error)
def _wrap_db_error(f):
**** CubicPower OpenStack Study ****
def _raise_if_deadlock_error(operational_error, engine_name):
    """Raise DBDeadlock if an OperationalError reports a deadlock.

    The error text is matched against the pattern registered for
    ``engine_name`` in ``_DEADLOCK_RE_DB``.  The function returns ``None``
    when no pattern is registered for the engine or when the text does not
    match, so the caller can go on to handle the original error.

    :param operational_error: the caught OperationalError instance
    :param engine_name: name of the database engine in use, e.g. "mysql"
    :raises exception.DBDeadlock: wrapping ``operational_error`` when its
        text matches the engine's deadlock pattern
    """
    # Deliberately not called ``re`` so the module is not shadowed here.
    deadlock_re = _DEADLOCK_RE_DB.get(engine_name)
    if deadlock_re is None:
        return
    # ``BaseException.message`` only exists on Python 2; fall back to the
    # string form so this check cannot itself fail with AttributeError.
    message = getattr(operational_error, 'message', None)
    if message is None:
        message = str(operational_error)
    if deadlock_re.match(message):
        raise exception.DBDeadlock(operational_error)
**** CubicPower OpenStack Study ****
def _wrap_db_error(f):
**** CubicPower OpenStack Study ****
def _wrap(*args, **kwargs):
try:
return f(*args, **kwargs)
except UnicodeEncodeError:
raise exception.DBInvalidUnicodeParameter()
# note(boris-42): We should catch unique constraint violation and
# wrap it by our own DBDuplicateEntry exception. Unique constraint
# violation is wrapped by IntegrityError.
except sqla_exc.OperationalError as e:
_raise_if_deadlock_error(e, get_engine().name)
# NOTE(comstud): A lot of code is checking for OperationalError
# so let's not wrap it for now.
raise
except sqla_exc.IntegrityError as e:
# note(boris-42): SqlAlchemy doesn't unify errors from different
# DBs so we must do this. Also in some tables (for example
# instance_types) there are more than one unique constraint. This
# means we should get names of columns, which values violate
# unique constraint, from error message.
_raise_if_duplicate_entry_error(e, get_engine().name)
raise exception.DBError(e)
except Exception as e:
LOG.exception(_('DB exception wrapped.'))
raise exception.DBError(e)
_wrap.func_name = f.func_name
return _wrap
def get_engine(sqlite_fk=False):
    """Return the shared SQLAlchemy engine, creating it on first use.

    The engine is cached in the module-level ``_ENGINE``; ``sqlite_fk`` is
    only passed along by the call that actually builds the engine.
    """
    global _ENGINE
    if _ENGINE is not None:
        return _ENGINE
    _ENGINE = create_engine(CONF.database.connection, sqlite_fk=sqlite_fk)
    return _ENGINE
def _synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode."""
dbapi_conn.execute("PRAGMA synchronous = OFF")
def _add_regexp_listener(dbapi_con, con_record):
"""Add REGEXP function to sqlite connections."""
**** CubicPower OpenStack Study ****
def get_engine(sqlite_fk=False):
    """Return the shared SQLAlchemy engine, creating it on first use.

    The engine is cached in the module-level ``_ENGINE``; ``sqlite_fk`` is
    only passed along by the call that actually builds the engine.
    """
    global _ENGINE
    if _ENGINE is not None:
        return _ENGINE
    _ENGINE = create_engine(CONF.database.connection, sqlite_fk=sqlite_fk)
    return _ENGINE
**** CubicPower OpenStack Study ****
def _synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode."""
dbapi_conn.execute("PRAGMA synchronous = OFF")
**** CubicPower OpenStack Study ****
def _add_regexp_listener(dbapi_con, con_record):
"""Add REGEXP function to sqlite connections."""
**** CubicPower OpenStack Study ****
def regexp(expr, item):
reg = re.compile(expr)
return reg.search(six.text_type(item)) is not None
dbapi_con.create_function('regexp', 2, regexp)
def _greenthread_yield(dbapi_con, con_record):
    """Force a context switch so other greenthreads get a turn.

    Common database backends (eg MySQLdb and sqlite) are implemented by C
    libraries that eventlet cannot monkey patch, so their network I/O never
    yields implicitly; sleeping for zero seconds yields explicitly instead.
    Neither argument is used.
    """
    greenthread.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""
Ensures that MySQL connections checked out of the
pool are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError as ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warn(_('Got mysql server has gone away: %s'), ex)
raise sqla_exc.DisconnectionError("Database server went away")
else:
raise
def _is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others.
conn_err_codes = ('2002', '2003', '2006')
for err_code in conn_err_codes:
if args.find(err_code) != -1:
return True
return False
def create_engine(sql_connection, sqlite_fk=False):
    """Return a new SQLAlchemy engine for ``sql_connection``.

    Engine options come from ``CONF.database``.  After the engine is built
    and its event listeners are registered, one test connection is made.
    If that fails with an error recognised by ``_is_db_connection_error``,
    the connection is retried every ``CONF.database.retry_interval``
    seconds, up to ``CONF.database.max_retries`` times (-1 retries forever).

    :param sql_connection: database URL passed to SQLAlchemy
    :param sqlite_fk: when true and the driver is sqlite, install
        ``SqliteForeignKeysListener`` on the engine
    :returns: the engine
    :raises sqlalchemy.exc.OperationalError: when connecting fails with a
        non-connection error or the retries are used up
    """
    connection_dict = sqlalchemy.engine.url.make_url(sql_connection)

    engine_args = {
        "pool_recycle": CONF.database.idle_timeout,
        "echo": False,
        'convert_unicode': True,
    }

    # Map our SQL debug level to SQLAlchemy's options
    if CONF.database.connection_debug >= 100:
        engine_args['echo'] = 'debug'
    elif CONF.database.connection_debug >= 50:
        engine_args['echo'] = True

    if "sqlite" in connection_dict.drivername:
        if sqlite_fk:
            engine_args["listeners"] = [SqliteForeignKeysListener()]
        engine_args["poolclass"] = NullPool

        if CONF.database.connection == "sqlite://":
            engine_args["poolclass"] = StaticPool
            engine_args["connect_args"] = {'check_same_thread': False}
    else:
        engine_args['pool_size'] = CONF.database.max_pool_size
        if CONF.database.max_overflow is not None:
            engine_args['max_overflow'] = CONF.database.max_overflow

    engine = sqlalchemy.create_engine(sql_connection, **engine_args)

    sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)

    if 'mysql' in connection_dict.drivername:
        sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
    elif 'sqlite' in connection_dict.drivername:
        if not CONF.sqlite_synchronous:
            sqlalchemy.event.listen(engine, 'connect',
                                    _synchronous_switch_listener)
        sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)

    if (CONF.database.connection_trace and
            engine.dialect.dbapi.__name__ == 'MySQLdb'):
        _patch_mysqldb_with_stacktrace_comments()

    try:
        engine.connect()
    except sqla_exc.OperationalError as e:
        if not _is_db_connection_error(e.args[0]):
            raise

        remaining = CONF.database.max_retries
        if remaining == -1:
            remaining = 'infinite'
        while True:
            msg = _('SQL connection failed. %s attempts left.')
            LOG.warn(msg % remaining)
            if remaining != 'infinite':
                remaining -= 1
            time.sleep(CONF.database.retry_interval)
            try:
                engine.connect()
                break
            except sqla_exc.OperationalError as e:
                # ``<= 0`` rather than ``== 0``: with max_retries of 0 the
                # counter is already below zero here, and an equality test
                # would never stop the loop.
                if (remaining != 'infinite' and remaining <= 0) or \
                        not _is_db_connection_error(e.args[0]):
                    raise
    return engine
**** CubicPower OpenStack Study ****
def _greenthread_yield(dbapi_con, con_record):
    """Force a context switch so other greenthreads get a turn.

    Common database backends (eg MySQLdb and sqlite) are implemented by C
    libraries that eventlet cannot monkey patch, so their network I/O never
    yields implicitly; sleeping for zero seconds yields explicitly instead.
    Neither argument is used.
    """
    greenthread.sleep(0)
**** CubicPower OpenStack Study ****
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""
Ensures that MySQL connections checked out of the
pool are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError as ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warn(_('Got mysql server has gone away: %s'), ex)
raise sqla_exc.DisconnectionError("Database server went away")
else:
raise
**** CubicPower OpenStack Study ****
def _is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others.
conn_err_codes = ('2002', '2003', '2006')
for err_code in conn_err_codes:
if args.find(err_code) != -1:
return True
return False
**** CubicPower OpenStack Study ****
def create_engine(sql_connection, sqlite_fk=False):
    """Return a new SQLAlchemy engine for ``sql_connection``.

    Engine options come from ``CONF.database``.  After the engine is built
    and its event listeners are registered, one test connection is made.
    If that fails with an error recognised by ``_is_db_connection_error``,
    the connection is retried every ``CONF.database.retry_interval``
    seconds, up to ``CONF.database.max_retries`` times (-1 retries forever).

    :param sql_connection: database URL passed to SQLAlchemy
    :param sqlite_fk: when true and the driver is sqlite, install
        ``SqliteForeignKeysListener`` on the engine
    :returns: the engine
    :raises sqlalchemy.exc.OperationalError: when connecting fails with a
        non-connection error or the retries are used up
    """
    connection_dict = sqlalchemy.engine.url.make_url(sql_connection)

    engine_args = {
        "pool_recycle": CONF.database.idle_timeout,
        "echo": False,
        'convert_unicode': True,
    }

    # Map our SQL debug level to SQLAlchemy's options
    if CONF.database.connection_debug >= 100:
        engine_args['echo'] = 'debug'
    elif CONF.database.connection_debug >= 50:
        engine_args['echo'] = True

    if "sqlite" in connection_dict.drivername:
        if sqlite_fk:
            engine_args["listeners"] = [SqliteForeignKeysListener()]
        engine_args["poolclass"] = NullPool

        if CONF.database.connection == "sqlite://":
            engine_args["poolclass"] = StaticPool
            engine_args["connect_args"] = {'check_same_thread': False}
    else:
        engine_args['pool_size'] = CONF.database.max_pool_size
        if CONF.database.max_overflow is not None:
            engine_args['max_overflow'] = CONF.database.max_overflow

    engine = sqlalchemy.create_engine(sql_connection, **engine_args)

    sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)

    if 'mysql' in connection_dict.drivername:
        sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
    elif 'sqlite' in connection_dict.drivername:
        if not CONF.sqlite_synchronous:
            sqlalchemy.event.listen(engine, 'connect',
                                    _synchronous_switch_listener)
        sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)

    if (CONF.database.connection_trace and
            engine.dialect.dbapi.__name__ == 'MySQLdb'):
        _patch_mysqldb_with_stacktrace_comments()

    try:
        engine.connect()
    except sqla_exc.OperationalError as e:
        if not _is_db_connection_error(e.args[0]):
            raise

        remaining = CONF.database.max_retries
        if remaining == -1:
            remaining = 'infinite'
        while True:
            msg = _('SQL connection failed. %s attempts left.')
            LOG.warn(msg % remaining)
            if remaining != 'infinite':
                remaining -= 1
            time.sleep(CONF.database.retry_interval)
            try:
                engine.connect()
                break
            except sqla_exc.OperationalError as e:
                # ``<= 0`` rather than ``== 0``: with max_retries of 0 the
                # counter is already below zero here, and an equality test
                # would never stop the loop.
                if (remaining != 'infinite' and remaining <= 0) or \
                        not _is_db_connection_error(e.args[0]):
                    raise
    return engine
**** CubicPower OpenStack Study ****
class Query(sqlalchemy.orm.query.Query):
    """Subclass of sqlalchemy.query with soft_delete() method."""

    def soft_delete(self, synchronize_session='evaluate'):
        """Flag the rows selected by this query as deleted via an UPDATE.

        ``deleted`` is set from the ``id`` column, ``updated_at`` is
        assigned its own column value, and ``deleted_at`` is set to
        ``timeutils.utcnow()``.

        :param synchronize_session: forwarded to ``update()``
        :returns: whatever ``update()`` returns
        """
        values = {
            'deleted': literal_column('id'),
            'updated_at': literal_column('updated_at'),
            'deleted_at': timeutils.utcnow(),
        }
        return self.update(values, synchronize_session=synchronize_session)
**** CubicPower OpenStack Study ****
class Session(sqlalchemy.orm.session.Session):
    """Custom Session class to avoid SqlAlchemy Session monkey patching.

    ``query``, ``flush`` and ``execute`` delegate to the parent class and
    are decorated with ``_wrap_db_error``.
    """

    @_wrap_db_error
    def query(self, *args, **kwargs):
        """Call the parent ``query`` with the given arguments."""
        parent = super(Session, self)
        return parent.query(*args, **kwargs)

    @_wrap_db_error
    def flush(self, *args, **kwargs):
        """Call the parent ``flush`` with the given arguments."""
        parent = super(Session, self)
        return parent.flush(*args, **kwargs)

    @_wrap_db_error
    def execute(self, *args, **kwargs):
        """Call the parent ``execute`` with the given arguments."""
        parent = super(Session, self)
        return parent.execute(*args, **kwargs)
def get_maker(engine, autocommit=True, expire_on_commit=False):
    """Return a SQLAlchemy sessionmaker bound to ``engine``.

    The maker is configured with this module's ``Session`` and ``Query``
    classes; ``autocommit`` and ``expire_on_commit`` are passed through.
    """
    maker_options = dict(
        bind=engine,
        class_=Session,
        autocommit=autocommit,
        expire_on_commit=expire_on_commit,
        query_cls=Query,
    )
    return sqlalchemy.orm.sessionmaker(**maker_options)
def _patch_mysqldb_with_stacktrace_comments():
"""Adds current stack trace as a comment in queries by patching
MySQLdb.cursors.BaseCursor._do_query.
"""
import MySQLdb.cursors
import traceback
old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query
**** CubicPower OpenStack Study ****
def get_maker(engine, autocommit=True, expire_on_commit=False):
    """Return a SQLAlchemy sessionmaker bound to ``engine``.

    The maker is configured with this module's ``Session`` and ``Query``
    classes; ``autocommit`` and ``expire_on_commit`` are passed through.
    """
    maker_options = dict(
        bind=engine,
        class_=Session,
        autocommit=autocommit,
        expire_on_commit=expire_on_commit,
        query_cls=Query,
    )
    return sqlalchemy.orm.sessionmaker(**maker_options)
**** CubicPower OpenStack Study ****
def _patch_mysqldb_with_stacktrace_comments():
    """Adds current stack trace as a comment in queries by patching
    MySQLdb.cursors.BaseCursor._do_query.

    The replacement appends ``/* ... */`` to the query text, listing the
    stack frames whose file path contains ``cinder`` (minus a few wrapper
    frames), then calls the original ``_do_query`` and returns its result.
    Queries issued with no matching frame are passed through unchanged.
    """
    import MySQLdb.cursors
    import traceback

    old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query

    # (file name suffix, function name) pairs for frames left out of the trace
    skipped_frames = (
        ('session.py', '_do_query'),
        ('api.py', 'wrapper'),
        ('utils.py', '_inner'),
        ('exception.py', '_wrap'),
    )

    def _do_query(self, q):
        entries = []
        # extract_stack() yields (file name, line number, function name,
        # source text) for each frame.
        for filename, lineno, method, text in traceback.extract_stack():
            # exclude various common things from trace
            if any(filename.endswith(suffix) and method == name
                   for suffix, name in skipped_frames):
                continue
            # db/api is just a wrapper around db/sqlalchemy/api
            if filename.endswith('db/api.py'):
                continue
            # only trace inside cinder
            index = filename.rfind('cinder')
            if index == -1:
                continue
            entries.append("File:%s:%s Method:%s() Line:%s"
                           % (filename[index:], lineno, method, text))

        if entries:
            qq = "%s /* %s */" % (q, " | ".join(entries))
        else:
            qq = q
        # Hand back the original's result so callers of the patched method
        # see the same return value as before patching.
        return old_mysql_do_query(self, qq)

    setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)