**** CubicPower OpenStack Study ****
# Copyright 2014 IBM Corp.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for cinder.db.api."""
import datetime
from oslo.config import cfg
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import uuidutils
from cinder.quota import ReservableResource
from cinder import test
CONF = cfg.CONF
**** CubicPower OpenStack Study ****
def _quota_reserve(context, project_id):
"""Create sample Quota, QuotaUsage and Reservation objects.
There is no method db.quota_usage_create(), so we have to use
db.quota_reserve() for creating QuotaUsage objects.
Returns reservations uuids.
"""
def get_sync(resource, usage):
def sync(elevated, project_id, session):
return {resource: usage}
return sync
quotas = {}
resources = {}
deltas = {}
for i, resource in enumerate(('volumes', 'gigabytes')):
quotas[resource] = db.quota_create(context, project_id,
resource, i + 1)
resources[resource] = ReservableResource(resource,
'_sync_%s' % resource)
deltas[resource] = i + 1
return db.quota_reserve(
context, resources, quotas, deltas,
datetime.datetime.utcnow(), datetime.datetime.utcnow(),
datetime.timedelta(days=1), project_id
)
**** CubicPower OpenStack Study ****
def get_sync(resource, usage):
def sync(elevated, project_id, session):
return {resource: usage}
return sync
quotas = {}
resources = {}
deltas = {}
for i, resource in enumerate(('volumes', 'gigabytes')):
quotas[resource] = db.quota_create(context, project_id,
resource, i + 1)
resources[resource] = ReservableResource(resource,
'_sync_%s' % resource)
deltas[resource] = i + 1
return db.quota_reserve(
context, resources, quotas, deltas,
datetime.datetime.utcnow(), datetime.datetime.utcnow(),
datetime.timedelta(days=1), project_id
)
**** CubicPower OpenStack Study ****
class ModelsObjectComparatorMixin(object):
**** CubicPower OpenStack Study ****
def _dict_from_object(self, obj, ignored_keys):
if ignored_keys is None:
ignored_keys = []
return dict([(k, v) for k, v in obj.iteritems()
if k not in ignored_keys])
**** CubicPower OpenStack Study ****
def _assertEqualObjects(self, obj1, obj2, ignored_keys=None):
obj1 = self._dict_from_object(obj1, ignored_keys)
obj2 = self._dict_from_object(obj2, ignored_keys)
self.assertEqual(
len(obj1), len(obj2),
"Keys mismatch: %s" % str(set(obj1.keys()) ^ set(obj2.keys())))
for key, value in obj1.iteritems():
self.assertEqual(value, obj2[key])
**** CubicPower OpenStack Study ****
def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None):
obj_to_dict = lambda o: self._dict_from_object(o, ignored_keys)
sort_key = lambda d: [d[k] for k in sorted(d)]
conv_and_sort = lambda obj: sorted(map(obj_to_dict, obj), key=sort_key)
self.assertEqual(conv_and_sort(objs1), conv_and_sort(objs2))
**** CubicPower OpenStack Study ****
def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
self.assertEqual(len(primitives1), len(primitives2))
for primitive in primitives1:
self.assertIn(primitive, primitives2)
for primitive in primitives2:
self.assertIn(primitive, primitives1)
**** CubicPower OpenStack Study ****
class BaseTest(test.TestCase, ModelsObjectComparatorMixin):
**** CubicPower OpenStack Study ****
def setUp(self):
super(BaseTest, self).setUp()
self.ctxt = context.get_admin_context()
**** CubicPower OpenStack Study ****
class DBAPIServiceTestCase(BaseTest):
"""Unit tests for cinder.db.api.service_*."""
**** CubicPower OpenStack Study ****
def _get_base_values(self):
return {
'host': 'fake_host',
'binary': 'fake_binary',
'topic': 'fake_topic',
'report_count': 3,
'disabled': False
}
**** CubicPower OpenStack Study ****
def _create_service(self, values):
v = self._get_base_values()
v.update(values)
return db.service_create(self.ctxt, v)
**** CubicPower OpenStack Study ****
def test_service_create(self):
service = self._create_service({})
self.assertFalse(service['id'] is None)
for key, value in self._get_base_values().iteritems():
self.assertEqual(value, service[key])
**** CubicPower OpenStack Study ****
def test_service_destroy(self):
service1 = self._create_service({})
service2 = self._create_service({'host': 'fake_host2'})
db.service_destroy(self.ctxt, service1['id'])
self.assertRaises(exception.ServiceNotFound,
db.service_get, self.ctxt, service1['id'])
self._assertEqualObjects(db.service_get(self.ctxt, service2['id']),
service2)
**** CubicPower OpenStack Study ****
def test_service_update(self):
service = self._create_service({})
new_values = {
'host': 'fake_host1',
'binary': 'fake_binary1',
'topic': 'fake_topic1',
'report_count': 4,
'disabled': True
}
db.service_update(self.ctxt, service['id'], new_values)
updated_service = db.service_get(self.ctxt, service['id'])
for key, value in new_values.iteritems():
self.assertEqual(value, updated_service[key])
**** CubicPower OpenStack Study ****
def test_service_update_not_found_exception(self):
self.assertRaises(exception.ServiceNotFound,
db.service_update, self.ctxt, 100500, {})
**** CubicPower OpenStack Study ****
def test_service_get(self):
service1 = self._create_service({})
service2 = self._create_service({'host': 'some_other_fake_host'})
real_service1 = db.service_get(self.ctxt, service1['id'])
self._assertEqualObjects(service1, real_service1)
**** CubicPower OpenStack Study ****
def test_service_get_not_found_exception(self):
self.assertRaises(exception.ServiceNotFound,
db.service_get, self.ctxt, 100500)
**** CubicPower OpenStack Study ****
def test_service_get_by_host_and_topic(self):
service1 = self._create_service({'host': 'host1', 'topic': 'topic1'})
service2 = self._create_service({'host': 'host2', 'topic': 'topic2'})
real_service1 = db.service_get_by_host_and_topic(self.ctxt,
host='host1',
topic='topic1')
self._assertEqualObjects(service1, real_service1)
**** CubicPower OpenStack Study ****
def test_service_get_all(self):
values = [
{'host': 'host1', 'topic': 'topic1'},
{'host': 'host2', 'topic': 'topic2'},
{'disabled': True}
]
services = [self._create_service(vals) for vals in values]
disabled_services = [services[-1]]
non_disabled_services = services[:-1]
compares = [
(services, db.service_get_all(self.ctxt)),
(disabled_services, db.service_get_all(self.ctxt, True)),
(non_disabled_services, db.service_get_all(self.ctxt, False))
]
for comp in compares:
self._assertEqualListsOfObjects(*comp)
**** CubicPower OpenStack Study ****
def test_service_get_all_by_topic(self):
values = [
{'host': 'host1', 'topic': 't1'},
{'host': 'host2', 'topic': 't1'},
{'disabled': True, 'topic': 't1'},
{'host': 'host3', 'topic': 't2'}
]
services = [self._create_service(vals) for vals in values]
expected = services[:2]
real = db.service_get_all_by_topic(self.ctxt, 't1')
self._assertEqualListsOfObjects(expected, real)
**** CubicPower OpenStack Study ****
def test_service_get_all_by_host(self):
values = [
{'host': 'host1', 'topic': 't1'},
{'host': 'host1', 'topic': 't1'},
{'host': 'host2', 'topic': 't1'},
{'host': 'host3', 'topic': 't2'}
]
services = [self._create_service(vals) for vals in values]
expected = services[:2]
real = db.service_get_all_by_host(self.ctxt, 'host1')
self._assertEqualListsOfObjects(expected, real)
**** CubicPower OpenStack Study ****
def test_service_get_by_args(self):
values = [
{'host': 'host1', 'binary': 'a'},
{'host': 'host2', 'binary': 'b'}
]
services = [self._create_service(vals) for vals in values]
service1 = db.service_get_by_args(self.ctxt, 'host1', 'a')
self._assertEqualObjects(services[0], service1)
service2 = db.service_get_by_args(self.ctxt, 'host2', 'b')
self._assertEqualObjects(services[1], service2)
**** CubicPower OpenStack Study ****
def test_service_get_by_args_not_found_exception(self):
self.assertRaises(exception.HostBinaryNotFound,
db.service_get_by_args,
self.ctxt, 'non-exists-host', 'a')
**** CubicPower OpenStack Study ****
def test_service_get_all_volume_sorted(self):
values = [
({'host': 'h1', 'binary': 'a', 'topic': CONF.volume_topic},
100),
({'host': 'h2', 'binary': 'b', 'topic': CONF.volume_topic},
200),
({'host': 'h3', 'binary': 'b', 'topic': CONF.volume_topic},
300)]
services = []
for vals, size in values:
services.append(self._create_service(vals))
db.volume_create(self.ctxt, {'host': vals['host'], 'size': size})
for service, size in db.service_get_all_volume_sorted(self.ctxt):
self._assertEqualObjects(services.pop(0), service)
self.assertEqual(values.pop(0)[1], size)
**** CubicPower OpenStack Study ****
class DBAPIVolumeTestCase(BaseTest):
"""Unit tests for cinder.db.api.volume_*."""
**** CubicPower OpenStack Study ****
def test_volume_create(self):
volume = db.volume_create(self.ctxt, {'host': 'host1'})
self.assertTrue(uuidutils.is_uuid_like(volume['id']))
self.assertEqual(volume.host, 'host1')
**** CubicPower OpenStack Study ****
def test_volume_allocate_iscsi_target_no_more_targets(self):
self.assertRaises(exception.NoMoreTargets,
db.volume_allocate_iscsi_target,
self.ctxt, 42, 'host1')
**** CubicPower OpenStack Study ****
def test_volume_allocate_iscsi_target(self):
host = 'host1'
volume = db.volume_create(self.ctxt, {'host': host})
db.iscsi_target_create_safe(self.ctxt, {'host': host,
'target_num': 42})
target_num = db.volume_allocate_iscsi_target(self.ctxt, volume['id'],
host)
self.assertEqual(target_num, 42)
**** CubicPower OpenStack Study ****
def test_volume_attached_invalid_uuid(self):
self.assertRaises(exception.InvalidUUID, db.volume_attached, self.ctxt,
42, 'invalid-uuid', None, '/tmp')
**** CubicPower OpenStack Study ****
def test_volume_attached_to_instance(self):
volume = db.volume_create(self.ctxt, {'host': 'host1'})
instance_uuid = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
db.volume_attached(self.ctxt, volume['id'],
instance_uuid, None, '/tmp')
volume = db.volume_get(self.ctxt, volume['id'])
self.assertEqual(volume['status'], 'in-use')
self.assertEqual(volume['mountpoint'], '/tmp')
self.assertEqual(volume['attach_status'], 'attached')
self.assertEqual(volume['instance_uuid'], instance_uuid)
self.assertIsNone(volume['attached_host'])
**** CubicPower OpenStack Study ****
def test_volume_attached_to_host(self):
volume = db.volume_create(self.ctxt, {'host': 'host1'})
host_name = 'fake_host'
db.volume_attached(self.ctxt, volume['id'],
None, host_name, '/tmp')
volume = db.volume_get(self.ctxt, volume['id'])
self.assertEqual(volume['status'], 'in-use')
self.assertEqual(volume['mountpoint'], '/tmp')
self.assertEqual(volume['attach_status'], 'attached')
self.assertIsNone(volume['instance_uuid'])
self.assertEqual(volume['attached_host'], host_name)
**** CubicPower OpenStack Study ****
def test_volume_data_get_for_host(self):
for i in xrange(3):
for j in xrange(3):
db.volume_create(self.ctxt, {'host': 'h%d' % i, 'size': 100})
for i in xrange(3):
self.assertEqual((3, 300),
db.volume_data_get_for_host(
self.ctxt, 'h%d' % i))
**** CubicPower OpenStack Study ****
def test_volume_data_get_for_project(self):
for i in xrange(3):
for j in xrange(3):
db.volume_create(self.ctxt, {'project_id': 'p%d' % i,
'size': 100,
'host': 'h-%d-%d' % (i, j),
})
for i in xrange(3):
self.assertEqual((3, 300),
db.volume_data_get_for_project(
self.ctxt, 'p%d' % i))
**** CubicPower OpenStack Study ****
def test_volume_detached_from_instance(self):
volume = db.volume_create(self.ctxt, {})
db.volume_attached(self.ctxt, volume['id'],
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa',
None, '/tmp')
db.volume_detached(self.ctxt, volume['id'])
volume = db.volume_get(self.ctxt, volume['id'])
self.assertEqual('available', volume['status'])
self.assertEqual('detached', volume['attach_status'])
self.assertIsNone(volume['mountpoint'])
self.assertIsNone(volume['instance_uuid'])
self.assertIsNone(volume['attached_host'])
**** CubicPower OpenStack Study ****
def test_volume_detached_from_host(self):
volume = db.volume_create(self.ctxt, {})
db.volume_attached(self.ctxt, volume['id'],
None, 'fake_host', '/tmp')
db.volume_detached(self.ctxt, volume['id'])
volume = db.volume_get(self.ctxt, volume['id'])
self.assertEqual('available', volume['status'])
self.assertEqual('detached', volume['attach_status'])
self.assertIsNone(volume['mountpoint'])
self.assertIsNone(volume['instance_uuid'])
self.assertIsNone(volume['attached_host'])
**** CubicPower OpenStack Study ****
def test_volume_get(self):
volume = db.volume_create(self.ctxt, {})
self._assertEqualObjects(volume, db.volume_get(self.ctxt,
volume['id']))
**** CubicPower OpenStack Study ****
def test_volume_destroy(self):
volume = db.volume_create(self.ctxt, {})
db.volume_destroy(self.ctxt, volume['id'])
self.assertRaises(exception.VolumeNotFound, db.volume_get,
self.ctxt, volume['id'])
**** CubicPower OpenStack Study ****
def test_volume_get_all(self):
volumes = [db.volume_create(self.ctxt,
{'host': 'h%d' % i, 'size': i})
for i in xrange(3)]
self._assertEqualListsOfObjects(volumes, db.volume_get_all(
self.ctxt, None, None, 'host', None))
**** CubicPower OpenStack Study ****
def test_volume_get_all_marker_passed(self):
volumes = [
db.volume_create(self.ctxt, {'id': 1}),
db.volume_create(self.ctxt, {'id': 2}),
db.volume_create(self.ctxt, {'id': 3}),
db.volume_create(self.ctxt, {'id': 4}),
]
self._assertEqualListsOfObjects(volumes[2:], db.volume_get_all(
self.ctxt, 2, 2, 'id', None))
**** CubicPower OpenStack Study ****
def test_volume_get_all_by_host(self):
volumes = []
for i in xrange(3):
volumes.append([db.volume_create(self.ctxt, {'host': 'h%d' % i})
for j in xrange(3)])
for i in xrange(3):
self._assertEqualListsOfObjects(volumes[i],
db.volume_get_all_by_host(
self.ctxt, 'h%d' % i))
**** CubicPower OpenStack Study ****
def test_volume_get_all_by_instance_uuid(self):
instance_uuids = []
volumes = []
for i in xrange(3):
instance_uuid = str(uuidutils.uuid.uuid1())
instance_uuids.append(instance_uuid)
volumes.append([db.volume_create(self.ctxt,
{'instance_uuid': instance_uuid})
for j in xrange(3)])
for i in xrange(3):
self._assertEqualListsOfObjects(volumes[i],
db.volume_get_all_by_instance_uuid(
self.ctxt, instance_uuids[i]))
**** CubicPower OpenStack Study ****
def test_volume_get_all_by_instance_uuid_empty(self):
self.assertEqual([], db.volume_get_all_by_instance_uuid(self.ctxt,
'empty'))
**** CubicPower OpenStack Study ****
def test_volume_get_all_by_project(self):
volumes = []
for i in xrange(3):
volumes.append([db.volume_create(self.ctxt, {
'project_id': 'p%d' % i}) for j in xrange(3)])
for i in xrange(3):
self._assertEqualListsOfObjects(volumes[i],
db.volume_get_all_by_project(
self.ctxt, 'p%d' % i, None,
None, 'host', None))
**** CubicPower OpenStack Study ****
def test_volume_get_by_name(self):
db.volume_create(self.ctxt, {'display_name': 'vol1'})
db.volume_create(self.ctxt, {'display_name': 'vol2'})
db.volume_create(self.ctxt, {'display_name': 'vol3'})
# no name filter
volumes = db.volume_get_all(self.ctxt, None, None, 'created_at',
'asc')
self.assertEqual(len(volumes), 3)
# filter on name
volumes = db.volume_get_all(self.ctxt, None, None, 'created_at',
'asc', {'display_name': 'vol2'})
self.assertEqual(len(volumes), 1)
self.assertEqual(volumes[0]['display_name'], 'vol2')
# filter no match
volumes = db.volume_get_all(self.ctxt, None, None, 'created_at',
'asc', {'display_name': 'vol4'})
self.assertEqual(len(volumes), 0)
**** CubicPower OpenStack Study ****
def test_volume_list_by_status(self):
db.volume_create(self.ctxt, {'display_name': 'vol1',
'status': 'available'})
db.volume_create(self.ctxt, {'display_name': 'vol2',
'status': 'available'})
db.volume_create(self.ctxt, {'display_name': 'vol3',
'status': 'in-use'})
# no status filter
volumes = db.volume_get_all(self.ctxt, None, None, 'created_at',
'asc')
self.assertEqual(len(volumes), 3)
# single match
volumes = db.volume_get_all(self.ctxt, None, None, 'created_at',
'asc', {'status': 'in-use'})
self.assertEqual(len(volumes), 1)
self.assertEqual(volumes[0]['status'], 'in-use')
# multiple match
volumes = db.volume_get_all(self.ctxt, None, None, 'created_at',
'asc', {'status': 'available'})
self.assertEqual(len(volumes), 2)
for volume in volumes:
self.assertEqual(volume['status'], 'available')
# multiple filters
volumes = db.volume_get_all(self.ctxt, None, None, 'created_at',
'asc', {'status': 'available',
'display_name': 'vol1'})
self.assertEqual(len(volumes), 1)
self.assertEqual(volumes[0]['display_name'], 'vol1')
self.assertEqual(volumes[0]['status'], 'available')
# no match
volumes = db.volume_get_all(self.ctxt, None, None, 'created_at',
'asc', {'status': 'in-use',
'display_name': 'vol1'})
self.assertEqual(len(volumes), 0)
**** CubicPower OpenStack Study ****
def _assertEqualsVolumeOrderResult(self, correct_order, limit=None,
sort_key='created_at', sort_dir='asc',
filters=None, project_id=None,
match_keys=['id', 'display_name',
'volume_metadata',
'created_at']):
""""Verifies that volumes are returned in the correct order."""
if project_id:
result = db.volume_get_all_by_project(self.ctxt, project_id, None,
limit, sort_key,
sort_dir, filters=filters)
else:
result = db.volume_get_all(self.ctxt, None, limit, sort_key,
sort_dir, filters=filters)
self.assertEqual(len(correct_order), len(result))
self.assertEqual(len(result), len(correct_order))
for vol1, vol2 in zip(result, correct_order):
for key in match_keys:
val1 = vol1.get(key)
val2 = vol2.get(key)
# metadata is a list, compare the 'key' and 'value' of each
if key == 'volume_metadata':
self.assertEqual(len(val1), len(val2))
for m1, m2 in zip(val1, val2):
self.assertEqual(m1.get('key'), m2.get('key'))
self.assertEqual(m1.get('value'), m2.get('value'))
else:
self.assertEqual(val1, val2)
**** CubicPower OpenStack Study ****
def test_volume_get_by_filter(self):
"""Verifies that all filtering is done at the DB layer."""
vols = []
vols.extend([db.volume_create(self.ctxt,
{'project_id': 'g1',
'display_name': 'name_%d' % i,
'size': 1})
for i in xrange(2)])
vols.extend([db.volume_create(self.ctxt,
{'project_id': 'g1',
'display_name': 'name_%d' % i,
'size': 2})
for i in xrange(2)])
vols.extend([db.volume_create(self.ctxt,
{'project_id': 'g1',
'display_name': 'name_%d' % i})
for i in xrange(2)])
vols.extend([db.volume_create(self.ctxt,
{'project_id': 'g2',
'display_name': 'name_%d' % i,
'size': 1})
for i in xrange(2)])
# By project, filter on size and name
filters = {'size': '1'}
correct_order = [vols[0], vols[1]]
self._assertEqualsVolumeOrderResult(correct_order, filters=filters,
project_id='g1')
filters = {'size': '1', 'display_name': 'name_1'}
correct_order = [vols[1]]
self._assertEqualsVolumeOrderResult(correct_order, filters=filters,
project_id='g1')
# Remove project scope
filters = {'size': '1'}
correct_order = [vols[0], vols[1], vols[6], vols[7]]
self._assertEqualsVolumeOrderResult(correct_order, filters=filters)
filters = {'size': '1', 'display_name': 'name_1'}
correct_order = [vols[1], vols[7]]
self._assertEqualsVolumeOrderResult(correct_order, filters=filters)
# Remove size constraint
filters = {'display_name': 'name_1'}
correct_order = [vols[1], vols[3], vols[5]]
self._assertEqualsVolumeOrderResult(correct_order, filters=filters,
project_id='g1')
correct_order = [vols[1], vols[3], vols[5], vols[7]]
self._assertEqualsVolumeOrderResult(correct_order, filters=filters)
# Verify bogus values return nothing
filters = {'display_name': 'name_1', 'bogus_value': 'foo'}
self._assertEqualsVolumeOrderResult([], filters=filters,
project_id='g1')
self._assertEqualsVolumeOrderResult([], project_id='bogus')
self._assertEqualsVolumeOrderResult([], filters=filters)
self._assertEqualsVolumeOrderResult([], filters={'metadata':
'not valid'})
self._assertEqualsVolumeOrderResult([], filters={'metadata':
['not', 'valid']})
# Verify that relationship property keys return nothing, these
# exist on the Volumes model but are not columns
filters = {'volume_type': 'bogus_type'}
self._assertEqualsVolumeOrderResult([], filters=filters)
**** CubicPower OpenStack Study ****
def test_volume_get_all_filters_limit(self):
vol1 = db.volume_create(self.ctxt, {'display_name': 'test1'})
vol2 = db.volume_create(self.ctxt, {'display_name': 'test2'})
vol3 = db.volume_create(self.ctxt, {'display_name': 'test2',
'metadata': {'key1': 'val1'}})
vol4 = db.volume_create(self.ctxt, {'display_name': 'test3',
'metadata': {'key1': 'val1',
'key2': 'val2'}})
vol5 = db.volume_create(self.ctxt, {'display_name': 'test3',
'metadata': {'key2': 'val2',
'key3': 'val3'},
'host': 'host5'})
vols = [vol1, vol2, vol3, vol4, vol5]
# Ensure we have 5 total instances
self._assertEqualsVolumeOrderResult(vols)
# No filters, test limit
self._assertEqualsVolumeOrderResult(vols[:1], limit=1)
self._assertEqualsVolumeOrderResult(vols[:4], limit=4)
# Just the test2 volumes
filters = {'display_name': 'test2'}
self._assertEqualsVolumeOrderResult([vol2, vol3], filters=filters)
self._assertEqualsVolumeOrderResult([vol2], limit=1,
filters=filters)
self._assertEqualsVolumeOrderResult([vol2, vol3], limit=2,
filters=filters)
self._assertEqualsVolumeOrderResult([vol2, vol3], limit=100,
filters=filters)
# metdata filters
filters = {'metadata': {'key1': 'val1'}}
self._assertEqualsVolumeOrderResult([vol3, vol4], filters=filters)
self._assertEqualsVolumeOrderResult([vol3], limit=1,
filters=filters)
self._assertEqualsVolumeOrderResult([vol3, vol4], limit=10,
filters=filters)
filters = {'metadata': {'key1': 'val1',
'key2': 'val2'}}
self._assertEqualsVolumeOrderResult([vol4], filters=filters)
self._assertEqualsVolumeOrderResult([vol4], limit=1,
filters=filters)
# No match
filters = {'metadata': {'key1': 'val1',
'key2': 'val2',
'key3': 'val3'}}
self._assertEqualsVolumeOrderResult([], filters=filters)
filters = {'metadata': {'key1': 'val1',
'key2': 'bogus'}}
self._assertEqualsVolumeOrderResult([], filters=filters)
filters = {'metadata': {'key1': 'val1',
'key2': 'val1'}}
self._assertEqualsVolumeOrderResult([], filters=filters)
# Combination
filters = {'display_name': 'test2',
'metadata': {'key1': 'val1'}}
self._assertEqualsVolumeOrderResult([vol3], filters=filters)
self._assertEqualsVolumeOrderResult([vol3], limit=1,
filters=filters)
self._assertEqualsVolumeOrderResult([vol3], limit=100,
filters=filters)
filters = {'display_name': 'test3',
'metadata': {'key2': 'val2',
'key3': 'val3'},
'host': 'host5'}
self._assertEqualsVolumeOrderResult([vol5], filters=filters)
self._assertEqualsVolumeOrderResult([vol5], limit=1,
filters=filters)
**** CubicPower OpenStack Study ****
def test_volume_get_no_migration_targets(self):
"""Verifies the unique 'no_migration_targets'=True filter.
This filter returns volumes with either a NULL 'migration_status'
or a non-NULL value that does not start with 'target:'.
"""
vol1 = db.volume_create(self.ctxt, {'display_name': 'test1'})
vol2 = db.volume_create(self.ctxt, {'display_name': 'test2',
'migration_status': 'bogus'})
vol3 = db.volume_create(self.ctxt, {'display_name': 'test3',
'migration_status': 'btarget:'})
vol4 = db.volume_create(self.ctxt, {'display_name': 'test4',
'migration_status': 'target:'})
vols = [vol1, vol2, vol3, vol4]
# Ensure we have 4 total instances
self._assertEqualsVolumeOrderResult(vols)
# Apply the unique filter
filters = {'no_migration_targets': True}
self._assertEqualsVolumeOrderResult([vol1, vol2, vol3],
filters=filters)
self._assertEqualsVolumeOrderResult([vol1, vol2], limit=2,
filters=filters)
filters = {'no_migration_targets': True,
'display_name': 'test4'}
self._assertEqualsVolumeOrderResult([], filters=filters)
**** CubicPower OpenStack Study ****
def test_volume_get_iscsi_target_num(self):
target = db.iscsi_target_create_safe(self.ctxt, {'volume_id': 42,
'target_num': 43})
self.assertEqual(43, db.volume_get_iscsi_target_num(self.ctxt, 42))
**** CubicPower OpenStack Study ****
def test_volume_get_iscsi_target_num_nonexistent(self):
self.assertRaises(exception.ISCSITargetNotFoundForVolume,
db.volume_get_iscsi_target_num, self.ctxt, 42)
**** CubicPower OpenStack Study ****
def test_volume_update(self):
volume = db.volume_create(self.ctxt, {'host': 'h1'})
db.volume_update(self.ctxt, volume['id'],
{'host': 'h2', 'metadata': {'m1': 'v1'}})
volume = db.volume_get(self.ctxt, volume['id'])
self.assertEqual('h2', volume['host'])
**** CubicPower OpenStack Study ****
def test_volume_update_nonexistent(self):
self.assertRaises(exception.VolumeNotFound, db.volume_update,
self.ctxt, 42, {})
**** CubicPower OpenStack Study ****
def test_volume_metadata_get(self):
metadata = {'a': 'b', 'c': 'd'}
db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata})
self.assertEqual(metadata, db.volume_metadata_get(self.ctxt, 1))
**** CubicPower OpenStack Study ****
def test_volume_metadata_update(self):
metadata1 = {'a': '1', 'c': '2'}
metadata2 = {'a': '3', 'd': '5'}
should_be = {'a': '3', 'c': '2', 'd': '5'}
db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata1})
db_meta = db.volume_metadata_update(self.ctxt, 1, metadata2, False)
self.assertEqual(should_be, db_meta)
**** CubicPower OpenStack Study ****
def test_volume_metadata_update_delete(self):
metadata1 = {'a': '1', 'c': '2'}
metadata2 = {'a': '3', 'd': '4'}
should_be = metadata2
db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata1})
db_meta = db.volume_metadata_update(self.ctxt, 1, metadata2, True)
self.assertEqual(should_be, db_meta)
**** CubicPower OpenStack Study ****
def test_volume_metadata_delete(self):
metadata = {'a': 'b', 'c': 'd'}
db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata})
db.volume_metadata_delete(self.ctxt, 1, 'c')
metadata.pop('c')
self.assertEqual(metadata, db.volume_metadata_get(self.ctxt, 1))
**** CubicPower OpenStack Study ****
class DBAPISnapshotTestCase(BaseTest):
"""Tests for cinder.db.api.snapshot_*."""
**** CubicPower OpenStack Study ****
def test_snapshot_data_get_for_project(self):
actual = db.snapshot_data_get_for_project(self.ctxt, 'project1')
self.assertEqual(actual, (0, 0))
db.volume_create(self.ctxt, {'id': 1,
'project_id': 'project1',
'size': 42})
snapshot = db.snapshot_create(self.ctxt, {'id': 1, 'volume_id': 1,
'project_id': 'project1',
'volume_size': 42})
actual = db.snapshot_data_get_for_project(self.ctxt, 'project1')
self.assertEqual(actual, (1, 42))
**** CubicPower OpenStack Study ****
def test_snapshot_get_all(self):
db.volume_create(self.ctxt, {'id': 1})
snapshot = db.snapshot_create(self.ctxt, {'id': 1, 'volume_id': 1})
self._assertEqualListsOfObjects([snapshot],
db.snapshot_get_all(self.ctxt),
ignored_keys=['metadata', 'volume'])
**** CubicPower OpenStack Study ****
def test_snapshot_metadata_get(self):
metadata = {'a': 'b', 'c': 'd'}
db.volume_create(self.ctxt, {'id': 1})
db.snapshot_create(self.ctxt,
{'id': 1, 'volume_id': 1, 'metadata': metadata})
self.assertEqual(metadata, db.snapshot_metadata_get(self.ctxt, 1))
**** CubicPower OpenStack Study ****
def test_snapshot_metadata_update(self):
metadata1 = {'a': '1', 'c': '2'}
metadata2 = {'a': '3', 'd': '5'}
should_be = {'a': '3', 'c': '2', 'd': '5'}
db.volume_create(self.ctxt, {'id': 1})
db.snapshot_create(self.ctxt,
{'id': 1, 'volume_id': 1, 'metadata': metadata1})
db_meta = db.snapshot_metadata_update(self.ctxt, 1, metadata2, False)
self.assertEqual(should_be, db_meta)
**** CubicPower OpenStack Study ****
def test_snapshot_metadata_update_delete(self):
metadata1 = {'a': '1', 'c': '2'}
metadata2 = {'a': '3', 'd': '5'}
should_be = metadata2
db.volume_create(self.ctxt, {'id': 1})
db.snapshot_create(self.ctxt,
{'id': 1, 'volume_id': 1, 'metadata': metadata1})
db_meta = db.snapshot_metadata_update(self.ctxt, 1, metadata2, True)
self.assertEqual(should_be, db_meta)
**** CubicPower OpenStack Study ****
def test_snapshot_metadata_delete(self):
metadata = {'a': '1', 'c': '2'}
should_be = {'a': '1'}
db.volume_create(self.ctxt, {'id': 1})
db.snapshot_create(self.ctxt,
{'id': 1, 'volume_id': 1, 'metadata': metadata})
db.snapshot_metadata_delete(self.ctxt, 1, 'c')
self.assertEqual(should_be, db.snapshot_metadata_get(self.ctxt, 1))
**** CubicPower OpenStack Study ****
class DBAPIVolumeTypeTestCase(BaseTest):
"""Tests for the db.api.volume_type_* methods."""
**** CubicPower OpenStack Study ****
def setUp(self):
self.ctxt = context.get_admin_context()
super(DBAPIVolumeTypeTestCase, self).setUp()
**** CubicPower OpenStack Study ****
def test_volume_type_create_exists(self):
vt = db.volume_type_create(self.ctxt, {'name': 'n1'})
self.assertRaises(exception.VolumeTypeExists,
db.volume_type_create,
self.ctxt,
{'name': 'n1'})
self.assertRaises(exception.VolumeTypeExists,
db.volume_type_create,
self.ctxt,
{'name': 'n2', 'id': vt['id']})
**** CubicPower OpenStack Study ****
class DBAPIEncryptionTestCase(BaseTest):
"""Tests for the db.api.volume_type_encryption_* methods."""
_ignored_keys = [
'deleted',
'deleted_at',
'created_at',
'updated_at',
]
**** CubicPower OpenStack Study ****
def setUp(self):
super(DBAPIEncryptionTestCase, self).setUp()
self.created = \
[db.volume_type_encryption_create(self.ctxt,
values['volume_type_id'], values)
for values in self._get_values()]
**** CubicPower OpenStack Study ****
def _get_values(self, one=False, updated=False):
base_values = {
'cipher': 'fake_cipher',
'key_size': 256,
'provider': 'fake_provider',
'volume_type_id': 'fake_type',
'control_location': 'front-end',
}
updated_values = {
'cipher': 'fake_updated_cipher',
'key_size': 512,
'provider': 'fake_updated_provider',
'volume_type_id': 'fake_type',
'control_location': 'front-end',
}
if one:
return base_values
if updated:
values = updated_values
else:
values = base_values
def compose(val, step):
if isinstance(val, str):
step = str(step)
return val + step
return [dict([(k, compose(v, i)) for k, v in values.items()])
for i in range(1, 4)]
**** CubicPower OpenStack Study ****
def compose(val, step):
if isinstance(val, str):
step = str(step)
return val + step
return [dict([(k, compose(v, i)) for k, v in values.items()])
for i in range(1, 4)]
**** CubicPower OpenStack Study ****
def test_volume_type_encryption_create(self):
values = self._get_values()
for i, encryption in enumerate(self.created):
self._assertEqualObjects(values[i], encryption, self._ignored_keys)
**** CubicPower OpenStack Study ****
def test_volume_type_encryption_update(self):
update_values = self._get_values(updated=True)
self.updated = \
[db.volume_type_encryption_update(self.ctxt,
values['volume_type_id'], values)
for values in update_values]
for i, encryption in enumerate(self.updated):
self._assertEqualObjects(update_values[i], encryption,
self._ignored_keys)
**** CubicPower OpenStack Study ****
def test_volume_type_encryption_get(self):
for encryption in self.created:
encryption_get = \
db.volume_type_encryption_get(self.ctxt,
encryption['volume_type_id'])
self._assertEqualObjects(encryption, encryption_get,
self._ignored_keys)
**** CubicPower OpenStack Study ****
def test_volume_type_update_with_no_create(self):
self.assertRaises(exception.VolumeTypeEncryptionNotFound,
db.volume_type_encryption_update,
self.ctxt,
'fake_no_create_type',
{'cipher': 'fake_updated_cipher'})
**** CubicPower OpenStack Study ****
def test_volume_type_encryption_delete(self):
values = {
'cipher': 'fake_cipher',
'key_size': 256,
'provider': 'fake_provider',
'volume_type_id': 'fake_type',
'control_location': 'front-end',
}
encryption = db.volume_type_encryption_create(self.ctxt, 'fake_type',
values)
self._assertEqualObjects(values, encryption, self._ignored_keys)
db.volume_type_encryption_delete(self.ctxt,
encryption['volume_type_id'])
encryption_get = \
db.volume_type_encryption_get(self.ctxt,
encryption['volume_type_id'])
self.assertIsNone(encryption_get)
**** CubicPower OpenStack Study ****
class DBAPIReservationTestCase(BaseTest):
"""Tests for db.api.reservation_* methods."""
**** CubicPower OpenStack Study ****
def setUp(self):
super(DBAPIReservationTestCase, self).setUp()
self.values = {
'uuid': 'sample-uuid',
'project_id': 'project1',
'resource': 'resource',
'delta': 42,
'expire': (datetime.datetime.utcnow() +
datetime.timedelta(days=1)),
'usage': {'id': 1}
}
**** CubicPower OpenStack Study ****
def test_reservation_create(self):
reservation = db.reservation_create(self.ctxt, **self.values)
self._assertEqualObjects(self.values, reservation, ignored_keys=(
'deleted', 'updated_at',
'deleted_at', 'id',
'created_at', 'usage',
'usage_id'))
self.assertEqual(reservation['usage_id'], self.values['usage']['id'])
**** CubicPower OpenStack Study ****
def test_reservation_get(self):
reservation = db.reservation_create(self.ctxt, **self.values)
reservation_db = db.reservation_get(self.ctxt, self.values['uuid'])
self._assertEqualObjects(reservation, reservation_db)
**** CubicPower OpenStack Study ****
def test_reservation_get_nonexistent(self):
self.assertRaises(exception.ReservationNotFound,
db.reservation_get,
self.ctxt,
'non-exitent-resevation-uuid')
**** CubicPower OpenStack Study ****
def test_reservation_commit(self):
reservations = _quota_reserve(self.ctxt, 'project1')
expected = {'project_id': 'project1',
'volumes': {'reserved': 1, 'in_use': 0},
'gigabytes': {'reserved': 2, 'in_use': 0},
}
self.assertEqual(expected,
db.quota_usage_get_all_by_project(
self.ctxt, 'project1'))
db.reservation_get(self.ctxt, reservations[0])
db.reservation_commit(self.ctxt, reservations, 'project1')
self.assertRaises(exception.ReservationNotFound,
db.reservation_get,
self.ctxt,
reservations[0])
expected = {'project_id': 'project1',
'volumes': {'reserved': 0, 'in_use': 1},
'gigabytes': {'reserved': 0, 'in_use': 2},
}
self.assertEqual(expected,
db.quota_usage_get_all_by_project(
self.ctxt,
'project1'))
**** CubicPower OpenStack Study ****
def test_reservation_rollback(self):
reservations = _quota_reserve(self.ctxt, 'project1')
expected = {'project_id': 'project1',
'volumes': {'reserved': 1, 'in_use': 0},
'gigabytes': {'reserved': 2, 'in_use': 0},
}
self.assertEqual(expected,
db.quota_usage_get_all_by_project(
self.ctxt,
'project1'))
db.reservation_get(self.ctxt, reservations[0])
db.reservation_rollback(self.ctxt, reservations, 'project1')
self.assertRaises(exception.ReservationNotFound,
db.reservation_get,
self.ctxt,
reservations[0])
expected = {'project_id': 'project1',
'volumes': {'reserved': 0, 'in_use': 0},
'gigabytes': {'reserved': 0, 'in_use': 0},
}
self.assertEqual(expected,
db.quota_usage_get_all_by_project(
self.ctxt,
'project1'))
**** CubicPower OpenStack Study ****
def test_reservation_get_all_by_project(self):
reservations = _quota_reserve(self.ctxt, 'project1')
r1 = db.reservation_get(self.ctxt, reservations[0])
r2 = db.reservation_get(self.ctxt, reservations[1])
expected = {'project_id': 'project1',
r1['resource']: {r1['uuid']: r1['delta']},
r2['resource']: {r2['uuid']: r2['delta']}}
self.assertEqual(expected, db.reservation_get_all_by_project(
self.ctxt, 'project1'))
**** CubicPower OpenStack Study ****
def test_reservation_expire(self):
self.values['expire'] = datetime.datetime.utcnow() + \
datetime.timedelta(days=1)
reservations = _quota_reserve(self.ctxt, 'project1')
db.reservation_expire(self.ctxt)
expected = {'project_id': 'project1',
'gigabytes': {'reserved': 0, 'in_use': 0},
'volumes': {'reserved': 0, 'in_use': 0}}
self.assertEqual(expected,
db.quota_usage_get_all_by_project(
self.ctxt,
'project1'))
**** CubicPower OpenStack Study ****
def test_reservation_destroy(self):
reservations = _quota_reserve(self.ctxt, 'project1')
r1 = db.reservation_get(self.ctxt, reservations[0])
db.reservation_destroy(self.ctxt, reservations[1])
expected = {'project_id': 'project1',
r1['resource']: {r1['uuid']: r1['delta']}}
self.assertEqual(expected, db.reservation_get_all_by_project(
self.ctxt, 'project1'))
**** CubicPower OpenStack Study ****
class DBAPIQuotaClassTestCase(BaseTest):
"""Tests for db.api.quota_class_* methods."""
**** CubicPower OpenStack Study ****
def setUp(self):
super(DBAPIQuotaClassTestCase, self).setUp()
self.sample_qc = db.quota_class_create(self.ctxt, 'test_qc',
'test_resource', 42)
**** CubicPower OpenStack Study ****
def test_quota_class_get(self):
qc = db.quota_class_get(self.ctxt, 'test_qc', 'test_resource')
self._assertEqualObjects(self.sample_qc, qc)
**** CubicPower OpenStack Study ****
def test_quota_class_destroy(self):
db.quota_class_destroy(self.ctxt, 'test_qc', 'test_resource')
self.assertRaises(exception.QuotaClassNotFound,
db.quota_class_get, self.ctxt,
'test_qc', 'test_resource')
**** CubicPower OpenStack Study ****
def test_quota_class_get_not_found(self):
self.assertRaises(exception.QuotaClassNotFound,
db.quota_class_get, self.ctxt, 'nonexistent',
'nonexistent')
**** CubicPower OpenStack Study ****
def test_quota_class_get_all_by_name(self):
sample1 = db.quota_class_create(self.ctxt, 'test2', 'res1', 43)
sample2 = db.quota_class_create(self.ctxt, 'test2', 'res2', 44)
self.assertEqual({'class_name': 'test_qc', 'test_resource': 42},
db.quota_class_get_all_by_name(self.ctxt, 'test_qc'))
self.assertEqual({'class_name': 'test2', 'res1': 43, 'res2': 44},
db.quota_class_get_all_by_name(self.ctxt, 'test2'))
**** CubicPower OpenStack Study ****
def test_quota_class_update(self):
db.quota_class_update(self.ctxt, 'test_qc', 'test_resource', 43)
updated = db.quota_class_get(self.ctxt, 'test_qc', 'test_resource')
self.assertEqual(43, updated['hard_limit'])
**** CubicPower OpenStack Study ****
def test_quota_class_destroy_all_by_name(self):
sample1 = db.quota_class_create(self.ctxt, 'test2', 'res1', 43)
sample2 = db.quota_class_create(self.ctxt, 'test2', 'res2', 44)
db.quota_class_destroy_all_by_name(self.ctxt, 'test2')
self.assertEqual({'class_name': 'test2'},
db.quota_class_get_all_by_name(self.ctxt, 'test2'))
**** CubicPower OpenStack Study ****
class DBAPIQuotaTestCase(BaseTest):
"""Tests for db.api.reservation_* methods."""
**** CubicPower OpenStack Study ****
def test_quota_create(self):
quota = db.quota_create(self.ctxt, 'project1', 'resource', 99)
self.assertEqual(quota.resource, 'resource')
self.assertEqual(quota.hard_limit, 99)
self.assertEqual(quota.project_id, 'project1')
**** CubicPower OpenStack Study ****
def test_quota_get(self):
quota = db.quota_create(self.ctxt, 'project1', 'resource', 99)
quota_db = db.quota_get(self.ctxt, 'project1', 'resource')
self._assertEqualObjects(quota, quota_db)
**** CubicPower OpenStack Study ****
def test_quota_get_all_by_project(self):
for i in range(3):
for j in range(3):
db.quota_create(self.ctxt, 'proj%d' % i, 'res%d' % j, j)
for i in range(3):
quotas_db = db.quota_get_all_by_project(self.ctxt, 'proj%d' % i)
self.assertEqual(quotas_db, {'project_id': 'proj%d' % i,
'res0': 0,
'res1': 1,
'res2': 2})
**** CubicPower OpenStack Study ****
def test_quota_update(self):
db.quota_create(self.ctxt, 'project1', 'resource1', 41)
db.quota_update(self.ctxt, 'project1', 'resource1', 42)
quota = db.quota_get(self.ctxt, 'project1', 'resource1')
self.assertEqual(quota.hard_limit, 42)
self.assertEqual(quota.resource, 'resource1')
self.assertEqual(quota.project_id, 'project1')
**** CubicPower OpenStack Study ****
def test_quota_update_nonexistent(self):
self.assertRaises(exception.ProjectQuotaNotFound,
db.quota_update,
self.ctxt,
'project1',
'resource1',
42)
**** CubicPower OpenStack Study ****
def test_quota_get_nonexistent(self):
self.assertRaises(exception.ProjectQuotaNotFound,
db.quota_get,
self.ctxt,
'project1',
'resource1')
**** CubicPower OpenStack Study ****
def test_quota_reserve(self):
reservations = _quota_reserve(self.ctxt, 'project1')
self.assertEqual(len(reservations), 2)
res_names = ['gigabytes', 'volumes']
for uuid in reservations:
reservation = db.reservation_get(self.ctxt, uuid)
self.assertIn(reservation.resource, res_names)
res_names.remove(reservation.resource)
**** CubicPower OpenStack Study ****
def test_quota_destroy(self):
db.quota_create(self.ctxt, 'project1', 'resource1', 41)
self.assertIsNone(db.quota_destroy(self.ctxt, 'project1',
'resource1'))
self.assertRaises(exception.ProjectQuotaNotFound, db.quota_get,
self.ctxt, 'project1', 'resource1')
**** CubicPower OpenStack Study ****
def test_quota_destroy_all_by_project(self):
reservations = _quota_reserve(self.ctxt, 'project1')
db.quota_destroy_all_by_project(self.ctxt, 'project1')
self.assertEqual(db.quota_get_all_by_project(self.ctxt, 'project1'),
{'project_id': 'project1'})
self.assertEqual(db.quota_usage_get_all_by_project(self.ctxt,
'project1'),
{'project_id': 'project1'})
for r in reservations:
self.assertRaises(exception.ReservationNotFound,
db.reservation_get,
self.ctxt,
r)
**** CubicPower OpenStack Study ****
def test_quota_usage_get_nonexistent(self):
self.assertRaises(exception.QuotaUsageNotFound,
db.quota_usage_get,
self.ctxt,
'p1',
'nonexitent_resource')
**** CubicPower OpenStack Study ****
def test_quota_usage_get(self):
reservations = _quota_reserve(self.ctxt, 'p1')
quota_usage = db.quota_usage_get(self.ctxt, 'p1', 'gigabytes')
expected = {'resource': 'gigabytes', 'project_id': 'p1',
'in_use': 0, 'reserved': 2, 'total': 2}
for key, value in expected.iteritems():
self.assertEqual(value, quota_usage[key], key)
**** CubicPower OpenStack Study ****
def test_quota_usage_get_all_by_project(self):
reservations = _quota_reserve(self.ctxt, 'p1')
expected = {'project_id': 'p1',
'volumes': {'in_use': 0, 'reserved': 1},
'gigabytes': {'in_use': 0, 'reserved': 2}}
self.assertEqual(expected, db.quota_usage_get_all_by_project(
self.ctxt, 'p1'))
**** CubicPower OpenStack Study ****
class DBAPIIscsiTargetTestCase(BaseTest):
"""Unit tests for cinder.db.api.iscsi_target_*."""
**** CubicPower OpenStack Study ****
def _get_base_values(self):
return {'target_num': 10, 'host': 'fake_host'}
**** CubicPower OpenStack Study ****
def test_iscsi_target_create_safe(self):
target = db.iscsi_target_create_safe(self.ctxt,
self._get_base_values())
self.assertTrue(target['id'])
self.assertEqual(target['host'], 'fake_host')
self.assertEqual(target['target_num'], 10)
**** CubicPower OpenStack Study ****
def test_iscsi_target_count_by_host(self):
for i in range(3):
values = self._get_base_values()
values['target_num'] += i
db.iscsi_target_create_safe(self.ctxt, values)
self.assertEqual(db.iscsi_target_count_by_host(self.ctxt, 'fake_host'),
3)
@test.testtools.skip("bug 1187367")
**** CubicPower OpenStack Study ****
def test_integrity_error(self):
db.iscsi_target_create_safe(self.ctxt, self._get_base_values())
self.assertFalse(db.iscsi_target_create_safe(self.ctxt,
self._get_base_values()))
**** CubicPower OpenStack Study ****
class DBAPIBackupTestCase(BaseTest):
"""Tests for db.api.backup_* methods."""
_ignored_keys = ['id', 'deleted', 'deleted_at', 'created_at', 'updated_at']
**** CubicPower OpenStack Study ****
def setUp(self):
super(DBAPIBackupTestCase, self).setUp()
self.created = [db.backup_create(self.ctxt, values)
for values in self._get_values()]
**** CubicPower OpenStack Study ****
def _get_values(self, one=False):
base_values = {
'user_id': 'user',
'project_id': 'project',
'volume_id': 'volume',
'host': 'host',
'availability_zone': 'zone',
'display_name': 'display',
'display_description': 'description',
'container': 'container',
'status': 'status',
'fail_reason': 'test',
'service_metadata': 'metadata',
'service': 'service',
'size': 1000,
'object_count': 100}
if one:
return base_values
def compose(val, step):
if isinstance(val, str):
step = str(step)
return val + step
return [dict([(k, compose(v, i)) for k, v in base_values.items()])
for i in range(1, 4)]
**** CubicPower OpenStack Study ****
def compose(val, step):
if isinstance(val, str):
step = str(step)
return val + step
return [dict([(k, compose(v, i)) for k, v in base_values.items()])
for i in range(1, 4)]
**** CubicPower OpenStack Study ****
def test_backup_create(self):
values = self._get_values()
for i, backup in enumerate(self.created):
self.assertTrue(backup['id'])
self._assertEqualObjects(values[i], backup, self._ignored_keys)
**** CubicPower OpenStack Study ****
def test_backup_get(self):
for backup in self.created:
backup_get = db.backup_get(self.ctxt, backup['id'])
self._assertEqualObjects(backup, backup_get)
**** CubicPower OpenStack Study ****
def tests_backup_get_all(self):
all_backups = db.backup_get_all(self.ctxt)
self._assertEqualListsOfObjects(self.created, all_backups)
**** CubicPower OpenStack Study ****
def test_backup_get_all_by_host(self):
byhost = db.backup_get_all_by_host(self.ctxt,
self.created[1]['host'])
self._assertEqualObjects(self.created[1], byhost[0])
**** CubicPower OpenStack Study ****
def test_backup_get_all_by_project(self):
byproj = db.backup_get_all_by_project(self.ctxt,
self.created[1]['project_id'])
self._assertEqualObjects(self.created[1], byproj[0])
**** CubicPower OpenStack Study ****
def test_backup_update_nonexistent(self):
self.assertRaises(exception.BackupNotFound,
db.backup_update,
self.ctxt, 'nonexistent', {})
**** CubicPower OpenStack Study ****
def test_backup_update(self):
updated_values = self._get_values(one=True)
update_id = self.created[1]['id']
updated_backup = db.backup_update(self.ctxt, update_id,
updated_values)
self._assertEqualObjects(updated_values, updated_backup,
self._ignored_keys)
**** CubicPower OpenStack Study ****
def test_backup_destroy(self):
for backup in self.created:
db.backup_destroy(self.ctxt, backup['id'])
self.assertFalse(db.backup_get_all(self.ctxt))
**** CubicPower OpenStack Study ****
def test_backup_not_found(self):
self.assertRaises(exception.BackupNotFound, db.backup_get, self.ctxt,
'notinbase')