**** CubicPower OpenStack Study ****
def stub_volume(id, **kwargs):
volume = {
'id': id,
'user_id': 'fakeuser',
'project_id': 'fakeproject',
'host': 'fakehost',
'size': 1,
'availability_zone': 'fakeaz',
'instance_uuid': 'fakeuuid',
'attached_host': None,
'mountpoint': '/',
'status': 'fakestatus',
'migration_status': None,
'attach_status': 'attached',
'bootable': 'false',
'name': 'vol name',
'display_name': 'displayname',
'display_description': 'displaydesc',
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'snapshot_id': None,
'source_volid': None,
'volume_type_id': '3e196c20-3c06-11e2-81c1-0800200c9a66',
'encryption_key_id': None,
'volume_admin_metadata': [{'key': 'attached_mode', 'value': 'rw'},
{'key': 'readonly', 'value': 'False'}],
'bootable': False,
'launched_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'volume_type': {'name': 'vol_type_name'}}
volume.update(kwargs)
if kwargs.get('volume_glance_metadata', None):
volume['bootable'] = True
if kwargs.get('attach_status') == 'detached':
del volume['volume_admin_metadata'][0]
return volume
**** CubicPower OpenStack Study ****
def stub_volume_create(self, context, size, name, description, snapshot,
**param):
vol = stub_volume('1')
vol['size'] = size
vol['display_name'] = name
vol['display_description'] = description
vol['source_volid'] = None
vol['bootable'] = False
try:
vol['snapshot_id'] = snapshot['id']
except (KeyError, TypeError):
vol['snapshot_id'] = None
vol['availability_zone'] = param.get('availability_zone', 'fakeaz')
return vol
**** CubicPower OpenStack Study ****
def stub_volume_create_from_image(self, context, size, name, description,
snapshot, volume_type, metadata,
availability_zone):
vol = stub_volume('1')
vol['status'] = 'creating'
vol['size'] = size
vol['display_name'] = name
vol['display_description'] = description
vol['availability_zone'] = 'cinder'
vol['bootable'] = False
return vol
**** CubicPower OpenStack Study ****
def stub_volume_update(self, context, *args, **param):
pass
**** CubicPower OpenStack Study ****
def stub_volume_delete(self, context, *args, **param):
pass
**** CubicPower OpenStack Study ****
def stub_volume_get(self, context, volume_id):
return stub_volume(volume_id)
**** CubicPower OpenStack Study ****
def stub_volume_get_notfound(self, context, volume_id):
raise exc.NotFound
**** CubicPower OpenStack Study ****
def stub_volume_get_db(context, volume_id):
return stub_volume(volume_id)
**** CubicPower OpenStack Study ****
def stub_volume_get_all(context, search_opts=None, marker=None, limit=None,
sort_key='created_at', sort_dir='desc', filters=None):
return [stub_volume(100, project_id='fake'),
stub_volume(101, project_id='superfake'),
stub_volume(102, project_id='superduperfake')]
**** CubicPower OpenStack Study ****
def stub_volume_get_all_by_project(self, context, marker, limit, sort_key,
sort_dir, filters={}):
return [stub_volume_get(self, context, '1')]
**** CubicPower OpenStack Study ****
def stub_snapshot(id, **kwargs):
snapshot = {'id': id,
'volume_id': 12,
'status': 'available',
'volume_size': 100,
'created_at': None,
'display_name': 'Default name',
'display_description': 'Default description',
'project_id': 'fake'}
snapshot.update(kwargs)
return snapshot
**** CubicPower OpenStack Study ****
def stub_snapshot_get_all(self):
return [stub_snapshot(100, project_id='fake'),
stub_snapshot(101, project_id='superfake'),
stub_snapshot(102, project_id='superduperfake')]
**** CubicPower OpenStack Study ****
def stub_snapshot_get_all_by_project(self, context):
return [stub_snapshot(1)]
**** CubicPower OpenStack Study ****
def stub_snapshot_update(self, context, *args, **param):
pass
**** CubicPower OpenStack Study ****
def stub_service_get_all_by_topic(context, topic):
return [{'availability_zone': "zone1:host1", "disabled": 0}]