"' # Tested through existing tests (e.g. calls to find_network_with_bridge)
and_clauses = query.split(" and ")
if len(and_clauses) > 1:
matches = True
for clause in and_clauses:
matches = matches and _query_matches(record, clause)
return matches
or_clauses = query.split(" or ")
if len(or_clauses) > 1:
matches = False
for clause in or_clauses:
matches = matches or _query_matches(record, clause)
return matches
if query[:4] == 'not ':
return not _query_matches(record, query[4:])
# Now it must be a single field - bad queries never match
if query[:5] != 'field':
return False
(field, value) = query[6:].split('=', 1)
# Some fields (e.g. name_label, memory_overhead) have double
# underscores in the DB, but only single underscores when querying
field = field.replace("__", "_").strip(" \"'")
value = value.strip(" \"'")
# Strings should be directly compared
if isinstance(record[field], str):
return record[field] == value
# But for all other value-checks, convert to a string first
# (Notably used for booleans - which can be lower or camel
# case and are interpreted/sanitised by XAPI)
return str(record[field]).lower() == value.lower()
**** CubicPower OpenStack Study ****
def get_all_records_where(table_name, query):
    """Return the records of ``table_name`` that satisfy ``query``.

    :param table_name: key of the table inside ``_db_content``.
    :param query: query string handed to ``_query_matches`` for each record.
    :returns: a new dict mapping each matching ref to its record (the
              record objects themselves are shared, not copied).
    """
    table = _db_content[table_name]
    return dict((ref, rec) for ref, rec in table.items()
                if _query_matches(rec, query))
**** CubicPower OpenStack Study ****
def get_record(table, ref):
    """Look up the record stored under ``ref`` in ``table``.

    :param table: key of the table inside ``_db_content``.
    :param ref: reference identifying the record.
    :returns: the stored record object.
    :raises Failure: ``HANDLE_INVALID`` when ``ref`` is not in the table.
    """
    records = _db_content[table]
    if ref not in records:
        raise Failure(['HANDLE_INVALID', table, ref])
    return records.get(ref)
**** CubicPower OpenStack Study ****
def check_for_session_leaks():
    """Raise if any session is still registered in the fake database.

    :raises exception.NovaException: when ``_db_content['session']`` is
        not empty; the message lists the remaining sessions.
    """
    open_sessions = _db_content['session']
    if open_sessions:
        raise exception.NovaException('Sessions have leaked: %s' %
                                      open_sessions)
**** CubicPower OpenStack Study ****
def as_value(s):
    """Helper function for simulating XenAPI plugin responses.

    XML-escapes the given argument and wraps it in a ``<value>`` element.

    :param s: text to escape (``&``, ``<`` and ``>`` are replaced by
              their XML entities).
    :returns: the string ``<value>ESCAPED</value>``.
    """
    # NOTE(review): the wrapper tags had been stripped from the format
    # string, leaving a bare '%s' that contradicted the documented
    # "escapes and wraps" contract.
    return '<value>%s</value>' % saxutils.escape(s)
**** CubicPower OpenStack Study ****
def as_json(*args, **kwargs):
    """Helper for simulating XenAPI plugin responses that return JSON.

    Positional arguments, when any are given, are serialised as a JSON
    list and keyword arguments are ignored.  Otherwise the keyword
    arguments are serialised as a JSON dict (an empty dict when nothing
    at all is passed).
    """
    if args:
        payload = args
    else:
        payload = kwargs
    return jsonutils.dumps(payload)
**** CubicPower OpenStack Study ****
class Failure(Exception):
    """Fake counterpart of the XenAPI ``Failure`` exception.

    ``details`` carries the error description; throughout this module it
    is a list whose first item is the error code followed by its
    parameters.
    """

    def __init__(self, details):
        self.details = details

    def __str__(self):
        try:
            return str(self.details)
        except Exception:
            # The previous fallback repeated the very str() call that had
            # just failed, so it could never succeed; use repr() instead.
            # The 1-tuple keeps '%' from unpacking tuple-valued details.
            return "XenAPI Fake Failure: %r" % (self.details,)

    def _details_map(self):
        """Return ``details`` as a dict keyed by stringified position."""
        return dict((str(index), item)
                    for index, item in enumerate(self.details))
