**** CubicPower OpenStack Study ****
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from oslo.config import cfg
from cinder import context
from cinder import db
from cinder.db.sqlalchemy import api as sqa_api
from cinder.db.sqlalchemy import models as sqa_models
from cinder import exception
from cinder.openstack.common import timeutils
from cinder import quota
from cinder import test
import cinder.tests.image.fake
from cinder import volume
CONF = cfg.CONF
**** CubicPower OpenStack Study ****
class QuotaIntegrationTestCase(test.TestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(QuotaIntegrationTestCase, self).setUp()
self.volume_type_name = CONF.default_volume_type
self.volume_type = db.volume_type_create(
context.get_admin_context(),
dict(name=self.volume_type_name))
self.flags(quota_volumes=2,
quota_snapshots=2,
quota_gigabytes=20)
self.user_id = 'admin'
self.project_id = 'admin'
self.context = context.RequestContext(self.user_id,
self.project_id,
is_admin=True)
# Destroy the 'default' quota_class in the database to avoid
# conflicts with the test cases here that are setting up their own
# defaults.
db.quota_class_destroy_all_by_name(self.context, 'default')
**** CubicPower OpenStack Study ****
def tearDown(self):
db.volume_type_destroy(context.get_admin_context(),
self.volume_type['id'])
super(QuotaIntegrationTestCase, self).tearDown()
cinder.tests.image.fake.FakeImageService_reset()
**** CubicPower OpenStack Study ****
def _create_volume(self, size=1):
"""Create a test volume."""
vol = {}
vol['user_id'] = self.user_id
vol['project_id'] = self.project_id
vol['size'] = size
vol['status'] = 'available'
vol['volume_type_id'] = self.volume_type['id']
return db.volume_create(self.context, vol)
**** CubicPower OpenStack Study ****
def _create_snapshot(self, volume):
snapshot = {}
snapshot['user_id'] = self.user_id
snapshot['project_id'] = self.project_id
snapshot['volume_id'] = volume['id']
snapshot['volume_size'] = volume['size']
snapshot['status'] = 'available'
return db.snapshot_create(self.context, snapshot)
**** CubicPower OpenStack Study ****
def test_too_many_volumes(self):
volume_ids = []
for i in range(CONF.quota_volumes):
vol_ref = self._create_volume()
volume_ids.append(vol_ref['id'])
self.assertRaises(exception.VolumeLimitExceeded,
volume.API().create,
self.context, 1, '', '',
volume_type=self.volume_type)
for volume_id in volume_ids:
db.volume_destroy(self.context, volume_id)
**** CubicPower OpenStack Study ****
def test_too_many_volumes_of_type(self):
resource = 'volumes_%s' % self.volume_type_name
db.quota_class_create(self.context, 'default', resource, 1)
flag_args = {
'quota_volumes': 2000,
'quota_gigabytes': 2000
}
self.flags(**flag_args)
vol_ref = self._create_volume()
self.assertRaises(exception.VolumeLimitExceeded,
volume.API().create,
self.context, 1, '', '',
volume_type=self.volume_type)
db.volume_destroy(self.context, vol_ref['id'])
**** CubicPower OpenStack Study ****
def test_too_many_snapshots_of_type(self):
resource = 'snapshots_%s' % self.volume_type_name
db.quota_class_create(self.context, 'default', resource, 1)
flag_args = {
'quota_volumes': 2000,
'quota_gigabytes': 2000,
}
self.flags(**flag_args)
vol_ref = self._create_volume()
snap_ref = self._create_snapshot(vol_ref)
self.assertRaises(exception.SnapshotLimitExceeded,
volume.API().create_snapshot,
self.context, vol_ref, '', '')
db.snapshot_destroy(self.context, snap_ref['id'])
db.volume_destroy(self.context, vol_ref['id'])
**** CubicPower OpenStack Study ****
def test_too_many_gigabytes(self):
volume_ids = []
vol_ref = self._create_volume(size=20)
volume_ids.append(vol_ref['id'])
self.assertRaises(exception.VolumeSizeExceedsAvailableQuota,
volume.API().create,
self.context, 1, '', '',
volume_type=self.volume_type)
for volume_id in volume_ids:
db.volume_destroy(self.context, volume_id)
**** CubicPower OpenStack Study ****
def test_too_many_combined_gigabytes(self):
vol_ref = self._create_volume(size=10)
snap_ref = self._create_snapshot(vol_ref)
self.assertRaises(exception.QuotaError,
volume.API().create_snapshot,
self.context, vol_ref, '', '')
usages = db.quota_usage_get_all_by_project(self.context,
self.project_id)
self.assertEqual(usages['gigabytes']['in_use'], 20)
db.snapshot_destroy(self.context, snap_ref['id'])
db.volume_destroy(self.context, vol_ref['id'])
**** CubicPower OpenStack Study ****
def test_no_snapshot_gb_quota_flag(self):
self.flags(quota_volumes=2,
quota_snapshots=2,
quota_gigabytes=20,
no_snapshot_gb_quota=True)
vol_ref = self._create_volume(size=10)
snap_ref = self._create_snapshot(vol_ref)
snap_ref2 = volume.API().create_snapshot(self.context,
vol_ref, '', '')
# Make sure no reservation was created for snapshot gigabytes.
reservations = db.reservation_get_all_by_project(self.context,
self.project_id)
self.assertIsNone(reservations.get('gigabytes'))
# Make sure the snapshot volume_size isn't included in usage.
vol_ref2 = volume.API().create(self.context, 10, '', '')
usages = db.quota_usage_get_all_by_project(self.context,
self.project_id)
self.assertEqual(usages['gigabytes']['in_use'], 20)
db.snapshot_destroy(self.context, snap_ref['id'])
db.snapshot_destroy(self.context, snap_ref2['id'])
db.volume_destroy(self.context, vol_ref['id'])
db.volume_destroy(self.context, vol_ref2['id'])
**** CubicPower OpenStack Study ****
def test_too_many_gigabytes_of_type(self):
resource = 'gigabytes_%s' % self.volume_type_name
db.quota_class_create(self.context, 'default', resource, 10)
flag_args = {
'quota_volumes': 2000,
'quota_gigabytes': 2000,
}
self.flags(**flag_args)
vol_ref = self._create_volume(size=10)
self.assertRaises(exception.VolumeSizeExceedsAvailableQuota,
volume.API().create,
self.context, 1, '', '',
volume_type=self.volume_type)
db.volume_destroy(self.context, vol_ref['id'])
**** CubicPower OpenStack Study ****
class FakeContext(object):
**** CubicPower OpenStack Study ****
def __init__(self, project_id, quota_class):
self.is_admin = False
self.user_id = 'fake_user'
self.project_id = project_id
self.quota_class = quota_class
**** CubicPower OpenStack Study ****
def elevated(self):
elevated = self.__class__(self.project_id, self.quota_class)
elevated.is_admin = True
return elevated
**** CubicPower OpenStack Study ****
class FakeDriver(object):
**** CubicPower OpenStack Study ****
def __init__(self, by_project=None, by_class=None, reservations=None):
self.called = []
self.by_project = by_project or {}
self.by_class = by_class or {}
self.reservations = reservations or []
**** CubicPower OpenStack Study ****
def get_by_project(self, context, project_id, resource):
self.called.append(('get_by_project', context, project_id, resource))
try:
return self.by_project[project_id][resource]
except KeyError:
raise exception.ProjectQuotaNotFound(project_id=project_id)
**** CubicPower OpenStack Study ****
def get_by_class(self, context, quota_class, resource):
self.called.append(('get_by_class', context, quota_class, resource))
try:
return self.by_class[quota_class][resource]
except KeyError:
raise exception.QuotaClassNotFound(class_name=quota_class)
**** CubicPower OpenStack Study ****
def get_default(self, context, resource):
self.called.append(('get_default', context, resource))
return resource.default
**** CubicPower OpenStack Study ****
def get_defaults(self, context, resources):
self.called.append(('get_defaults', context, resources))
return resources
**** CubicPower OpenStack Study ****
def get_class_quotas(self, context, resources, quota_class,
defaults=True):
self.called.append(('get_class_quotas', context, resources,
quota_class, defaults))
return resources
**** CubicPower OpenStack Study ****
def get_project_quotas(self, context, resources, project_id,
quota_class=None, defaults=True, usages=True):
self.called.append(('get_project_quotas', context, resources,
project_id, quota_class, defaults, usages))
return resources
**** CubicPower OpenStack Study ****
def limit_check(self, context, resources, values, project_id=None):
self.called.append(('limit_check', context, resources,
values, project_id))
**** CubicPower OpenStack Study ****
def reserve(self, context, resources, deltas, expire=None,
project_id=None):
self.called.append(('reserve', context, resources, deltas,
expire, project_id))
return self.reservations
**** CubicPower OpenStack Study ****
def commit(self, context, reservations, project_id=None):
self.called.append(('commit', context, reservations, project_id))
**** CubicPower OpenStack Study ****
def rollback(self, context, reservations, project_id=None):
self.called.append(('rollback', context, reservations, project_id))
**** CubicPower OpenStack Study ****
def destroy_all_by_project(self, context, project_id):
self.called.append(('destroy_all_by_project', context, project_id))
**** CubicPower OpenStack Study ****
def expire(self, context):
self.called.append(('expire', context))
**** CubicPower OpenStack Study ****
class BaseResourceTestCase(test.TestCase):
**** CubicPower OpenStack Study ****
def test_no_flag(self):
resource = quota.BaseResource('test_resource')
self.assertEqual(resource.name, 'test_resource')
self.assertIsNone(resource.flag)
self.assertEqual(resource.default, -1)
**** CubicPower OpenStack Study ****
def test_with_flag(self):
# We know this flag exists, so use it...
self.flags(quota_volumes=10)
resource = quota.BaseResource('test_resource', 'quota_volumes')
self.assertEqual(resource.name, 'test_resource')
self.assertEqual(resource.flag, 'quota_volumes')
self.assertEqual(resource.default, 10)
**** CubicPower OpenStack Study ****
def test_with_flag_no_quota(self):
self.flags(quota_volumes=-1)
resource = quota.BaseResource('test_resource', 'quota_volumes')
self.assertEqual(resource.name, 'test_resource')
self.assertEqual(resource.flag, 'quota_volumes')
self.assertEqual(resource.default, -1)
**** CubicPower OpenStack Study ****
def test_quota_no_project_no_class(self):
self.flags(quota_volumes=10)
resource = quota.BaseResource('test_resource', 'quota_volumes')
driver = FakeDriver()
context = FakeContext(None, None)
quota_value = resource.quota(driver, context)
self.assertEqual(quota_value, 10)
**** CubicPower OpenStack Study ****
def test_quota_with_project_no_class(self):
self.flags(quota_volumes=10)
resource = quota.BaseResource('test_resource', 'quota_volumes')
driver = FakeDriver(
by_project=dict(
test_project=dict(test_resource=15), ))
context = FakeContext('test_project', None)
quota_value = resource.quota(driver, context)
self.assertEqual(quota_value, 15)
**** CubicPower OpenStack Study ****
def test_quota_no_project_with_class(self):
self.flags(quota_volumes=10)
resource = quota.BaseResource('test_resource', 'quota_volumes')
driver = FakeDriver(
by_class=dict(
test_class=dict(test_resource=20), ))
context = FakeContext(None, 'test_class')
quota_value = resource.quota(driver, context)
self.assertEqual(quota_value, 20)
**** CubicPower OpenStack Study ****
def test_quota_with_project_with_class(self):
self.flags(quota_volumes=10)
resource = quota.BaseResource('test_resource', 'quota_volumes')
driver = FakeDriver(by_project=dict(
test_project=dict(test_resource=15), ),
by_class=dict(test_class=dict(test_resource=20), ))
context = FakeContext('test_project', 'test_class')
quota_value = resource.quota(driver, context)
self.assertEqual(quota_value, 15)
**** CubicPower OpenStack Study ****
def test_quota_override_project_with_class(self):
self.flags(quota_volumes=10)
resource = quota.BaseResource('test_resource', 'quota_volumes')
driver = FakeDriver(by_project=dict(
test_project=dict(test_resource=15),
override_project=dict(test_resource=20), ))
context = FakeContext('test_project', 'test_class')
quota_value = resource.quota(driver, context,
project_id='override_project')
self.assertEqual(quota_value, 20)
**** CubicPower OpenStack Study ****
def test_quota_with_project_override_class(self):
self.flags(quota_volumes=10)
resource = quota.BaseResource('test_resource', 'quota_volumes')
driver = FakeDriver(by_class=dict(
test_class=dict(test_resource=15),
override_class=dict(test_resource=20), ))
context = FakeContext('test_project', 'test_class')
quota_value = resource.quota(driver, context,
quota_class='override_class')
self.assertEqual(quota_value, 20)
**** CubicPower OpenStack Study ****
class VolumeTypeResourceTestCase(test.TestCase):
**** CubicPower OpenStack Study ****
def test_name_and_flag(self):
volume_type_name = 'foo'
volume = {'name': volume_type_name, 'id': 'myid'}
resource = quota.VolumeTypeResource('volumes', volume)
self.assertEqual(resource.name, 'volumes_%s' % volume_type_name)
self.assertIsNone(resource.flag)
self.assertEqual(resource.default, -1)
**** CubicPower OpenStack Study ****
class QuotaEngineTestCase(test.TestCase):
**** CubicPower OpenStack Study ****
def test_init(self):
quota_obj = quota.QuotaEngine()
self.assertEqual(quota_obj.resources, {})
self.assertIsInstance(quota_obj._driver, quota.DbQuotaDriver)
**** CubicPower OpenStack Study ****
def test_init_override_string(self):
quota_obj = quota.QuotaEngine(
quota_driver_class='cinder.tests.test_quota.FakeDriver')
self.assertEqual(quota_obj.resources, {})
self.assertIsInstance(quota_obj._driver, FakeDriver)
**** CubicPower OpenStack Study ****
def test_init_override_obj(self):
quota_obj = quota.QuotaEngine(quota_driver_class=FakeDriver)
self.assertEqual(quota_obj.resources, {})
self.assertEqual(quota_obj._driver, FakeDriver)
**** CubicPower OpenStack Study ****
def test_register_resource(self):
quota_obj = quota.QuotaEngine()
resource = quota.AbsoluteResource('test_resource')
quota_obj.register_resource(resource)
self.assertEqual(quota_obj.resources, dict(test_resource=resource))
**** CubicPower OpenStack Study ****
def test_register_resources(self):
quota_obj = quota.QuotaEngine()
resources = [
quota.AbsoluteResource('test_resource1'),
quota.AbsoluteResource('test_resource2'),
quota.AbsoluteResource('test_resource3'), ]
quota_obj.register_resources(resources)
self.assertEqual(quota_obj.resources,
dict(test_resource1=resources[0],
test_resource2=resources[1],
test_resource3=resources[2], ))
**** CubicPower OpenStack Study ****
def test_get_by_project(self):
context = FakeContext('test_project', 'test_class')
driver = FakeDriver(
by_project=dict(
test_project=dict(test_resource=42)))
quota_obj = quota.QuotaEngine(quota_driver_class=driver)
result = quota_obj.get_by_project(context, 'test_project',
'test_resource')
self.assertEqual(driver.called,
[('get_by_project',
context,
'test_project',
'test_resource'), ])
self.assertEqual(result, 42)
**** CubicPower OpenStack Study ****
def test_get_by_class(self):
context = FakeContext('test_project', 'test_class')
driver = FakeDriver(
by_class=dict(
test_class=dict(test_resource=42)))
quota_obj = quota.QuotaEngine(quota_driver_class=driver)
result = quota_obj.get_by_class(context, 'test_class', 'test_resource')
self.assertEqual(driver.called, [('get_by_class',
context,
'test_class',
'test_resource'), ])
self.assertEqual(result, 42)
**** CubicPower OpenStack Study ****
def _make_quota_obj(self, driver):
quota_obj = quota.QuotaEngine(quota_driver_class=driver)
resources = [
quota.AbsoluteResource('test_resource4'),
quota.AbsoluteResource('test_resource3'),
quota.AbsoluteResource('test_resource2'),
quota.AbsoluteResource('test_resource1'), ]
quota_obj.register_resources(resources)
return quota_obj
**** CubicPower OpenStack Study ****
def test_get_defaults(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
result = quota_obj.get_defaults(context)
self.assertEqual(driver.called, [('get_defaults',
context,
quota_obj.resources), ])
self.assertEqual(result, quota_obj.resources)
**** CubicPower OpenStack Study ****
def test_get_class_quotas(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
result1 = quota_obj.get_class_quotas(context, 'test_class')
result2 = quota_obj.get_class_quotas(context, 'test_class', False)
self.assertEqual(driver.called, [
('get_class_quotas',
context,
quota_obj.resources,
'test_class', True),
('get_class_quotas',
context, quota_obj.resources,
'test_class', False), ])
self.assertEqual(result1, quota_obj.resources)
self.assertEqual(result2, quota_obj.resources)
**** CubicPower OpenStack Study ****
def test_get_project_quotas(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
result1 = quota_obj.get_project_quotas(context, 'test_project')
result2 = quota_obj.get_project_quotas(context, 'test_project',
quota_class='test_class',
defaults=False,
usages=False)
self.assertEqual(driver.called, [
('get_project_quotas',
context,
quota_obj.resources,
'test_project',
None,
True,
True),
('get_project_quotas',
context,
quota_obj.resources,
'test_project',
'test_class',
False,
False), ])
self.assertEqual(result1, quota_obj.resources)
self.assertEqual(result2, quota_obj.resources)
**** CubicPower OpenStack Study ****
def test_count_no_resource(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
self.assertRaises(exception.QuotaResourceUnknown,
quota_obj.count, context, 'test_resource5',
True, foo='bar')
**** CubicPower OpenStack Study ****
def test_count_wrong_resource(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
self.assertRaises(exception.QuotaResourceUnknown,
quota_obj.count, context, 'test_resource1',
True, foo='bar')
**** CubicPower OpenStack Study ****
def test_count(self):
def fake_count(context, *args, **kwargs):
self.assertEqual(args, (True,))
self.assertEqual(kwargs, dict(foo='bar'))
return 5
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
quota_obj.register_resource(quota.CountableResource('test_resource5',
fake_count))
result = quota_obj.count(context, 'test_resource5', True, foo='bar')
self.assertEqual(result, 5)
**** CubicPower OpenStack Study ****
def fake_count(context, *args, **kwargs):
self.assertEqual(args, (True,))
self.assertEqual(kwargs, dict(foo='bar'))
return 5
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
quota_obj.register_resource(quota.CountableResource('test_resource5',
fake_count))
result = quota_obj.count(context, 'test_resource5', True, foo='bar')
self.assertEqual(result, 5)
**** CubicPower OpenStack Study ****
def test_limit_check(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
quota_obj.limit_check(context, test_resource1=4, test_resource2=3,
test_resource3=2, test_resource4=1)
self.assertEqual(driver.called, [
('limit_check',
context,
quota_obj.resources,
dict(
test_resource1=4,
test_resource2=3,
test_resource3=2,
test_resource4=1,),
None), ])
**** CubicPower OpenStack Study ****
def test_reserve(self):
context = FakeContext(None, None)
driver = FakeDriver(reservations=['resv-01',
'resv-02',
'resv-03',
'resv-04', ])
quota_obj = self._make_quota_obj(driver)
result1 = quota_obj.reserve(context, test_resource1=4,
test_resource2=3, test_resource3=2,
test_resource4=1)
result2 = quota_obj.reserve(context, expire=3600,
test_resource1=1, test_resource2=2,
test_resource3=3, test_resource4=4)
result3 = quota_obj.reserve(context, project_id='fake_project',
test_resource1=1, test_resource2=2,
test_resource3=3, test_resource4=4)
self.assertEqual(driver.called, [
('reserve',
context,
quota_obj.resources,
dict(
test_resource1=4,
test_resource2=3,
test_resource3=2,
test_resource4=1, ),
None,
None),
('reserve',
context,
quota_obj.resources,
dict(
test_resource1=1,
test_resource2=2,
test_resource3=3,
test_resource4=4, ),
3600,
None),
('reserve',
context,
quota_obj.resources,
dict(
test_resource1=1,
test_resource2=2,
test_resource3=3,
test_resource4=4, ),
None,
'fake_project'), ])
self.assertEqual(result1, ['resv-01',
'resv-02',
'resv-03',
'resv-04', ])
self.assertEqual(result2, ['resv-01',
'resv-02',
'resv-03',
'resv-04', ])
self.assertEqual(result3, ['resv-01',
'resv-02',
'resv-03',
'resv-04', ])
**** CubicPower OpenStack Study ****
def test_commit(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
quota_obj.commit(context, ['resv-01', 'resv-02', 'resv-03'])
self.assertEqual(driver.called,
[('commit',
context,
['resv-01',
'resv-02',
'resv-03'],
None), ])
**** CubicPower OpenStack Study ****
def test_rollback(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
quota_obj.rollback(context, ['resv-01', 'resv-02', 'resv-03'])
self.assertEqual(driver.called,
[('rollback',
context,
['resv-01',
'resv-02',
'resv-03'],
None), ])
**** CubicPower OpenStack Study ****
def test_destroy_all_by_project(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
quota_obj.destroy_all_by_project(context, 'test_project')
self.assertEqual(driver.called,
[('destroy_all_by_project',
context,
'test_project'), ])
**** CubicPower OpenStack Study ****
def test_expire(self):
context = FakeContext(None, None)
driver = FakeDriver()
quota_obj = self._make_quota_obj(driver)
quota_obj.expire(context)
self.assertEqual(driver.called, [('expire', context), ])
**** CubicPower OpenStack Study ****
def test_resource_names(self):
quota_obj = self._make_quota_obj(None)
self.assertEqual(quota_obj.resource_names,
['test_resource1', 'test_resource2',
'test_resource3', 'test_resource4'])
**** CubicPower OpenStack Study ****
class VolumeTypeQuotaEngineTestCase(test.TestCase):
**** CubicPower OpenStack Study ****
def test_default_resources(self):
def fake_vtga(context, inactive=False, filters=None):
return {}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
engine = quota.VolumeTypeQuotaEngine()
self.assertEqual(engine.resource_names,
['gigabytes', 'snapshots', 'volumes'])
**** CubicPower OpenStack Study ****
def fake_vtga(context, inactive=False, filters=None):
return {}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
engine = quota.VolumeTypeQuotaEngine()
self.assertEqual(engine.resource_names,
['gigabytes', 'snapshots', 'volumes'])
**** CubicPower OpenStack Study ****
def test_volume_type_resources(self):
ctx = context.RequestContext('admin', 'admin', is_admin=True)
vtype = db.volume_type_create(ctx, {'name': 'type1'})
vtype2 = db.volume_type_create(ctx, {'name': 'type_2'})
def fake_vtga(context, inactive=False, filters=None):
return {
'type1': {
'id': vtype['id'],
'name': 'type1',
'extra_specs': {},
},
'type_2': {
'id': vtype['id'],
'name': 'type_2',
'extra_specs': {},
},
}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
engine = quota.VolumeTypeQuotaEngine()
self.assertEqual(engine.resource_names,
['gigabytes', 'gigabytes_type1', 'gigabytes_type_2',
'snapshots', 'snapshots_type1', 'snapshots_type_2',
'volumes', 'volumes_type1', 'volumes_type_2'])
db.volume_type_destroy(ctx, vtype['id'])
db.volume_type_destroy(ctx, vtype2['id'])
**** CubicPower OpenStack Study ****
def fake_vtga(context, inactive=False, filters=None):
return {
'type1': {
'id': vtype['id'],
'name': 'type1',
'extra_specs': {},
},
'type_2': {
'id': vtype['id'],
'name': 'type_2',
'extra_specs': {},
},
}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
engine = quota.VolumeTypeQuotaEngine()
self.assertEqual(engine.resource_names,
['gigabytes', 'gigabytes_type1', 'gigabytes_type_2',
'snapshots', 'snapshots_type1', 'snapshots_type_2',
'volumes', 'volumes_type1', 'volumes_type_2'])
db.volume_type_destroy(ctx, vtype['id'])
db.volume_type_destroy(ctx, vtype2['id'])
**** CubicPower OpenStack Study ****
class DbQuotaDriverTestCase(test.TestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(DbQuotaDriverTestCase, self).setUp()
self.flags(quota_volumes=10,
quota_snapshots=10,
quota_gigabytes=1000,
reservation_expire=86400,
until_refresh=0,
max_age=0,
)
self.driver = quota.DbQuotaDriver()
self.calls = []
patcher = mock.patch.object(timeutils, 'utcnow')
self.addCleanup(patcher.stop)
self.mock_utcnow = patcher.start()
self.mock_utcnow.return_value = datetime.datetime.utcnow()
**** CubicPower OpenStack Study ****
def test_get_defaults(self):
# Use our pre-defined resources
self._stub_quota_class_get_default()
self._stub_volume_type_get_all()
result = self.driver.get_defaults(None, quota.QUOTAS.resources)
self.assertEqual(
result,
dict(
volumes=10,
snapshots=10,
gigabytes=1000, ))
**** CubicPower OpenStack Study ****
def _stub_quota_class_get_default(self):
# Stub out quota_class_get_default
def fake_qcgd(context):
self.calls.append('quota_class_get_default')
return dict(volumes=10,
snapshots=10,
gigabytes=1000,)
self.stubs.Set(db, 'quota_class_get_default', fake_qcgd)
**** CubicPower OpenStack Study ****
def fake_qcgd(context):
self.calls.append('quota_class_get_default')
return dict(volumes=10,
snapshots=10,
gigabytes=1000,)
self.stubs.Set(db, 'quota_class_get_default', fake_qcgd)
**** CubicPower OpenStack Study ****
def _stub_volume_type_get_all(self):
def fake_vtga(context, inactive=False, filters=None):
return {}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
**** CubicPower OpenStack Study ****
def fake_vtga(context, inactive=False, filters=None):
return {}
self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
**** CubicPower OpenStack Study ****
def _stub_quota_class_get_all_by_name(self):
# Stub out quota_class_get_all_by_name
def fake_qcgabn(context, quota_class):
self.calls.append('quota_class_get_all_by_name')
self.assertEqual(quota_class, 'test_class')
return dict(gigabytes=500, volumes=10, snapshots=10, )
self.stubs.Set(db, 'quota_class_get_all_by_name', fake_qcgabn)
**** CubicPower OpenStack Study ****
def fake_qcgabn(context, quota_class):
self.calls.append('quota_class_get_all_by_name')
self.assertEqual(quota_class, 'test_class')
return dict(gigabytes=500, volumes=10, snapshots=10, )
self.stubs.Set(db, 'quota_class_get_all_by_name', fake_qcgabn)
**** CubicPower OpenStack Study ****
def test_get_class_quotas(self):
self._stub_quota_class_get_all_by_name()
self._stub_volume_type_get_all()
result = self.driver.get_class_quotas(None, quota.QUOTAS.resources,
'test_class')
self.assertEqual(self.calls, ['quota_class_get_all_by_name'])
self.assertEqual(result, dict(volumes=10,
gigabytes=500,
snapshots=10))
**** CubicPower OpenStack Study ****
def test_get_class_quotas_no_defaults(self):
self._stub_quota_class_get_all_by_name()
result = self.driver.get_class_quotas(None, quota.QUOTAS.resources,
'test_class', False)
self.assertEqual(self.calls, ['quota_class_get_all_by_name'])
self.assertEqual(result, dict(volumes=10,
gigabytes=500,
snapshots=10))
**** CubicPower OpenStack Study ****
def _stub_get_by_project(self):
def fake_qgabp(context, project_id):
self.calls.append('quota_get_all_by_project')
self.assertEqual(project_id, 'test_project')
return dict(volumes=10, gigabytes=50, reserved=0, snapshots=10)
def fake_qugabp(context, project_id):
self.calls.append('quota_usage_get_all_by_project')
self.assertEqual(project_id, 'test_project')
return dict(volumes=dict(in_use=2, reserved=0),
snapshots=dict(in_use=2, reserved=0),
gigabytes=dict(in_use=10, reserved=0), )
self.stubs.Set(db, 'quota_get_all_by_project', fake_qgabp)
self.stubs.Set(db, 'quota_usage_get_all_by_project', fake_qugabp)
self._stub_quota_class_get_all_by_name()
self._stub_quota_class_get_default()
**** CubicPower OpenStack Study ****
def fake_qgabp(context, project_id):
self.calls.append('quota_get_all_by_project')
self.assertEqual(project_id, 'test_project')
return dict(volumes=10, gigabytes=50, reserved=0, snapshots=10)
**** CubicPower OpenStack Study ****
def fake_qugabp(context, project_id):
self.calls.append('quota_usage_get_all_by_project')
self.assertEqual(project_id, 'test_project')
return dict(volumes=dict(in_use=2, reserved=0),
snapshots=dict(in_use=2, reserved=0),
gigabytes=dict(in_use=10, reserved=0), )
self.stubs.Set(db, 'quota_get_all_by_project', fake_qgabp)
self.stubs.Set(db, 'quota_usage_get_all_by_project', fake_qugabp)
self._stub_quota_class_get_all_by_name()
self._stub_quota_class_get_default()
**** CubicPower OpenStack Study ****
def test_get_project_quotas(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project')
self.assertEqual(self.calls, ['quota_get_all_by_project',
'quota_usage_get_all_by_project',
'quota_class_get_all_by_name',
'quota_class_get_default', ])
self.assertEqual(result, dict(volumes=dict(limit=10,
in_use=2,
reserved=0, ),
snapshots=dict(limit=10,
in_use=2,
reserved=0, ),
gigabytes=dict(limit=50,
in_use=10,
reserved=0, ), ))
**** CubicPower OpenStack Study ****
def test_get_project_quotas_alt_context_no_class(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('other_project', 'other_class'),
quota.QUOTAS.resources, 'test_project')
self.assertEqual(self.calls, ['quota_get_all_by_project',
'quota_usage_get_all_by_project',
'quota_class_get_default', ])
self.assertEqual(result, dict(volumes=dict(limit=10,
in_use=2,
reserved=0, ),
snapshots=dict(limit=10,
in_use=2,
reserved=0, ),
gigabytes=dict(limit=50,
in_use=10,
reserved=0, ), ))
**** CubicPower OpenStack Study ****
def test_get_project_quotas_alt_context_with_class(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('other_project', 'other_class'),
quota.QUOTAS.resources, 'test_project', quota_class='test_class')
self.assertEqual(self.calls, ['quota_get_all_by_project',
'quota_usage_get_all_by_project',
'quota_class_get_all_by_name',
'quota_class_get_default', ])
self.assertEqual(result, dict(volumes=dict(limit=10,
in_use=2,
reserved=0, ),
snapshots=dict(limit=10,
in_use=2,
reserved=0, ),
gigabytes=dict(limit=50,
in_use=10,
reserved=0, ), ))
**** CubicPower OpenStack Study ****
def test_get_project_quotas_no_defaults(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project', defaults=False)
self.assertEqual(self.calls, ['quota_get_all_by_project',
'quota_usage_get_all_by_project',
'quota_class_get_all_by_name',
'quota_class_get_default', ])
self.assertEqual(result,
dict(gigabytes=dict(limit=50,
in_use=10,
reserved=0, ),
snapshots=dict(limit=10,
in_use=2,
reserved=0, ),
volumes=dict(limit=10,
in_use=2,
reserved=0, ), ))
**** CubicPower OpenStack Study ****
def test_get_project_quotas_no_usages(self):
self._stub_get_by_project()
self._stub_volume_type_get_all()
result = self.driver.get_project_quotas(
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources, 'test_project', usages=False)
self.assertEqual(self.calls, ['quota_get_all_by_project',
'quota_class_get_all_by_name',
'quota_class_get_default', ])
self.assertEqual(result, dict(volumes=dict(limit=10, ),
snapshots=dict(limit=10, ),
gigabytes=dict(limit=50, ), ))
**** CubicPower OpenStack Study ****
def _stub_get_project_quotas(self):
def fake_get_project_quotas(context, resources, project_id,
quota_class=None, defaults=True,
usages=True):
self.calls.append('get_project_quotas')
return dict((k, dict(limit=v.default))
for k, v in resources.items())
self.stubs.Set(self.driver, 'get_project_quotas',
fake_get_project_quotas)
**** CubicPower OpenStack Study ****
def fake_get_project_quotas(context, resources, project_id,
quota_class=None, defaults=True,
usages=True):
self.calls.append('get_project_quotas')
return dict((k, dict(limit=v.default))
for k, v in resources.items())
self.stubs.Set(self.driver, 'get_project_quotas',
fake_get_project_quotas)
**** CubicPower OpenStack Study ****
def test_get_quotas_has_sync_unknown(self):
self._stub_get_project_quotas()
self.assertRaises(exception.QuotaResourceUnknown,
self.driver._get_quotas,
None, quota.QUOTAS.resources,
['unknown'], True)
self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
def test_get_quotas_no_sync_unknown(self):
self._stub_get_project_quotas()
self.assertRaises(exception.QuotaResourceUnknown,
self.driver._get_quotas,
None, quota.QUOTAS.resources,
['unknown'], False)
self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
def test_get_quotas_has_sync_no_sync_resource(self):
self._stub_get_project_quotas()
self.assertRaises(exception.QuotaResourceUnknown,
self.driver._get_quotas,
None, quota.QUOTAS.resources,
['metadata_items'], True)
self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
def test_get_quotas_no_sync_has_sync_resource(self):
self._stub_get_project_quotas()
self.assertRaises(exception.QuotaResourceUnknown,
self.driver._get_quotas,
None, quota.QUOTAS.resources,
['volumes'], False)
self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
def test_get_quotas_has_sync(self):
self._stub_get_project_quotas()
result = self.driver._get_quotas(FakeContext('test_project',
'test_class'),
quota.QUOTAS.resources,
['volumes', 'gigabytes'],
True)
self.assertEqual(self.calls, ['get_project_quotas'])
self.assertEqual(result, dict(volumes=10, gigabytes=1000, ))
**** CubicPower OpenStack Study ****
def _stub_quota_reserve(self):
def fake_quota_reserve(context, resources, quotas, deltas, expire,
until_refresh, max_age, project_id=None):
self.calls.append(('quota_reserve', expire, until_refresh,
max_age))
return ['resv-1', 'resv-2', 'resv-3']
self.stubs.Set(db, 'quota_reserve', fake_quota_reserve)
**** CubicPower OpenStack Study ****
def fake_quota_reserve(context, resources, quotas, deltas, expire,
until_refresh, max_age, project_id=None):
self.calls.append(('quota_reserve', expire, until_refresh,
max_age))
return ['resv-1', 'resv-2', 'resv-3']
self.stubs.Set(db, 'quota_reserve', fake_quota_reserve)
**** CubicPower OpenStack Study ****
def test_reserve_bad_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self.assertRaises(exception.InvalidReservationExpiration,
self.driver.reserve,
FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2), expire='invalid')
self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
def test_reserve_default_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2))
expire = timeutils.utcnow() + datetime.timedelta(seconds=86400)
self.assertEqual(self.calls, ['get_project_quotas',
('quota_reserve', expire, 0, 0), ])
self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
def test_reserve_int_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2), expire=3600)
expire = timeutils.utcnow() + datetime.timedelta(seconds=3600)
self.assertEqual(self.calls, ['get_project_quotas',
('quota_reserve', expire, 0, 0), ])
self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
def test_reserve_timedelta_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
expire_delta = datetime.timedelta(seconds=60)
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2), expire=expire_delta)
expire = timeutils.utcnow() + expire_delta
self.assertEqual(self.calls, ['get_project_quotas',
('quota_reserve', expire, 0, 0), ])
self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
def test_reserve_datetime_expire(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2), expire=expire)
self.assertEqual(self.calls, ['get_project_quotas',
('quota_reserve', expire, 0, 0), ])
self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
def test_reserve_until_refresh(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self.flags(until_refresh=500)
expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2), expire=expire)
self.assertEqual(self.calls, ['get_project_quotas',
('quota_reserve', expire, 500, 0), ])
self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
def test_reserve_max_age(self):
self._stub_get_project_quotas()
self._stub_quota_reserve()
self.flags(max_age=86400)
expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
result = self.driver.reserve(FakeContext('test_project', 'test_class'),
quota.QUOTAS.resources,
dict(volumes=2), expire=expire)
self.assertEqual(self.calls, ['get_project_quotas',
('quota_reserve', expire, 0, 86400), ])
self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
def _stub_quota_destroy_all_by_project(self):
def fake_quota_destroy_all_by_project(context, project_id):
self.calls.append(('quota_destroy_all_by_project', project_id))
return None
self.stubs.Set(sqa_api, 'quota_destroy_all_by_project',
fake_quota_destroy_all_by_project)
**** CubicPower OpenStack Study ****
def fake_quota_destroy_all_by_project(context, project_id):
self.calls.append(('quota_destroy_all_by_project', project_id))
return None
self.stubs.Set(sqa_api, 'quota_destroy_all_by_project',
fake_quota_destroy_all_by_project)
**** CubicPower OpenStack Study ****
def test_destroy_by_project(self):
self._stub_quota_destroy_all_by_project()
self.driver.destroy_all_by_project(FakeContext('test_project',
'test_class'),
'test_project')
self.assertEqual(self.calls, [('quota_destroy_all_by_project',
('test_project')), ])
**** CubicPower OpenStack Study ****
class FakeSession(object):
**** CubicPower OpenStack Study ****
def begin(self):
return self
**** CubicPower OpenStack Study ****
def __enter__(self):
return self
**** CubicPower OpenStack Study ****
def __exit__(self, exc_type, exc_value, exc_traceback):
return False
**** CubicPower OpenStack Study ****
class FakeUsage(sqa_models.QuotaUsage):
**** CubicPower OpenStack Study ****
def save(self, *args, **kwargs):
pass
**** CubicPower OpenStack Study ****
class QuotaReserveSqlAlchemyTestCase(test.TestCase):
# cinder.db.sqlalchemy.api.quota_reserve is so complex it needs its
# own test case, and since it's a quota manipulator, this is the
# best place to put it...
**** CubicPower OpenStack Study ****
def setUp(self):
super(QuotaReserveSqlAlchemyTestCase, self).setUp()
self.sync_called = set()
def make_sync(res_name):
def fake_sync(context, project_id, volume_type_id=None,
volume_type_name=None, session=None):
self.sync_called.add(res_name)
if res_name in self.usages:
if self.usages[res_name].in_use < 0:
return {res_name: 2}
else:
return {res_name: self.usages[res_name].in_use - 1}
return {res_name: 0}
return fake_sync
self.resources = {}
QUOTA_SYNC_FUNCTIONS = {}
for res_name in ('volumes', 'gigabytes'):
res = quota.ReservableResource(res_name, '_sync_%s' % res_name)
QUOTA_SYNC_FUNCTIONS['_sync_%s' % res_name] = make_sync(res_name)
self.resources[res_name] = res
self.stubs.Set(sqa_api, 'QUOTA_SYNC_FUNCTIONS', QUOTA_SYNC_FUNCTIONS)
self.expire = timeutils.utcnow() + datetime.timedelta(seconds=3600)
self.usages = {}
self.usages_created = {}
self.reservations_created = {}
def fake_get_session():
return FakeSession()
def fake_get_quota_usages(context, session, project_id):
return self.usages.copy()
def fake_quota_usage_create(context, project_id, resource, in_use,
reserved, until_refresh, session=None,
save=True):
quota_usage_ref = self._make_quota_usage(
project_id, resource, in_use, reserved, until_refresh,
timeutils.utcnow(), timeutils.utcnow())
self.usages_created[resource] = quota_usage_ref
return quota_usage_ref
def fake_reservation_create(context, uuid, usage_id, project_id,
resource, delta, expire, session=None):
reservation_ref = self._make_reservation(
uuid, usage_id, project_id, resource, delta, expire,
timeutils.utcnow(), timeutils.utcnow())
self.reservations_created[resource] = reservation_ref
return reservation_ref
self.stubs.Set(sqa_api, 'get_session', fake_get_session)
self.stubs.Set(sqa_api, '_get_quota_usages', fake_get_quota_usages)
self.stubs.Set(sqa_api, '_quota_usage_create', fake_quota_usage_create)
self.stubs.Set(sqa_api, '_reservation_create', fake_reservation_create)
patcher = mock.patch.object(timeutils, 'utcnow')
self.addCleanup(patcher.stop)
self.mock_utcnow = patcher.start()
self.mock_utcnow.return_value = datetime.datetime.utcnow()
**** CubicPower OpenStack Study ****
def make_sync(res_name):
def fake_sync(context, project_id, volume_type_id=None,
volume_type_name=None, session=None):
self.sync_called.add(res_name)
if res_name in self.usages:
if self.usages[res_name].in_use < 0:
return {res_name: 2}
else:
return {res_name: self.usages[res_name].in_use - 1}
return {res_name: 0}
return fake_sync
self.resources = {}
QUOTA_SYNC_FUNCTIONS = {}
for res_name in ('volumes', 'gigabytes'):
res = quota.ReservableResource(res_name, '_sync_%s' % res_name)
QUOTA_SYNC_FUNCTIONS['_sync_%s' % res_name] = make_sync(res_name)
self.resources[res_name] = res
self.stubs.Set(sqa_api, 'QUOTA_SYNC_FUNCTIONS', QUOTA_SYNC_FUNCTIONS)
self.expire = timeutils.utcnow() + datetime.timedelta(seconds=3600)
self.usages = {}
self.usages_created = {}
self.reservations_created = {}
**** CubicPower OpenStack Study ****
def fake_get_session():
return FakeSession()
**** CubicPower OpenStack Study ****
def fake_get_quota_usages(context, session, project_id):
return self.usages.copy()
**** CubicPower OpenStack Study ****
def fake_quota_usage_create(context, project_id, resource, in_use,
reserved, until_refresh, session=None,
save=True):
quota_usage_ref = self._make_quota_usage(
project_id, resource, in_use, reserved, until_refresh,
timeutils.utcnow(), timeutils.utcnow())
self.usages_created[resource] = quota_usage_ref
return quota_usage_ref
**** CubicPower OpenStack Study ****
def fake_reservation_create(context, uuid, usage_id, project_id,
resource, delta, expire, session=None):
reservation_ref = self._make_reservation(
uuid, usage_id, project_id, resource, delta, expire,
timeutils.utcnow(), timeutils.utcnow())
self.reservations_created[resource] = reservation_ref
return reservation_ref
self.stubs.Set(sqa_api, 'get_session', fake_get_session)
self.stubs.Set(sqa_api, '_get_quota_usages', fake_get_quota_usages)
self.stubs.Set(sqa_api, '_quota_usage_create', fake_quota_usage_create)
self.stubs.Set(sqa_api, '_reservation_create', fake_reservation_create)
patcher = mock.patch.object(timeutils, 'utcnow')
self.addCleanup(patcher.stop)
self.mock_utcnow = patcher.start()
self.mock_utcnow.return_value = datetime.datetime.utcnow()
**** CubicPower OpenStack Study ****
def _make_quota_usage(self, project_id, resource, in_use, reserved,
until_refresh, created_at, updated_at):
quota_usage_ref = FakeUsage()
quota_usage_ref.id = len(self.usages) + len(self.usages_created)
quota_usage_ref.project_id = project_id
quota_usage_ref.resource = resource
quota_usage_ref.in_use = in_use
quota_usage_ref.reserved = reserved
quota_usage_ref.until_refresh = until_refresh
quota_usage_ref.created_at = created_at
quota_usage_ref.updated_at = updated_at
quota_usage_ref.deleted_at = None
quota_usage_ref.deleted = False
return quota_usage_ref
**** CubicPower OpenStack Study ****
def init_usage(self, project_id, resource, in_use, reserved,
until_refresh=None, created_at=None, updated_at=None):
if created_at is None:
created_at = timeutils.utcnow()
if updated_at is None:
updated_at = timeutils.utcnow()
quota_usage_ref = self._make_quota_usage(project_id, resource, in_use,
reserved, until_refresh,
created_at, updated_at)
self.usages[resource] = quota_usage_ref
**** CubicPower OpenStack Study ****
def compare_usage(self, usage_dict, expected):
for usage in expected:
resource = usage['resource']
for key, value in usage.items():
actual = getattr(usage_dict[resource], key)
self.assertEqual(actual, value,
"%s != %s on usage for resource %s" %
(actual, value, resource))
**** CubicPower OpenStack Study ****
def _make_reservation(self, uuid, usage_id, project_id, resource,
delta, expire, created_at, updated_at):
reservation_ref = sqa_models.Reservation()
reservation_ref.id = len(self.reservations_created)
reservation_ref.uuid = uuid
reservation_ref.usage_id = usage_id
reservation_ref.project_id = project_id
reservation_ref.resource = resource
reservation_ref.delta = delta
reservation_ref.expire = expire
reservation_ref.created_at = created_at
reservation_ref.updated_at = updated_at
reservation_ref.deleted_at = None
reservation_ref.deleted = False
return reservation_ref
**** CubicPower OpenStack Study ****
def compare_reservation(self, reservations, expected):
reservations = set(reservations)
for resv in expected:
resource = resv['resource']
resv_obj = self.reservations_created[resource]
self.assertIn(resv_obj.uuid, reservations)
reservations.discard(resv_obj.uuid)
for key, value in resv.items():
actual = getattr(resv_obj, key)
self.assertEqual(actual, value,
"%s != %s on reservation for resource %s" %
(actual, value, resource))
self.assertEqual(len(reservations), 0)
**** CubicPower OpenStack Study ****
def test_quota_reserve_create_usages(self):
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5,
gigabytes=10 * 1024, )
deltas = dict(volumes=2,
gigabytes=2 * 1024, )
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
self.assertEqual(self.sync_called, set(['volumes', 'gigabytes']))
self.compare_usage(self.usages_created,
[dict(resource='volumes',
project_id='test_project',
in_use=0,
reserved=2,
until_refresh=None),
dict(resource='gigabytes',
project_id='test_project',
in_use=0,
reserved=2 * 1024,
until_refresh=None), ])
self.compare_reservation(
result,
[dict(resource='volumes',
usage_id=self.usages_created['volumes'],
project_id='test_project',
delta=2),
dict(resource='gigabytes',
usage_id=self.usages_created['gigabytes'],
delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
def test_quota_reserve_negative_in_use(self):
self.init_usage('test_project', 'volumes', -1, 0, until_refresh=1)
self.init_usage('test_project', 'gigabytes', -1, 0, until_refresh=1)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5,
gigabytes=10 * 1024, )
deltas = dict(volumes=2,
gigabytes=2 * 1024, )
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 5, 0)
self.assertEqual(self.sync_called, set(['volumes', 'gigabytes']))
self.compare_usage(self.usages, [dict(resource='volumes',
project_id='test_project',
in_use=2,
reserved=2,
until_refresh=5),
dict(resource='gigabytes',
project_id='test_project',
in_use=2,
reserved=2 * 1024,
until_refresh=5), ])
self.assertEqual(self.usages_created, {})
self.compare_reservation(result,
[dict(resource='volumes',
usage_id=self.usages['volumes'],
project_id='test_project',
delta=2),
dict(resource='gigabytes',
usage_id=self.usages['gigabytes'],
delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
def test_quota_reserve_until_refresh(self):
self.init_usage('test_project', 'volumes', 3, 0, until_refresh=1)
self.init_usage('test_project', 'gigabytes', 3, 0, until_refresh=1)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 5, 0)
self.assertEqual(self.sync_called, set(['volumes', 'gigabytes']))
self.compare_usage(self.usages, [dict(resource='volumes',
project_id='test_project',
in_use=2,
reserved=2,
until_refresh=5),
dict(resource='gigabytes',
project_id='test_project',
in_use=2,
reserved=2 * 1024,
until_refresh=5), ])
self.assertEqual(self.usages_created, {})
self.compare_reservation(result,
[dict(resource='volumes',
usage_id=self.usages['volumes'],
project_id='test_project',
delta=2),
dict(resource='gigabytes',
usage_id=self.usages['gigabytes'],
delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
def test_quota_reserve_max_age(self):
max_age = 3600
record_created = (timeutils.utcnow() -
datetime.timedelta(seconds=max_age))
self.init_usage('test_project', 'volumes', 3, 0,
created_at=record_created, updated_at=record_created)
self.init_usage('test_project', 'gigabytes', 3, 0,
created_at=record_created, updated_at=record_created)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, max_age)
self.assertEqual(self.sync_called, set(['volumes', 'gigabytes']))
self.compare_usage(self.usages, [dict(resource='volumes',
project_id='test_project',
in_use=2,
reserved=2,
until_refresh=None),
dict(resource='gigabytes',
project_id='test_project',
in_use=2,
reserved=2 * 1024,
until_refresh=None), ])
self.assertEqual(self.usages_created, {})
self.compare_reservation(result,
[dict(resource='volumes',
usage_id=self.usages['volumes'],
project_id='test_project',
delta=2),
dict(resource='gigabytes',
usage_id=self.usages['gigabytes'],
delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
def test_quota_reserve_no_refresh(self):
self.init_usage('test_project', 'volumes', 3, 0)
self.init_usage('test_project', 'gigabytes', 3, 0)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
self.assertEqual(self.sync_called, set([]))
self.compare_usage(self.usages, [dict(resource='volumes',
project_id='test_project',
in_use=3,
reserved=2,
until_refresh=None),
dict(resource='gigabytes',
project_id='test_project',
in_use=3,
reserved=2 * 1024,
until_refresh=None), ])
self.assertEqual(self.usages_created, {})
self.compare_reservation(result,
[dict(resource='volumes',
usage_id=self.usages['volumes'],
project_id='test_project',
delta=2),
dict(resource='gigabytes',
usage_id=self.usages['gigabytes'],
delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
def test_quota_reserve_unders(self):
self.init_usage('test_project', 'volumes', 1, 0)
self.init_usage('test_project', 'gigabytes', 1 * 1024, 0)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=-2, gigabytes=-2 * 1024, )
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
self.assertEqual(self.sync_called, set([]))
self.compare_usage(self.usages, [dict(resource='volumes',
project_id='test_project',
in_use=1,
reserved=0,
until_refresh=None),
dict(resource='gigabytes',
project_id='test_project',
in_use=1 * 1024,
reserved=0,
until_refresh=None), ])
self.assertEqual(self.usages_created, {})
self.compare_reservation(result,
[dict(resource='volumes',
usage_id=self.usages['volumes'],
project_id='test_project',
delta=-2),
dict(resource='gigabytes',
usage_id=self.usages['gigabytes'],
delta=-2 * 1024), ])
**** CubicPower OpenStack Study ****
def test_quota_reserve_overs(self):
self.init_usage('test_project', 'volumes', 4, 0)
self.init_usage('test_project', 'gigabytes', 10 * 1024, 0)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=2, gigabytes=2 * 1024, )
self.assertRaises(exception.OverQuota,
sqa_api.quota_reserve,
context, self.resources, quotas,
deltas, self.expire, 0, 0)
self.assertEqual(self.sync_called, set([]))
self.compare_usage(self.usages, [dict(resource='volumes',
project_id='test_project',
in_use=4,
reserved=0,
until_refresh=None),
dict(resource='gigabytes',
project_id='test_project',
in_use=10 * 1024,
reserved=0,
until_refresh=None), ])
self.assertEqual(self.usages_created, {})
self.assertEqual(self.reservations_created, {})
**** CubicPower OpenStack Study ****
def test_quota_reserve_reduction(self):
self.init_usage('test_project', 'volumes', 10, 0)
self.init_usage('test_project', 'gigabytes', 20 * 1024, 0)
context = FakeContext('test_project', 'test_class')
quotas = dict(volumes=5, gigabytes=10 * 1024, )
deltas = dict(volumes=-2, gigabytes=-2 * 1024, )
result = sqa_api.quota_reserve(context, self.resources, quotas,
deltas, self.expire, 0, 0)
self.assertEqual(self.sync_called, set([]))
self.compare_usage(self.usages, [dict(resource='volumes',
project_id='test_project',
in_use=10,
reserved=0,
until_refresh=None),
dict(resource='gigabytes',
project_id='test_project',
in_use=20 * 1024,
reserved=0,
until_refresh=None), ])
self.assertEqual(self.usages_created, {})
self.compare_reservation(result,
[dict(resource='volumes',
usage_id=self.usages['volumes'],
project_id='test_project',
delta=-2),
dict(resource='gigabytes',
usage_id=self.usages['gigabytes'],
project_id='test_project',
delta=-2 * 1024), ])