**** CubicPower OpenStack Study ****
def get_engine(uri):
"""Engine creation
Call the function without arguments to get admin connection. Admin
connection required to create temporary user and database for each
particular test. Otherwise use existing connection to recreate connection
to the temporary database.
"""
return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
**** CubicPower OpenStack Study ****
def _execute_sql(engine, sql, driver):
"""Initialize connection, execute sql query and close it."""
try:
with engine.connect() as conn:
if driver == 'postgresql':
conn.connection.set_isolation_level(0)
for s in sql:
conn.execute(s)
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
LOG.exception(msg % engine.url)
raise exc.DBConnectionError(msg % engine.url)
**** CubicPower OpenStack Study ****
def create_database(engine):
"""Provide temporary user and database for each particular test."""
driver = engine.name
auth = {
'database': ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10)),
'user': engine.url.username,
'passwd': engine.url.password,
}
sqls = [
"drop database if exists %(database)s;",
"create database %(database)s;"
]
if driver == 'sqlite':
return 'sqlite:////tmp/%s' % auth['database']
elif driver in ['mysql', 'postgresql']:
sql_query = map(lambda x: x % auth, sqls)
_execute_sql(engine, sql_query, driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
params = auth.copy()
params['backend'] = driver
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
**** CubicPower OpenStack Study ****
def drop_database(admin_engine, current_uri):
"""Drop temporary database and user after each particular test."""
engine = get_engine(current_uri)
driver = engine.name
auth = {'database': engine.url.database, 'user': engine.url.username}
if driver == 'sqlite':
try:
os.remove(auth['database'])
except OSError:
pass
elif driver in ['mysql', 'postgresql']:
sql = "drop database if exists %(database)s;"
_execute_sql(admin_engine, [sql % auth], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
**** CubicPower OpenStack Study ****
def main():
"""Controller to handle commands
::create: Create test user and database with random names.
::drop: Drop user and database created by previous command.
"""
parser = argparse.ArgumentParser(
description='Controller to handle database creation and dropping'
' commands.',
epilog='Under normal circumstances is not used directly.'
' Used in .testr.conf to automate test database creation'
' and dropping processes.')
subparsers = parser.add_subparsers(
help='Subcommands to manipulate temporary test databases.')
create = subparsers.add_parser(
'create',
help='Create temporary test '
'databases and users.')
create.set_defaults(which='create')
create.add_argument(
'instances_count',
type=int,
help='Number of databases to create.')
drop = subparsers.add_parser(
'drop',
help='Drop temporary test databases and users.')
drop.set_defaults(which='drop')
drop.add_argument(
'instances',
nargs='+',
help='List of databases uri to be dropped.')
args = parser.parse_args()
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
'sqlite://')
engine = get_engine(connection_string)
which = args.which
if which == "create":
for i in range(int(args.instances_count)):
print(create_database(engine))
elif which == "drop":
for db in args.instances:
drop_database(engine, db)
if __name__ == "__main__":
main()