**** CubicPower OpenStack Study ****
class SessionBase(object):
    """Base class for Fake Sessions.

    Simulates a XenAPI session on top of the module-level ``_db_content``
    tables.  Calls arrive either through ``xenapi_request`` (which
    prepends the session handle to the parameters) or through attribute
    access, which ``__getattr__`` resolves to explicit ``Class_method``
    implementations or to the generic getter/setter/create/destroy
    handlers.  In the explicit implementations the first positional
    parameter (``_1`` / ``session``) is that session handle.
    """

    def __init__(self, uri):
        # ``uri`` is accepted but not used by the fake.
        self._session = None
        xenapi_session.apply_session_helpers(self)

    def pool_get_default_SR(self, _1, pool_ref):
        """Return the 'default-SR' of the first pool record.

        ``pool_ref`` is ignored.
        """
        # list() keeps this working where dict views are not indexable.
        return list(_db_content['pool'].values())[0]['default-SR']

    def VBD_insert(self, _1, vbd_ref, vdi_ref):
        """Attach ``vdi_ref`` to the VBD and mark the VBD non-empty."""
        vbd_rec = get_record('VBD', vbd_ref)
        # Called only for validation: raises if the VDI does not exist.
        get_record('VDI', vdi_ref)
        vbd_rec['empty'] = False
        vbd_rec['VDI'] = vdi_ref

    def VBD_plug(self, _1, ref):
        """Mark the VBD attached; fail if it already is."""
        rec = get_record('VBD', ref)
        if rec['currently_attached']:
            raise Failure(['DEVICE_ALREADY_ATTACHED', ref])
        rec['currently_attached'] = True
        rec['device'] = rec['userdevice']

    def VBD_unplug(self, _1, ref):
        """Mark the VBD detached; fail if it already is."""
        rec = get_record('VBD', ref)
        if not rec['currently_attached']:
            raise Failure(['DEVICE_ALREADY_DETACHED', ref])
        rec['currently_attached'] = False
        rec['device'] = ''

    def VBD_add_to_other_config(self, _1, vbd_ref, key, value):
        """Add ``key`` to the VBD's other_config; duplicates are errors."""
        other_config = _db_content['VBD'][vbd_ref].setdefault(
            'other_config', {})
        if key in other_config:
            raise Failure(['MAP_DUPLICATE_KEY', 'VBD', 'other_config',
                           vbd_ref, key])
        other_config[key] = value

    def VBD_get_other_config(self, _1, vbd_ref):
        """Return the VBD's other_config, or a fresh {} if it has none."""
        db_ref = _db_content['VBD'][vbd_ref]
        if 'other_config' not in db_ref:
            return {}
        return db_ref['other_config']

    def PBD_create(self, _1, pbd_rec):
        """Create a PBD record, initially not attached, and return its ref."""
        pbd_ref = _create_object('PBD', pbd_rec)
        _db_content['PBD'][pbd_ref]['currently_attached'] = False
        return pbd_ref

    def PBD_plug(self, _1, pbd_ref):
        """Mark the PBD attached and make it the sole PBD of its SR."""
        rec = get_record('PBD', pbd_ref)
        if rec['currently_attached']:
            # Report the ref (as VBD_plug does), not the whole record.
            raise Failure(['DEVICE_ALREADY_ATTACHED', pbd_ref])
        rec['currently_attached'] = True
        sr_ref = rec['SR']
        _db_content['SR'][sr_ref]['PBDs'] = [pbd_ref]

    def PBD_unplug(self, _1, pbd_ref):
        """Mark the PBD detached and remove it from its SR's PBD list."""
        rec = get_record('PBD', pbd_ref)
        if not rec['currently_attached']:
            # Report the ref (as VBD_unplug does), not the whole record.
            raise Failure(['DEVICE_ALREADY_DETACHED', pbd_ref])
        rec['currently_attached'] = False
        sr_ref = rec['SR']
        _db_content['SR'][sr_ref]['PBDs'].remove(pbd_ref)

    def SR_introduce(self, _1, sr_uuid, label, desc, type, content_type,
                     shared, sm_config):
        """Return the ref of the SR with ``sr_uuid``, creating it if needed.

        An existing SR is "un-forgotten" and returned.  Otherwise a new SR
        record is created; for ``type == 'iscsi'`` a single VDI is created
        in it as well.
        """
        for ref, rec in _db_content['SR'].items():
            if rec.get('uuid') == sr_uuid:
                # make forgotten = 0 and return ref
                _db_content['SR'][ref]['forgotten'] = 0
                return ref
        # SR not found in db, so we create one
        params = {'sr_uuid': sr_uuid,
                  'label': label,
                  'desc': desc,
                  'type': type,
                  'content_type': content_type,
                  'shared': shared,
                  'sm_config': sm_config}
        sr_ref = _create_object('SR', params)
        _db_content['SR'][sr_ref]['uuid'] = sr_uuid
        _db_content['SR'][sr_ref]['forgotten'] = 0
        if type == 'iscsi':
            # iSCSI SRs are VDI-per-LUN: we need to create a vdi because
            # this introduce is likely meant for a single vdi
            vdi_ref = create_vdi('', sr_ref)
            _db_content['SR'][sr_ref]['VDIs'] = [vdi_ref]
            _db_content['VDI'][vdi_ref]['SR'] = sr_ref
        return sr_ref

    def SR_forget(self, _1, sr_ref):
        """Flag the SR as forgotten (the record itself is kept)."""
        _db_content['SR'][sr_ref]['forgotten'] = 1

    def SR_scan(self, _1, sr_ref):
        """No-op."""
        return

    def VM_get_xenstore_data(self, _1, vm_ref):
        """Return the VM's xenstore_data, or a fresh {} if it has none."""
        return _db_content['VM'][vm_ref].get('xenstore_data', {})

    def VM_remove_from_xenstore_data(self, _1, vm_ref, key):
        """Remove ``key`` from the VM's xenstore_data if it is present."""
        db_ref = _db_content['VM'][vm_ref]
        if 'xenstore_data' not in db_ref:
            return
        if key in db_ref['xenstore_data']:
            del db_ref['xenstore_data'][key]

    def VM_add_to_xenstore_data(self, _1, vm_ref, key, value):
        """Set ``key`` in the VM's xenstore_data, creating the map if needed."""
        _db_content['VM'][vm_ref].setdefault('xenstore_data', {})[key] = value

    def VM_pool_migrate(self, _1, vm_ref, host_ref, options):
        """No-op."""
        pass

    def VDI_remove_from_other_config(self, _1, vdi_ref, key):
        """Remove ``key`` from the VDI's other_config if it is present."""
        db_ref = _db_content['VDI'][vdi_ref]
        if 'other_config' not in db_ref:
            return
        if key in db_ref['other_config']:
            del db_ref['other_config'][key]

    def VDI_add_to_other_config(self, _1, vdi_ref, key, value):
        """Add ``key`` to the VDI's other_config; duplicates are errors."""
        other_config = _db_content['VDI'][vdi_ref].setdefault(
            'other_config', {})
        if key in other_config:
            raise Failure(['MAP_DUPLICATE_KEY', 'VDI', 'other_config',
                           vdi_ref, key])
        other_config[key] = value

    def VDI_copy(self, _1, vdi_to_copy_ref, sr_ref):
        """Create a VDI in ``sr_ref`` mirroring the source VDI's settings."""
        db_ref = _db_content['VDI'][vdi_to_copy_ref]
        name_label = db_ref['name_label']
        read_only = db_ref['read_only']
        sharable = db_ref['sharable']
        # Shallow copy so the new VDI does not share the source's map.
        other_config = db_ref['other_config'].copy()
        return create_vdi(name_label, sr_ref, sharable=sharable,
                          read_only=read_only, other_config=other_config)

    def VDI_clone(self, _1, vdi_to_clone_ref):
        """Copy the VDI into the SR it already lives in."""
        sr_ref = _db_content['VDI'][vdi_to_clone_ref]['SR']
        return self.VDI_copy(_1, vdi_to_clone_ref, sr_ref)

    def host_compute_free_memory(self, _1, ref):
        # Always return 12GB available
        return 12 * units.Gi

    # -- Simulated plugin calls, dispatched by host_call_plugin() -----------

    def _plugin_agent_version(self, method, args):
        return as_json(returncode='0', message='1.0\\r\\n')

    def _plugin_agent_key_init(self, method, args):
        return as_json(returncode='D0', message='1')

    def _plugin_agent_password(self, method, args):
        return as_json(returncode='0', message='success')

    def _plugin_agent_inject_file(self, method, args):
        return as_json(returncode='0', message='success')

    def _plugin_agent_resetnetwork(self, method, args):
        return as_json(returncode='0', message='success')

    def _plugin_agent_agentupdate(self, method, args):
        url = args["url"]
        md5 = args["md5sum"]
        message = "success with %(url)s and hash:%(md5)s" % dict(url=url,
                                                                 md5=md5)
        return as_json(returncode='0', message=message)

    def _plugin_noop(self, method, args):
        return ''

    def _plugin_pickle_noop(self, method, args):
        return pickle.dumps(None)

    def _plugin_migration_transfer_vhd(self, method, args):
        # NOTE(review): pickle.loads() on caller-supplied data is only
        # acceptable because this is a test double.
        kwargs = pickle.loads(args['params'])['kwargs']
        vdi_ref = self.xenapi_request('VDI.get_by_uuid',
                                      (kwargs['vdi_uuid'], ))
        assert vdi_ref
        return pickle.dumps(None)

    _plugin_glance_upload_vhd = _plugin_pickle_noop
    _plugin_kernel_copy_vdi = _plugin_noop
    _plugin_kernel_create_kernel_ramdisk = _plugin_noop
    _plugin_kernel_remove_kernel_ramdisk = _plugin_noop
    _plugin_migration_move_vhds_into_sr = _plugin_noop

    def _plugin_xenhost_host_data(self, method, args):
        return jsonutils.dumps({'host_memory': {'total': 10,
                                                'overhead': 20,
                                                'free': 30,
                                                'free-computed': 40},
                                'host_hostname': 'fake-xenhost',
                                'host_cpu_info': {'cpu_count': 50},
                                })

    def _plugin_poweraction(self, method, args):
        # Drop the first five characters (the 'host_' prefix of the
        # aliases below), leaving e.g. 'reboot'.
        return jsonutils.dumps({"power_action": method[5:]})

    _plugin_xenhost_host_reboot = _plugin_poweraction
    _plugin_xenhost_host_startup = _plugin_poweraction
    _plugin_xenhost_host_shutdown = _plugin_poweraction

    def _plugin_xenhost_set_host_enabled(self, method, args):
        enabled = 'enabled' if args.get('enabled') == 'true' else 'disabled'
        return jsonutils.dumps({"status": enabled})

    def _plugin_xenhost_host_uptime(self, method, args):
        return jsonutils.dumps({"uptime": "fake uptime"})

    def _plugin_xenhost_get_pci_device_details(self, method, args):
        """Simulate the output of three PCI devices.

        Two of those devices use the pciback driver and are therefore
        available for PCI passthrough, but only one will match the PCI
        whitelist used in the method test_pci_passthrough_devices_*().
        Returns a single pickled string.
        """
        # Driver is not pciback
        dev_bad1 = ["Slot:\t86:10.0", "Class:\t0604", "Vendor:\t10b5",
                    "Device:\t8747", "Rev:\tba", "Driver:\tpcieport", "\n"]
        # Driver is pciback but vendor and device are bad
        dev_bad2 = ["Slot:\t88:00.0", "Class:\t0300", "Vendor:\t0bad",
                    "Device:\tcafe", "SVendor:\t10de", "SDevice:\t100d",
                    "Rev:\ta1", "Driver:\tpciback", "\n"]
        # Driver is pciback and vendor, device are used for matching
        dev_good = ["Slot:\t87:00.0", "Class:\t0300", "Vendor:\t10de",
                    "Device:\t11bf", "SVendor:\t10de", "SDevice:\t100d",
                    "Rev:\ta1", "Driver:\tpciback", "\n"]
        lspci_output = "\n".join(dev_bad1 + dev_bad2 + dev_good)
        return pickle.dumps(lspci_output)

    def _plugin_xenhost_get_pci_type(self, method, args):
        return pickle.dumps("type-PCI")

    def _plugin_console_get_console_log(self, method, args):
        dom_id = args["dom_id"]
        if dom_id == 0:
            # NOTE(review): details is a bare string here, unlike the
            # list used everywhere else -- left as-is, confirm intent.
            raise Failure('Guest does not have a console')
        log = "dom_id: %s" % dom_id
        if not isinstance(log, bytes):
            # zlib.compress() requires bytes where text and bytes differ.
            log = log.encode('utf-8')
        return base64.b64encode(zlib.compress(log))

    def _plugin_nova_plugin_version_get_version(self, method, args):
        return pickle.dumps("1.2")

    def _plugin_xenhost_query_gc(self, method, args):
        return pickle.dumps("False")

    def host_call_plugin(self, _1, _2, plugin, method, args):
        """Dispatch to ``_plugin_<plugin>_<method>(method, args)``.

        :raises Exception: when no such simulation exists.
        """
        func = getattr(self, '_plugin_%s_%s' % (plugin, method), None)
        if not func:
            raise Exception('No simulation in host_call_plugin for %s,%s' %
                            (plugin, method))
        return func(method, args)

    def VDI_get_virtual_size(self, *args):
        return 1 * units.Gi

    def VDI_resize_online(self, *args):
        return 'derp'

    VDI_resize = VDI_resize_online

    def _VM_reboot(self, session, vm_ref):
        """Reboot a running VM: it stays 'Running' and gets a new domid."""
        db_ref = _db_content['VM'][vm_ref]
        if db_ref['power_state'] != 'Running':
            raise Failure(['VM_BAD_POWER_STATE',
                           'fake-opaque-ref', db_ref['power_state'].lower(),
                           'halted'])
        db_ref['power_state'] = 'Running'
        db_ref['domid'] = random.randrange(1, 1 << 16)

    def VM_clean_reboot(self, session, vm_ref):
        return self._VM_reboot(session, vm_ref)

    def VM_hard_reboot(self, session, vm_ref):
        return self._VM_reboot(session, vm_ref)

    def VM_hard_shutdown(self, session, vm_ref):
        db_ref = _db_content['VM'][vm_ref]
        db_ref['power_state'] = 'Halted'
        db_ref['domid'] = -1

    VM_clean_shutdown = VM_hard_shutdown

    def VM_suspend(self, session, vm_ref):
        db_ref = _db_content['VM'][vm_ref]
        db_ref['power_state'] = 'Suspended'

    def VM_pause(self, session, vm_ref):
        db_ref = _db_content['VM'][vm_ref]
        db_ref['power_state'] = 'Paused'

    def pool_eject(self, session, host_ref):
        pass

    def pool_join(self, session, hostname, username, password):
        pass

    def pool_set_name_label(self, session, pool_ref, name):
        pass

    def host_migrate_receive(self, session, destref, nwref, options):
        return "fake_migrate_data"

    def VM_assert_can_migrate(self, session, vmref, migrate_data, live,
                              vdi_map, vif_map, options):
        pass

    def VM_migrate_send(self, session, mref, migrate_data, live, vdi_map,
                        vif_map, options):
        pass

    def VM_remove_from_blocked_operations(self, session, vm_ref, key):
        # operation is idempotent, XenServer doesn't care if the key exists
        _db_content['VM'][vm_ref]['blocked_operations'].pop(key, None)

    def xenapi_request(self, methodname, params):
        """Execute ``methodname`` with the session handle prepended.

        Login and logout requests are handled here and return None.  Any
        other name is resolved with ``getattr`` (and therefore possibly
        ``__getattr__``) and called with ``(session,) + params``.

        :param params: tuple of call parameters, without the session.
        :raises NotImplementedError: when the name cannot be resolved.
        """
        if methodname.startswith('login'):
            self._login(methodname, params)
            return None
        if methodname == 'logout' or methodname == 'session.logout':
            self._logout()
            return None
        full_params = (self._session,) + params
        meth = getattr(self, methodname, None)
        if meth is None:
            LOG.debug(_('Raising NotImplemented'))
            raise NotImplementedError(
                _('xenapi.fake does not have an implementation for %s') %
                methodname)
        return meth(*full_params)

    def _login(self, method, params):
        """Open a new session and register it in ``_db_content``."""
        self._session = str(uuid.uuid4())
        _session_info = {'uuid': str(uuid.uuid4()),
                         # list() keeps this working where dict views are
                         # not indexable.
                         'this_host': list(_db_content['host'].keys())[0]}
        _db_content['session'][self._session] = _session_info

    def _logout(self):
        """Close the current session.

        The handle is cleared before the check, so it is reset even when
        the exception is raised.
        """
        s = self._session
        self._session = None
        if s not in _db_content['session']:
            raise exception.NovaException(
                "Logging out a session that is invalid or already logged "
                "out: %s" % s)
        del _db_content['session'][s]

    def __getattr__(self, name):
        """Resolve attributes that normal lookup did not find.

        Returns a callable (or helper object) for the recognised names and
        None -- not AttributeError -- for everything else; the dotted-name
        branch below relies on that None.
        """
        if name == 'handle':
            return self._session
        elif name == 'xenapi':
            return _Dispatcher(self.xenapi_request, None)
        elif name.startswith('login') or name.startswith('slave_local'):
            return lambda *params: self._login(name, params)
        elif name.startswith('Async'):
            return lambda *params: self._async(name, params)
        elif '.' in name:
            # 'VM.foo' -> explicit 'VM_foo' implementation, if one exists.
            impl = getattr(self, name.replace('.', '_'))
            if impl is not None:

                def callit(*params):
                    LOG.debug(_('Calling %(name)s %(impl)s'),
                              {'name': name, 'impl': impl})
                    self._check_session(params)
                    return impl(*params)
                return callit
        # No explicit implementation: fall back to the generic handlers.
        if self._is_gettersetter(name, True):
            LOG.debug(_('Calling getter %s'), name)
            return lambda *params: self._getter(name, params)
        elif self._is_gettersetter(name, False):
            LOG.debug(_('Calling setter %s'), name)
            return lambda *params: self._setter(name, params)
        elif self._is_create(name):
            return lambda *params: self._create(name, params)
        elif self._is_destroy(name):
            return lambda *params: self._destroy(name, params)
        elif name == 'XenAPI':
            return FakeXenAPI()
        else:
            return None

    def _is_gettersetter(self, name, getter):
        """True for '<class>.get_*' (getter) or '<class>.set_*' names."""
        bits = name.split('.')
        prefix = 'get_' if getter else 'set_'
        return (len(bits) == 2 and
                bits[0] in _CLASSES and
                bits[1].startswith(prefix))

    def _is_create(self, name):
        return self._is_method(name, 'create')

    def _is_destroy(self, name):
        return self._is_method(name, 'destroy')

    def _is_method(self, name, meth):
        """True when ``name`` is exactly '<known class>.<meth>'."""
        bits = name.split('.')
        return (len(bits) == 2 and
                bits[0] in _CLASSES and
                bits[1] == meth)

    def _getter(self, name, params):
        """Generic implementation of '<class>.get_*' calls.

        :param params: ``(session, ...)`` as received by the call.
        :raises Failure: on argument-count mismatch or unknown ref.
        :raises NotImplementedError: when the request cannot be served.
        """
        self._check_session(params)
        (cls, func) = name.split('.')
        if func == 'get_all':
            self._check_arg_count(params, 1)
            return get_all(cls)
        if func == 'get_all_records':
            self._check_arg_count(params, 1)
            return get_all_records(cls)
        if func == 'get_all_records_where':
            self._check_arg_count(params, 2)
            return get_all_records_where(cls, params[1])
        if func == 'get_record':
            self._check_arg_count(params, 2)
            return get_record(cls, params[1])
        if func in ('get_by_name_label', 'get_by_uuid'):
            self._check_arg_count(params, 2)
            # UUID lookups yield one ref, name-label lookups a list.
            return_singleton = (func == 'get_by_uuid')
            return self._get_by_field(
                _db_content[cls], func[len('get_by_'):], params[1],
                return_singleton=return_singleton)
        if len(params) == 2:
            # Plain field read: 'get_<field>'(session, ref).
            field = func[len('get_'):]
            ref = params[1]
            if ref not in _db_content[cls]:
                raise Failure(['HANDLE_INVALID', cls, ref])
            if field in _db_content[cls][ref]:
                return _db_content[cls][ref][field]
        LOG.debug(_('Raising NotImplemented'))
        raise NotImplementedError(
            _('xenapi.fake does not have an implementation for %s or it has '
            'been called with the wrong number of arguments') % name)

    def _setter(self, name, params):
        """Generic implementation of '<class>.set_<field>' calls.

        Only fields that already exist on the record can be set.
        """
        self._check_session(params)
        (cls, func) = name.split('.')
        if len(params) == 3:
            field = func[len('set_'):]
            ref = params[1]
            val = params[2]
            if (ref in _db_content[cls] and
                    field in _db_content[cls][ref]):
                _db_content[cls][ref][field] = val
                return
        LOG.debug(_('Raising NotImplemented'))
        raise NotImplementedError(
            'xenapi.fake does not have an implementation for %s or it has '
            'been called with the wrong number of arguments or the database '
            'is missing that field' % name)

    def _create(self, name, params):
        """Generic implementation of '<class>.create' calls.

        :returns: the ref of the newly created object.
        """
        self._check_session(params)
        is_sr_create = name == 'SR.create'
        is_vlan_create = name == 'VLAN.create'
        # Storage Repositories (and VLANs) have a different API
        if is_sr_create:
            expected = 10
        elif is_vlan_create:
            expected = 4
        else:
            expected = 2
        self._check_arg_count(params, expected)
        (cls, _method) = name.split('.')
        # Explicit branches: a falsy ref from one helper must not fall
        # through and create a second object with another helper.
        if is_sr_create:
            ref = _create_sr(cls, params)
        elif is_vlan_create:
            ref = _create_vlan(params[1], params[2], params[3])
        else:
            ref = _create_object(cls, params[1])

        # Call hook to provide any fixups needed (ex. creating backrefs)
        after_hook = 'after_%s_create' % cls
        if after_hook in globals():
            globals()[after_hook](ref, params[1])

        obj = get_record(cls, ref)

        # Add RO fields
        if cls == 'VM':
            obj['power_state'] = 'Halted'
        return ref

    def _destroy(self, name, params):
        """Generic implementation of '<class>.destroy' calls."""
        self._check_session(params)
        self._check_arg_count(params, 2)
        table = name.split('.')[0]
        ref = params[1]
        if ref not in _db_content[table]:
            raise Failure(['HANDLE_INVALID', table, ref])

        # Call destroy function (if exists)
        destroy_func = globals().get('destroy_%s' % table.lower())
        if destroy_func:
            destroy_func(ref)
        else:
            del _db_content[table][ref]

    def _async(self, name, params):
        """Run an 'Async.*' call synchronously and record it as a task.

        :returns: the ref of the task holding the result or error info.
        """
        task_ref = create_task(name)
        task = _db_content['task'][task_ref]
        func = name[len('Async.'):]
        try:
            # params[0] is the session; xenapi_request() re-adds it.
            result = self.xenapi_request(func, params[1:])
            if result:
                result = as_value(result)
            task['result'] = result
            task['status'] = 'success'
        except Failure as exc:
            task['error_info'] = exc.details
            task['status'] = 'failed'
        task['finished'] = timeutils.utcnow()
        return task_ref

    def _check_session(self, params):
        """Validate that ``params[0]`` is this object's live session."""
        if (self._session is None or
                self._session not in _db_content['session']):
            raise Failure(['HANDLE_INVALID', 'session', self._session])
        if len(params) == 0 or params[0] != self._session:
            LOG.debug(_('Raising NotImplemented'))
            raise NotImplementedError('Call to XenAPI without using .xenapi')

    def _check_arg_count(self, params, expected):
        """Raise Failure unless exactly ``expected`` params were given."""
        actual = len(params)
        if actual != expected:
            raise Failure(['MESSAGE_PARAMETER_COUNT_MISMATCH',
                           expected, actual])

    def _get_by_field(self, recs, k, v, return_singleton):
        """Return the refs in ``recs`` whose field ``k`` equals ``v``.

        :param return_singleton: when true, return only the first match
            and raise ``Failure(UUID_INVALID)`` if there is none.
        """
        result = [ref for ref, rec in recs.items() if rec.get(k) == v]
        if return_singleton:
            try:
                return result[0]
            except IndexError:
                raise Failure(['UUID_INVALID', v, result, recs, k])
        return result
**** CubicPower OpenStack Study ****
class FakeXenAPI(object):
    """Minimal stand-in for the XenAPI module object.

    It only exposes ``Failure`` (set per instance), so callers can refer
    to the exception class through the object.
    """

    def __init__(self):
        self.Failure = Failure
# Based upon _Method from xmlrpclib.
**** CubicPower OpenStack Study ****
class _Dispatcher:
**** CubicPower OpenStack Study ****
def __init__(self, send, name):
self.__send = send
self.__name = name
**** CubicPower OpenStack Study ****
def __repr__(self):
if self.__name:
return '' % self.__name else:
return ''
**** CubicPower OpenStack Study ****
def __getattr__(self, name):
if self.__name is None:
return _Dispatcher(self.__send, name)
else:
return _Dispatcher(self.__send, "%s.%s" % (self.__name, name))
**** CubicPower OpenStack Study ****
def __call__(self, *args):
return self.__send(self.__name, args)