**** CubicPower OpenStack Study ****
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import datetime
import mock
from oslo.config import cfg
from cinder import context
from cinder import db
from cinder.db.sqlalchemy import api as sqa_api
from cinder.db.sqlalchemy import models as sqa_models
from cinder import exception
from cinder.openstack.common import timeutils
from cinder import quota
from cinder import test
import cinder.tests.image.fake
from cinder import volume
CONF = cfg.CONF
**** CubicPower OpenStack Study ****
class QuotaIntegrationTestCase(test.TestCase):
    
**** CubicPower OpenStack Study ****
    def setUp(self):
        super(QuotaIntegrationTestCase, self).setUp()
        self.volume_type_name = CONF.default_volume_type
        self.volume_type = db.volume_type_create(
            context.get_admin_context(),
            dict(name=self.volume_type_name))
        self.flags(quota_volumes=2,
                   quota_snapshots=2,
                   quota_gigabytes=20)
        self.user_id = 'admin'
        self.project_id = 'admin'
        self.context = context.RequestContext(self.user_id,
                                              self.project_id,
                                              is_admin=True)
        # Destroy the 'default' quota_class in the database to avoid
        # conflicts with the test cases here that are setting up their own
        # defaults.
        db.quota_class_destroy_all_by_name(self.context, 'default')
**** CubicPower OpenStack Study ****
    def tearDown(self):
        db.volume_type_destroy(context.get_admin_context(),
                               self.volume_type['id'])
        super(QuotaIntegrationTestCase, self).tearDown()
        cinder.tests.image.fake.FakeImageService_reset()
**** CubicPower OpenStack Study ****
    def _create_volume(self, size=1):
        """Create a test volume."""
        vol = {}
        vol['user_id'] = self.user_id
        vol['project_id'] = self.project_id
        vol['size'] = size
        vol['status'] = 'available'
        vol['volume_type_id'] = self.volume_type['id']
        return db.volume_create(self.context, vol)
**** CubicPower OpenStack Study ****
    def _create_snapshot(self, volume):
        snapshot = {}
        snapshot['user_id'] = self.user_id
        snapshot['project_id'] = self.project_id
        snapshot['volume_id'] = volume['id']
        snapshot['volume_size'] = volume['size']
        snapshot['status'] = 'available'
        return db.snapshot_create(self.context, snapshot)
**** CubicPower OpenStack Study ****
    def test_too_many_volumes(self):
        volume_ids = []
        for i in range(CONF.quota_volumes):
            vol_ref = self._create_volume()
            volume_ids.append(vol_ref['id'])
        self.assertRaises(exception.VolumeLimitExceeded,
                          volume.API().create,
                          self.context, 1, '', '',
                          volume_type=self.volume_type)
        for volume_id in volume_ids:
            db.volume_destroy(self.context, volume_id)
**** CubicPower OpenStack Study ****
    def test_too_many_volumes_of_type(self):
        resource = 'volumes_%s' % self.volume_type_name
        db.quota_class_create(self.context, 'default', resource, 1)
        flag_args = {
            'quota_volumes': 2000,
            'quota_gigabytes': 2000
        }
        self.flags(**flag_args)
        vol_ref = self._create_volume()
        self.assertRaises(exception.VolumeLimitExceeded,
                          volume.API().create,
                          self.context, 1, '', '',
                          volume_type=self.volume_type)
        db.volume_destroy(self.context, vol_ref['id'])
**** CubicPower OpenStack Study ****
    def test_too_many_snapshots_of_type(self):
        resource = 'snapshots_%s' % self.volume_type_name
        db.quota_class_create(self.context, 'default', resource, 1)
        flag_args = {
            'quota_volumes': 2000,
            'quota_gigabytes': 2000,
        }
        self.flags(**flag_args)
        vol_ref = self._create_volume()
        snap_ref = self._create_snapshot(vol_ref)
        self.assertRaises(exception.SnapshotLimitExceeded,
                          volume.API().create_snapshot,
                          self.context, vol_ref, '', '')
        db.snapshot_destroy(self.context, snap_ref['id'])
        db.volume_destroy(self.context, vol_ref['id'])
**** CubicPower OpenStack Study ****
    def test_too_many_gigabytes(self):
        volume_ids = []
        vol_ref = self._create_volume(size=20)
        volume_ids.append(vol_ref['id'])
        self.assertRaises(exception.VolumeSizeExceedsAvailableQuota,
                          volume.API().create,
                          self.context, 1, '', '',
                          volume_type=self.volume_type)
        for volume_id in volume_ids:
            db.volume_destroy(self.context, volume_id)
**** CubicPower OpenStack Study ****
    def test_too_many_combined_gigabytes(self):
        vol_ref = self._create_volume(size=10)
        snap_ref = self._create_snapshot(vol_ref)
        self.assertRaises(exception.QuotaError,
                          volume.API().create_snapshot,
                          self.context, vol_ref, '', '')
        usages = db.quota_usage_get_all_by_project(self.context,
                                                   self.project_id)
        self.assertEqual(usages['gigabytes']['in_use'], 20)
        db.snapshot_destroy(self.context, snap_ref['id'])
        db.volume_destroy(self.context, vol_ref['id'])
**** CubicPower OpenStack Study ****
    def test_no_snapshot_gb_quota_flag(self):
        self.flags(quota_volumes=2,
                   quota_snapshots=2,
                   quota_gigabytes=20,
                   no_snapshot_gb_quota=True)
        vol_ref = self._create_volume(size=10)
        snap_ref = self._create_snapshot(vol_ref)
        snap_ref2 = volume.API().create_snapshot(self.context,
                                                 vol_ref, '', '')
        # Make sure no reservation was created for snapshot gigabytes.
        reservations = db.reservation_get_all_by_project(self.context,
                                                         self.project_id)
        self.assertIsNone(reservations.get('gigabytes'))
        # Make sure the snapshot volume_size isn't included in usage.
        vol_ref2 = volume.API().create(self.context, 10, '', '')
        usages = db.quota_usage_get_all_by_project(self.context,
                                                   self.project_id)
        self.assertEqual(usages['gigabytes']['in_use'], 20)
        db.snapshot_destroy(self.context, snap_ref['id'])
        db.snapshot_destroy(self.context, snap_ref2['id'])
        db.volume_destroy(self.context, vol_ref['id'])
        db.volume_destroy(self.context, vol_ref2['id'])
**** CubicPower OpenStack Study ****
    def test_too_many_gigabytes_of_type(self):
        resource = 'gigabytes_%s' % self.volume_type_name
        db.quota_class_create(self.context, 'default', resource, 10)
        flag_args = {
            'quota_volumes': 2000,
            'quota_gigabytes': 2000,
        }
        self.flags(**flag_args)
        vol_ref = self._create_volume(size=10)
        self.assertRaises(exception.VolumeSizeExceedsAvailableQuota,
                          volume.API().create,
                          self.context, 1, '', '',
                          volume_type=self.volume_type)
        db.volume_destroy(self.context, vol_ref['id'])
**** CubicPower OpenStack Study ****
class FakeContext(object):
    
**** CubicPower OpenStack Study ****
    def __init__(self, project_id, quota_class):
        self.is_admin = False
        self.user_id = 'fake_user'
        self.project_id = project_id
        self.quota_class = quota_class
**** CubicPower OpenStack Study ****
    def elevated(self):
        elevated = self.__class__(self.project_id, self.quota_class)
        elevated.is_admin = True
        return elevated
**** CubicPower OpenStack Study ****
class FakeDriver(object):
    
**** CubicPower OpenStack Study ****
    def __init__(self, by_project=None, by_class=None, reservations=None):
        self.called = []
        self.by_project = by_project or {}
        self.by_class = by_class or {}
        self.reservations = reservations or []
**** CubicPower OpenStack Study ****
    def get_by_project(self, context, project_id, resource):
        self.called.append(('get_by_project', context, project_id, resource))
        try:
            return self.by_project[project_id][resource]
        except KeyError:
            raise exception.ProjectQuotaNotFound(project_id=project_id)
**** CubicPower OpenStack Study ****
    def get_by_class(self, context, quota_class, resource):
        self.called.append(('get_by_class', context, quota_class, resource))
        try:
            return self.by_class[quota_class][resource]
        except KeyError:
            raise exception.QuotaClassNotFound(class_name=quota_class)
**** CubicPower OpenStack Study ****
    def get_default(self, context, resource):
        self.called.append(('get_default', context, resource))
        return resource.default
**** CubicPower OpenStack Study ****
    def get_defaults(self, context, resources):
        self.called.append(('get_defaults', context, resources))
        return resources
**** CubicPower OpenStack Study ****
    def get_class_quotas(self, context, resources, quota_class,
                         defaults=True):
        self.called.append(('get_class_quotas', context, resources,
                            quota_class, defaults))
        return resources
**** CubicPower OpenStack Study ****
    def get_project_quotas(self, context, resources, project_id,
                           quota_class=None, defaults=True, usages=True):
        self.called.append(('get_project_quotas', context, resources,
                            project_id, quota_class, defaults, usages))
        return resources
**** CubicPower OpenStack Study ****
    def limit_check(self, context, resources, values, project_id=None):
        self.called.append(('limit_check', context, resources,
                            values, project_id))
**** CubicPower OpenStack Study ****
    def reserve(self, context, resources, deltas, expire=None,
                project_id=None):
        self.called.append(('reserve', context, resources, deltas,
                            expire, project_id))
        return self.reservations
**** CubicPower OpenStack Study ****
    def commit(self, context, reservations, project_id=None):
        self.called.append(('commit', context, reservations, project_id))
**** CubicPower OpenStack Study ****
    def rollback(self, context, reservations, project_id=None):
        self.called.append(('rollback', context, reservations, project_id))
**** CubicPower OpenStack Study ****
    def destroy_all_by_project(self, context, project_id):
        self.called.append(('destroy_all_by_project', context, project_id))
**** CubicPower OpenStack Study ****
    def expire(self, context):
        self.called.append(('expire', context))
**** CubicPower OpenStack Study ****
class BaseResourceTestCase(test.TestCase):
    
**** CubicPower OpenStack Study ****
    def test_no_flag(self):
        resource = quota.BaseResource('test_resource')
        self.assertEqual(resource.name, 'test_resource')
        self.assertIsNone(resource.flag)
        self.assertEqual(resource.default, -1)
**** CubicPower OpenStack Study ****
    def test_with_flag(self):
        # We know this flag exists, so use it...
        self.flags(quota_volumes=10)
        resource = quota.BaseResource('test_resource', 'quota_volumes')
        self.assertEqual(resource.name, 'test_resource')
        self.assertEqual(resource.flag, 'quota_volumes')
        self.assertEqual(resource.default, 10)
**** CubicPower OpenStack Study ****
    def test_with_flag_no_quota(self):
        self.flags(quota_volumes=-1)
        resource = quota.BaseResource('test_resource', 'quota_volumes')
        self.assertEqual(resource.name, 'test_resource')
        self.assertEqual(resource.flag, 'quota_volumes')
        self.assertEqual(resource.default, -1)
**** CubicPower OpenStack Study ****
    def test_quota_no_project_no_class(self):
        self.flags(quota_volumes=10)
        resource = quota.BaseResource('test_resource', 'quota_volumes')
        driver = FakeDriver()
        context = FakeContext(None, None)
        quota_value = resource.quota(driver, context)
        self.assertEqual(quota_value, 10)
**** CubicPower OpenStack Study ****
    def test_quota_with_project_no_class(self):
        self.flags(quota_volumes=10)
        resource = quota.BaseResource('test_resource', 'quota_volumes')
        driver = FakeDriver(
            by_project=dict(
                test_project=dict(test_resource=15), ))
        context = FakeContext('test_project', None)
        quota_value = resource.quota(driver, context)
        self.assertEqual(quota_value, 15)
**** CubicPower OpenStack Study ****
    def test_quota_no_project_with_class(self):
        self.flags(quota_volumes=10)
        resource = quota.BaseResource('test_resource', 'quota_volumes')
        driver = FakeDriver(
            by_class=dict(
                test_class=dict(test_resource=20), ))
        context = FakeContext(None, 'test_class')
        quota_value = resource.quota(driver, context)
        self.assertEqual(quota_value, 20)
**** CubicPower OpenStack Study ****
    def test_quota_with_project_with_class(self):
        self.flags(quota_volumes=10)
        resource = quota.BaseResource('test_resource', 'quota_volumes')
        driver = FakeDriver(by_project=dict(
            test_project=dict(test_resource=15), ),
            by_class=dict(test_class=dict(test_resource=20), ))
        context = FakeContext('test_project', 'test_class')
        quota_value = resource.quota(driver, context)
        self.assertEqual(quota_value, 15)
**** CubicPower OpenStack Study ****
    def test_quota_override_project_with_class(self):
        self.flags(quota_volumes=10)
        resource = quota.BaseResource('test_resource', 'quota_volumes')
        driver = FakeDriver(by_project=dict(
            test_project=dict(test_resource=15),
            override_project=dict(test_resource=20), ))
        context = FakeContext('test_project', 'test_class')
        quota_value = resource.quota(driver, context,
                                     project_id='override_project')
        self.assertEqual(quota_value, 20)
**** CubicPower OpenStack Study ****
    def test_quota_with_project_override_class(self):
        self.flags(quota_volumes=10)
        resource = quota.BaseResource('test_resource', 'quota_volumes')
        driver = FakeDriver(by_class=dict(
            test_class=dict(test_resource=15),
            override_class=dict(test_resource=20), ))
        context = FakeContext('test_project', 'test_class')
        quota_value = resource.quota(driver, context,
                                     quota_class='override_class')
        self.assertEqual(quota_value, 20)
**** CubicPower OpenStack Study ****
class VolumeTypeResourceTestCase(test.TestCase):
    
**** CubicPower OpenStack Study ****
    def test_name_and_flag(self):
        volume_type_name = 'foo'
        volume = {'name': volume_type_name, 'id': 'myid'}
        resource = quota.VolumeTypeResource('volumes', volume)
        self.assertEqual(resource.name, 'volumes_%s' % volume_type_name)
        self.assertIsNone(resource.flag)
        self.assertEqual(resource.default, -1)
**** CubicPower OpenStack Study ****
class QuotaEngineTestCase(test.TestCase):
    
**** CubicPower OpenStack Study ****
    def test_init(self):
        quota_obj = quota.QuotaEngine()
        self.assertEqual(quota_obj.resources, {})
        self.assertIsInstance(quota_obj._driver, quota.DbQuotaDriver)
**** CubicPower OpenStack Study ****
    def test_init_override_string(self):
        quota_obj = quota.QuotaEngine(
            quota_driver_class='cinder.tests.test_quota.FakeDriver')
        self.assertEqual(quota_obj.resources, {})
        self.assertIsInstance(quota_obj._driver, FakeDriver)
**** CubicPower OpenStack Study ****
    def test_init_override_obj(self):
        quota_obj = quota.QuotaEngine(quota_driver_class=FakeDriver)
        self.assertEqual(quota_obj.resources, {})
        self.assertEqual(quota_obj._driver, FakeDriver)
**** CubicPower OpenStack Study ****
    def test_register_resource(self):
        quota_obj = quota.QuotaEngine()
        resource = quota.AbsoluteResource('test_resource')
        quota_obj.register_resource(resource)
        self.assertEqual(quota_obj.resources, dict(test_resource=resource))
**** CubicPower OpenStack Study ****
    def test_register_resources(self):
        quota_obj = quota.QuotaEngine()
        resources = [
            quota.AbsoluteResource('test_resource1'),
            quota.AbsoluteResource('test_resource2'),
            quota.AbsoluteResource('test_resource3'), ]
        quota_obj.register_resources(resources)
        self.assertEqual(quota_obj.resources,
                         dict(test_resource1=resources[0],
                              test_resource2=resources[1],
                              test_resource3=resources[2], ))
**** CubicPower OpenStack Study ****
    def test_get_by_project(self):
        context = FakeContext('test_project', 'test_class')
        driver = FakeDriver(
            by_project=dict(
                test_project=dict(test_resource=42)))
        quota_obj = quota.QuotaEngine(quota_driver_class=driver)
        result = quota_obj.get_by_project(context, 'test_project',
                                          'test_resource')
        self.assertEqual(driver.called,
                         [('get_by_project',
                           context,
                           'test_project',
                           'test_resource'), ])
        self.assertEqual(result, 42)
**** CubicPower OpenStack Study ****
    def test_get_by_class(self):
        context = FakeContext('test_project', 'test_class')
        driver = FakeDriver(
            by_class=dict(
                test_class=dict(test_resource=42)))
        quota_obj = quota.QuotaEngine(quota_driver_class=driver)
        result = quota_obj.get_by_class(context, 'test_class', 'test_resource')
        self.assertEqual(driver.called, [('get_by_class',
                                          context,
                                          'test_class',
                                          'test_resource'), ])
        self.assertEqual(result, 42)
**** CubicPower OpenStack Study ****
    def _make_quota_obj(self, driver):
        quota_obj = quota.QuotaEngine(quota_driver_class=driver)
        resources = [
            quota.AbsoluteResource('test_resource4'),
            quota.AbsoluteResource('test_resource3'),
            quota.AbsoluteResource('test_resource2'),
            quota.AbsoluteResource('test_resource1'), ]
        quota_obj.register_resources(resources)
        return quota_obj
**** CubicPower OpenStack Study ****
    def test_get_defaults(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        result = quota_obj.get_defaults(context)
        self.assertEqual(driver.called, [('get_defaults',
                                          context,
                                          quota_obj.resources), ])
        self.assertEqual(result, quota_obj.resources)
**** CubicPower OpenStack Study ****
    def test_get_class_quotas(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        result1 = quota_obj.get_class_quotas(context, 'test_class')
        result2 = quota_obj.get_class_quotas(context, 'test_class', False)
        self.assertEqual(driver.called, [
            ('get_class_quotas',
             context,
             quota_obj.resources,
             'test_class', True),
            ('get_class_quotas',
             context, quota_obj.resources,
             'test_class', False), ])
        self.assertEqual(result1, quota_obj.resources)
        self.assertEqual(result2, quota_obj.resources)
**** CubicPower OpenStack Study ****
    def test_get_project_quotas(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        result1 = quota_obj.get_project_quotas(context, 'test_project')
        result2 = quota_obj.get_project_quotas(context, 'test_project',
                                               quota_class='test_class',
                                               defaults=False,
                                               usages=False)
        self.assertEqual(driver.called, [
            ('get_project_quotas',
             context,
             quota_obj.resources,
             'test_project',
             None,
             True,
             True),
            ('get_project_quotas',
             context,
             quota_obj.resources,
             'test_project',
             'test_class',
             False,
             False), ])
        self.assertEqual(result1, quota_obj.resources)
        self.assertEqual(result2, quota_obj.resources)
**** CubicPower OpenStack Study ****
    def test_count_no_resource(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        self.assertRaises(exception.QuotaResourceUnknown,
                          quota_obj.count, context, 'test_resource5',
                          True, foo='bar')
**** CubicPower OpenStack Study ****
    def test_count_wrong_resource(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        self.assertRaises(exception.QuotaResourceUnknown,
                          quota_obj.count, context, 'test_resource1',
                          True, foo='bar')
**** CubicPower OpenStack Study ****
    def test_count(self):
        def fake_count(context, *args, **kwargs):
            self.assertEqual(args, (True,))
            self.assertEqual(kwargs, dict(foo='bar'))
            return 5
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        quota_obj.register_resource(quota.CountableResource('test_resource5',
                                                            fake_count))
        result = quota_obj.count(context, 'test_resource5', True, foo='bar')
        self.assertEqual(result, 5)
**** CubicPower OpenStack Study ****
        def fake_count(context, *args, **kwargs):
            self.assertEqual(args, (True,))
            self.assertEqual(kwargs, dict(foo='bar'))
            return 5
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        quota_obj.register_resource(quota.CountableResource('test_resource5',
                                                            fake_count))
        result = quota_obj.count(context, 'test_resource5', True, foo='bar')
        self.assertEqual(result, 5)
**** CubicPower OpenStack Study ****
    def test_limit_check(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        quota_obj.limit_check(context, test_resource1=4, test_resource2=3,
                              test_resource3=2, test_resource4=1)
        self.assertEqual(driver.called, [
            ('limit_check',
             context,
             quota_obj.resources,
             dict(
                 test_resource1=4,
                 test_resource2=3,
                 test_resource3=2,
                 test_resource4=1,),
             None), ])
**** CubicPower OpenStack Study ****
    def test_reserve(self):
        context = FakeContext(None, None)
        driver = FakeDriver(reservations=['resv-01',
                                          'resv-02',
                                          'resv-03',
                                          'resv-04', ])
        quota_obj = self._make_quota_obj(driver)
        result1 = quota_obj.reserve(context, test_resource1=4,
                                    test_resource2=3, test_resource3=2,
                                    test_resource4=1)
        result2 = quota_obj.reserve(context, expire=3600,
                                    test_resource1=1, test_resource2=2,
                                    test_resource3=3, test_resource4=4)
        result3 = quota_obj.reserve(context, project_id='fake_project',
                                    test_resource1=1, test_resource2=2,
                                    test_resource3=3, test_resource4=4)
        self.assertEqual(driver.called, [
            ('reserve',
             context,
             quota_obj.resources,
             dict(
                 test_resource1=4,
                 test_resource2=3,
                 test_resource3=2,
                 test_resource4=1, ),
             None,
             None),
            ('reserve',
             context,
             quota_obj.resources,
             dict(
                 test_resource1=1,
                 test_resource2=2,
                 test_resource3=3,
                 test_resource4=4, ),
             3600,
             None),
            ('reserve',
             context,
             quota_obj.resources,
             dict(
                 test_resource1=1,
                 test_resource2=2,
                 test_resource3=3,
                 test_resource4=4, ),
             None,
             'fake_project'), ])
        self.assertEqual(result1, ['resv-01',
                                   'resv-02',
                                   'resv-03',
                                   'resv-04', ])
        self.assertEqual(result2, ['resv-01',
                                   'resv-02',
                                   'resv-03',
                                   'resv-04', ])
        self.assertEqual(result3, ['resv-01',
                                   'resv-02',
                                   'resv-03',
                                   'resv-04', ])
**** CubicPower OpenStack Study ****
    def test_commit(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        quota_obj.commit(context, ['resv-01', 'resv-02', 'resv-03'])
        self.assertEqual(driver.called,
                         [('commit',
                           context,
                           ['resv-01',
                            'resv-02',
                            'resv-03'],
                           None), ])
**** CubicPower OpenStack Study ****
    def test_rollback(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        quota_obj.rollback(context, ['resv-01', 'resv-02', 'resv-03'])
        self.assertEqual(driver.called,
                         [('rollback',
                           context,
                           ['resv-01',
                            'resv-02',
                            'resv-03'],
                           None), ])
**** CubicPower OpenStack Study ****
    def test_destroy_all_by_project(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        quota_obj.destroy_all_by_project(context, 'test_project')
        self.assertEqual(driver.called,
                         [('destroy_all_by_project',
                           context,
                           'test_project'), ])
**** CubicPower OpenStack Study ****
    def test_expire(self):
        context = FakeContext(None, None)
        driver = FakeDriver()
        quota_obj = self._make_quota_obj(driver)
        quota_obj.expire(context)
        self.assertEqual(driver.called, [('expire', context), ])
**** CubicPower OpenStack Study ****
    def test_resource_names(self):
        quota_obj = self._make_quota_obj(None)
        self.assertEqual(quota_obj.resource_names,
                         ['test_resource1', 'test_resource2',
                          'test_resource3', 'test_resource4'])
**** CubicPower OpenStack Study ****
class VolumeTypeQuotaEngineTestCase(test.TestCase):
    
**** CubicPower OpenStack Study ****
    def test_default_resources(self):
        def fake_vtga(context, inactive=False, filters=None):
            return {}
        self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
        engine = quota.VolumeTypeQuotaEngine()
        self.assertEqual(engine.resource_names,
                         ['gigabytes', 'snapshots', 'volumes'])
**** CubicPower OpenStack Study ****
        def fake_vtga(context, inactive=False, filters=None):
            return {}
        self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
        engine = quota.VolumeTypeQuotaEngine()
        self.assertEqual(engine.resource_names,
                         ['gigabytes', 'snapshots', 'volumes'])
**** CubicPower OpenStack Study ****
    def test_volume_type_resources(self):
        ctx = context.RequestContext('admin', 'admin', is_admin=True)
        vtype = db.volume_type_create(ctx, {'name': 'type1'})
        vtype2 = db.volume_type_create(ctx, {'name': 'type_2'})
        def fake_vtga(context, inactive=False, filters=None):
            return {
                'type1': {
                    'id': vtype['id'],
                    'name': 'type1',
                    'extra_specs': {},
                },
                'type_2': {
                    'id': vtype['id'],
                    'name': 'type_2',
                    'extra_specs': {},
                },
            }
        self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
        engine = quota.VolumeTypeQuotaEngine()
        self.assertEqual(engine.resource_names,
                         ['gigabytes', 'gigabytes_type1', 'gigabytes_type_2',
                          'snapshots', 'snapshots_type1', 'snapshots_type_2',
                          'volumes', 'volumes_type1', 'volumes_type_2'])
        db.volume_type_destroy(ctx, vtype['id'])
        db.volume_type_destroy(ctx, vtype2['id'])
**** CubicPower OpenStack Study ****
        def fake_vtga(context, inactive=False, filters=None):
            return {
                'type1': {
                    'id': vtype['id'],
                    'name': 'type1',
                    'extra_specs': {},
                },
                'type_2': {
                    'id': vtype['id'],
                    'name': 'type_2',
                    'extra_specs': {},
                },
            }
        self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
        engine = quota.VolumeTypeQuotaEngine()
        self.assertEqual(engine.resource_names,
                         ['gigabytes', 'gigabytes_type1', 'gigabytes_type_2',
                          'snapshots', 'snapshots_type1', 'snapshots_type_2',
                          'volumes', 'volumes_type1', 'volumes_type_2'])
        db.volume_type_destroy(ctx, vtype['id'])
        db.volume_type_destroy(ctx, vtype2['id'])
**** CubicPower OpenStack Study ****
class DbQuotaDriverTestCase(test.TestCase):
    
**** CubicPower OpenStack Study ****
    def setUp(self):
        super(DbQuotaDriverTestCase, self).setUp()
        self.flags(quota_volumes=10,
                   quota_snapshots=10,
                   quota_gigabytes=1000,
                   reservation_expire=86400,
                   until_refresh=0,
                   max_age=0,
                   )
        self.driver = quota.DbQuotaDriver()
        self.calls = []
        patcher = mock.patch.object(timeutils, 'utcnow')
        self.addCleanup(patcher.stop)
        self.mock_utcnow = patcher.start()
        self.mock_utcnow.return_value = datetime.datetime.utcnow()
**** CubicPower OpenStack Study ****
    def test_get_defaults(self):
        # Use our pre-defined resources
        self._stub_quota_class_get_default()
        self._stub_volume_type_get_all()
        result = self.driver.get_defaults(None, quota.QUOTAS.resources)
        self.assertEqual(
            result,
            dict(
                volumes=10,
                snapshots=10,
                gigabytes=1000, ))
**** CubicPower OpenStack Study ****
    def _stub_quota_class_get_default(self):
        # Stub out quota_class_get_default
        def fake_qcgd(context):
            self.calls.append('quota_class_get_default')
            return dict(volumes=10,
                        snapshots=10,
                        gigabytes=1000,)
        self.stubs.Set(db, 'quota_class_get_default', fake_qcgd)
**** CubicPower OpenStack Study ****
        def fake_qcgd(context):
            self.calls.append('quota_class_get_default')
            return dict(volumes=10,
                        snapshots=10,
                        gigabytes=1000,)
        self.stubs.Set(db, 'quota_class_get_default', fake_qcgd)
**** CubicPower OpenStack Study ****
    def _stub_volume_type_get_all(self):
        def fake_vtga(context, inactive=False, filters=None):
            return {}
        self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
**** CubicPower OpenStack Study ****
        def fake_vtga(context, inactive=False, filters=None):
            return {}
        self.stubs.Set(db, 'volume_type_get_all', fake_vtga)
**** CubicPower OpenStack Study ****
    def _stub_quota_class_get_all_by_name(self):
        # Stub out quota_class_get_all_by_name
        def fake_qcgabn(context, quota_class):
            self.calls.append('quota_class_get_all_by_name')
            self.assertEqual(quota_class, 'test_class')
            return dict(gigabytes=500, volumes=10, snapshots=10, )
        self.stubs.Set(db, 'quota_class_get_all_by_name', fake_qcgabn)
**** CubicPower OpenStack Study ****
        def fake_qcgabn(context, quota_class):
            self.calls.append('quota_class_get_all_by_name')
            self.assertEqual(quota_class, 'test_class')
            return dict(gigabytes=500, volumes=10, snapshots=10, )
        self.stubs.Set(db, 'quota_class_get_all_by_name', fake_qcgabn)
**** CubicPower OpenStack Study ****
    def test_get_class_quotas(self):
        self._stub_quota_class_get_all_by_name()
        self._stub_volume_type_get_all()
        result = self.driver.get_class_quotas(None, quota.QUOTAS.resources,
                                              'test_class')
        self.assertEqual(self.calls, ['quota_class_get_all_by_name'])
        self.assertEqual(result, dict(volumes=10,
                                      gigabytes=500,
                                      snapshots=10))
**** CubicPower OpenStack Study ****
    def test_get_class_quotas_no_defaults(self):
        self._stub_quota_class_get_all_by_name()
        result = self.driver.get_class_quotas(None, quota.QUOTAS.resources,
                                              'test_class', False)
        self.assertEqual(self.calls, ['quota_class_get_all_by_name'])
        self.assertEqual(result, dict(volumes=10,
                                      gigabytes=500,
                                      snapshots=10))
**** CubicPower OpenStack Study ****
    def _stub_get_by_project(self):
        def fake_qgabp(context, project_id):
            self.calls.append('quota_get_all_by_project')
            self.assertEqual(project_id, 'test_project')
            return dict(volumes=10, gigabytes=50, reserved=0, snapshots=10)
        def fake_qugabp(context, project_id):
            self.calls.append('quota_usage_get_all_by_project')
            self.assertEqual(project_id, 'test_project')
            return dict(volumes=dict(in_use=2, reserved=0),
                        snapshots=dict(in_use=2, reserved=0),
                        gigabytes=dict(in_use=10, reserved=0), )
        self.stubs.Set(db, 'quota_get_all_by_project', fake_qgabp)
        self.stubs.Set(db, 'quota_usage_get_all_by_project', fake_qugabp)
        self._stub_quota_class_get_all_by_name()
        self._stub_quota_class_get_default()
**** CubicPower OpenStack Study ****
        def fake_qgabp(context, project_id):
            self.calls.append('quota_get_all_by_project')
            self.assertEqual(project_id, 'test_project')
            return dict(volumes=10, gigabytes=50, reserved=0, snapshots=10)
**** CubicPower OpenStack Study ****
        def fake_qugabp(context, project_id):
            self.calls.append('quota_usage_get_all_by_project')
            self.assertEqual(project_id, 'test_project')
            return dict(volumes=dict(in_use=2, reserved=0),
                        snapshots=dict(in_use=2, reserved=0),
                        gigabytes=dict(in_use=10, reserved=0), )
        self.stubs.Set(db, 'quota_get_all_by_project', fake_qgabp)
        self.stubs.Set(db, 'quota_usage_get_all_by_project', fake_qugabp)
        self._stub_quota_class_get_all_by_name()
        self._stub_quota_class_get_default()
**** CubicPower OpenStack Study ****
    def test_get_project_quotas(self):
        self._stub_get_by_project()
        self._stub_volume_type_get_all()
        result = self.driver.get_project_quotas(
            FakeContext('test_project', 'test_class'),
            quota.QUOTAS.resources, 'test_project')
        self.assertEqual(self.calls, ['quota_get_all_by_project',
                                      'quota_usage_get_all_by_project',
                                      'quota_class_get_all_by_name',
                                      'quota_class_get_default', ])
        self.assertEqual(result, dict(volumes=dict(limit=10,
                                                   in_use=2,
                                                   reserved=0, ),
                                      snapshots=dict(limit=10,
                                                     in_use=2,
                                                     reserved=0, ),
                                      gigabytes=dict(limit=50,
                                                     in_use=10,
                                                     reserved=0, ), ))
**** CubicPower OpenStack Study ****
    def test_get_project_quotas_alt_context_no_class(self):
        self._stub_get_by_project()
        self._stub_volume_type_get_all()
        result = self.driver.get_project_quotas(
            FakeContext('other_project', 'other_class'),
            quota.QUOTAS.resources, 'test_project')
        self.assertEqual(self.calls, ['quota_get_all_by_project',
                                      'quota_usage_get_all_by_project',
                                      'quota_class_get_default', ])
        self.assertEqual(result, dict(volumes=dict(limit=10,
                                                   in_use=2,
                                                   reserved=0, ),
                                      snapshots=dict(limit=10,
                                                     in_use=2,
                                                     reserved=0, ),
                                      gigabytes=dict(limit=50,
                                                     in_use=10,
                                                     reserved=0, ), ))
**** CubicPower OpenStack Study ****
    def test_get_project_quotas_alt_context_with_class(self):
        self._stub_get_by_project()
        self._stub_volume_type_get_all()
        result = self.driver.get_project_quotas(
            FakeContext('other_project', 'other_class'),
            quota.QUOTAS.resources, 'test_project', quota_class='test_class')
        self.assertEqual(self.calls, ['quota_get_all_by_project',
                                      'quota_usage_get_all_by_project',
                                      'quota_class_get_all_by_name',
                                      'quota_class_get_default', ])
        self.assertEqual(result, dict(volumes=dict(limit=10,
                                                   in_use=2,
                                                   reserved=0, ),
                                      snapshots=dict(limit=10,
                                                     in_use=2,
                                                     reserved=0, ),
                                      gigabytes=dict(limit=50,
                                                     in_use=10,
                                                     reserved=0, ), ))
**** CubicPower OpenStack Study ****
    def test_get_project_quotas_no_defaults(self):
        self._stub_get_by_project()
        self._stub_volume_type_get_all()
        result = self.driver.get_project_quotas(
            FakeContext('test_project', 'test_class'),
            quota.QUOTAS.resources, 'test_project', defaults=False)
        self.assertEqual(self.calls, ['quota_get_all_by_project',
                                      'quota_usage_get_all_by_project',
                                      'quota_class_get_all_by_name',
                                      'quota_class_get_default', ])
        self.assertEqual(result,
                         dict(gigabytes=dict(limit=50,
                                             in_use=10,
                                             reserved=0, ),
                              snapshots=dict(limit=10,
                                             in_use=2,
                                             reserved=0, ),
                              volumes=dict(limit=10,
                                           in_use=2,
                                           reserved=0, ), ))
**** CubicPower OpenStack Study ****
    def test_get_project_quotas_no_usages(self):
        self._stub_get_by_project()
        self._stub_volume_type_get_all()
        result = self.driver.get_project_quotas(
            FakeContext('test_project', 'test_class'),
            quota.QUOTAS.resources, 'test_project', usages=False)
        self.assertEqual(self.calls, ['quota_get_all_by_project',
                                      'quota_class_get_all_by_name',
                                      'quota_class_get_default', ])
        self.assertEqual(result, dict(volumes=dict(limit=10, ),
                                      snapshots=dict(limit=10, ),
                                      gigabytes=dict(limit=50, ), ))
**** CubicPower OpenStack Study ****
    def _stub_get_project_quotas(self):
        def fake_get_project_quotas(context, resources, project_id,
                                    quota_class=None, defaults=True,
                                    usages=True):
            self.calls.append('get_project_quotas')
            return dict((k, dict(limit=v.default))
                        for k, v in resources.items())
        self.stubs.Set(self.driver, 'get_project_quotas',
                       fake_get_project_quotas)
**** CubicPower OpenStack Study ****
        def fake_get_project_quotas(context, resources, project_id,
                                    quota_class=None, defaults=True,
                                    usages=True):
            self.calls.append('get_project_quotas')
            return dict((k, dict(limit=v.default))
                        for k, v in resources.items())
        self.stubs.Set(self.driver, 'get_project_quotas',
                       fake_get_project_quotas)
**** CubicPower OpenStack Study ****
    def test_get_quotas_has_sync_unknown(self):
        self._stub_get_project_quotas()
        self.assertRaises(exception.QuotaResourceUnknown,
                          self.driver._get_quotas,
                          None, quota.QUOTAS.resources,
                          ['unknown'], True)
        self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
    def test_get_quotas_no_sync_unknown(self):
        self._stub_get_project_quotas()
        self.assertRaises(exception.QuotaResourceUnknown,
                          self.driver._get_quotas,
                          None, quota.QUOTAS.resources,
                          ['unknown'], False)
        self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
    def test_get_quotas_has_sync_no_sync_resource(self):
        self._stub_get_project_quotas()
        self.assertRaises(exception.QuotaResourceUnknown,
                          self.driver._get_quotas,
                          None, quota.QUOTAS.resources,
                          ['metadata_items'], True)
        self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
    def test_get_quotas_no_sync_has_sync_resource(self):
        self._stub_get_project_quotas()
        self.assertRaises(exception.QuotaResourceUnknown,
                          self.driver._get_quotas,
                          None, quota.QUOTAS.resources,
                          ['volumes'], False)
        self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
    def test_get_quotas_has_sync(self):
        self._stub_get_project_quotas()
        result = self.driver._get_quotas(FakeContext('test_project',
                                                     'test_class'),
                                         quota.QUOTAS.resources,
                                         ['volumes', 'gigabytes'],
                                         True)
        self.assertEqual(self.calls, ['get_project_quotas'])
        self.assertEqual(result, dict(volumes=10, gigabytes=1000, ))
**** CubicPower OpenStack Study ****
    def _stub_quota_reserve(self):
        def fake_quota_reserve(context, resources, quotas, deltas, expire,
                               until_refresh, max_age, project_id=None):
            self.calls.append(('quota_reserve', expire, until_refresh,
                               max_age))
            return ['resv-1', 'resv-2', 'resv-3']
        self.stubs.Set(db, 'quota_reserve', fake_quota_reserve)
**** CubicPower OpenStack Study ****
        def fake_quota_reserve(context, resources, quotas, deltas, expire,
                               until_refresh, max_age, project_id=None):
            self.calls.append(('quota_reserve', expire, until_refresh,
                               max_age))
            return ['resv-1', 'resv-2', 'resv-3']
        self.stubs.Set(db, 'quota_reserve', fake_quota_reserve)
**** CubicPower OpenStack Study ****
    def test_reserve_bad_expire(self):
        self._stub_get_project_quotas()
        self._stub_quota_reserve()
        self.assertRaises(exception.InvalidReservationExpiration,
                          self.driver.reserve,
                          FakeContext('test_project', 'test_class'),
                          quota.QUOTAS.resources,
                          dict(volumes=2), expire='invalid')
        self.assertEqual(self.calls, [])
**** CubicPower OpenStack Study ****
    def test_reserve_default_expire(self):
        self._stub_get_project_quotas()
        self._stub_quota_reserve()
        result = self.driver.reserve(FakeContext('test_project', 'test_class'),
                                     quota.QUOTAS.resources,
                                     dict(volumes=2))
        expire = timeutils.utcnow() + datetime.timedelta(seconds=86400)
        self.assertEqual(self.calls, ['get_project_quotas',
                                      ('quota_reserve', expire, 0, 0), ])
        self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
    def test_reserve_int_expire(self):
        self._stub_get_project_quotas()
        self._stub_quota_reserve()
        result = self.driver.reserve(FakeContext('test_project', 'test_class'),
                                     quota.QUOTAS.resources,
                                     dict(volumes=2), expire=3600)
        expire = timeutils.utcnow() + datetime.timedelta(seconds=3600)
        self.assertEqual(self.calls, ['get_project_quotas',
                                      ('quota_reserve', expire, 0, 0), ])
        self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
    def test_reserve_timedelta_expire(self):
        self._stub_get_project_quotas()
        self._stub_quota_reserve()
        expire_delta = datetime.timedelta(seconds=60)
        result = self.driver.reserve(FakeContext('test_project', 'test_class'),
                                     quota.QUOTAS.resources,
                                     dict(volumes=2), expire=expire_delta)
        expire = timeutils.utcnow() + expire_delta
        self.assertEqual(self.calls, ['get_project_quotas',
                                      ('quota_reserve', expire, 0, 0), ])
        self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
    def test_reserve_datetime_expire(self):
        self._stub_get_project_quotas()
        self._stub_quota_reserve()
        expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
        result = self.driver.reserve(FakeContext('test_project', 'test_class'),
                                     quota.QUOTAS.resources,
                                     dict(volumes=2), expire=expire)
        self.assertEqual(self.calls, ['get_project_quotas',
                                      ('quota_reserve', expire, 0, 0), ])
        self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
    def test_reserve_until_refresh(self):
        self._stub_get_project_quotas()
        self._stub_quota_reserve()
        self.flags(until_refresh=500)
        expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
        result = self.driver.reserve(FakeContext('test_project', 'test_class'),
                                     quota.QUOTAS.resources,
                                     dict(volumes=2), expire=expire)
        self.assertEqual(self.calls, ['get_project_quotas',
                                      ('quota_reserve', expire, 500, 0), ])
        self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
    def test_reserve_max_age(self):
        self._stub_get_project_quotas()
        self._stub_quota_reserve()
        self.flags(max_age=86400)
        expire = timeutils.utcnow() + datetime.timedelta(seconds=120)
        result = self.driver.reserve(FakeContext('test_project', 'test_class'),
                                     quota.QUOTAS.resources,
                                     dict(volumes=2), expire=expire)
        self.assertEqual(self.calls, ['get_project_quotas',
                                      ('quota_reserve', expire, 0, 86400), ])
        self.assertEqual(result, ['resv-1', 'resv-2', 'resv-3'])
**** CubicPower OpenStack Study ****
    def _stub_quota_destroy_all_by_project(self):
        def fake_quota_destroy_all_by_project(context, project_id):
            self.calls.append(('quota_destroy_all_by_project', project_id))
            return None
        self.stubs.Set(sqa_api, 'quota_destroy_all_by_project',
                       fake_quota_destroy_all_by_project)
**** CubicPower OpenStack Study ****
        def fake_quota_destroy_all_by_project(context, project_id):
            self.calls.append(('quota_destroy_all_by_project', project_id))
            return None
        self.stubs.Set(sqa_api, 'quota_destroy_all_by_project',
                       fake_quota_destroy_all_by_project)
**** CubicPower OpenStack Study ****
    def test_destroy_by_project(self):
        self._stub_quota_destroy_all_by_project()
        self.driver.destroy_all_by_project(FakeContext('test_project',
                                                       'test_class'),
                                           'test_project')
        self.assertEqual(self.calls, [('quota_destroy_all_by_project',
                                      ('test_project')), ])
**** CubicPower OpenStack Study ****
class FakeSession(object):
    
**** CubicPower OpenStack Study ****
    def begin(self):
        return self
**** CubicPower OpenStack Study ****
    def __enter__(self):
        return self
**** CubicPower OpenStack Study ****
    def __exit__(self, exc_type, exc_value, exc_traceback):
        return False
**** CubicPower OpenStack Study ****
class FakeUsage(sqa_models.QuotaUsage):
    
**** CubicPower OpenStack Study ****
    def save(self, *args, **kwargs):
        pass
**** CubicPower OpenStack Study ****
class QuotaReserveSqlAlchemyTestCase(test.TestCase):
    # cinder.db.sqlalchemy.api.quota_reserve is so complex it needs its
    # own test case, and since it's a quota manipulator, this is the
    # best place to put it...
    
**** CubicPower OpenStack Study ****
    def setUp(self):
        super(QuotaReserveSqlAlchemyTestCase, self).setUp()
        self.sync_called = set()
        def make_sync(res_name):
            def fake_sync(context, project_id, volume_type_id=None,
                          volume_type_name=None, session=None):
                self.sync_called.add(res_name)
                if res_name in self.usages:
                    if self.usages[res_name].in_use < 0:
                        return {res_name: 2}
                    else:
                        return {res_name: self.usages[res_name].in_use - 1}
                return {res_name: 0}
            return fake_sync
        self.resources = {}
        QUOTA_SYNC_FUNCTIONS = {}
        for res_name in ('volumes', 'gigabytes'):
            res = quota.ReservableResource(res_name, '_sync_%s' % res_name)
            QUOTA_SYNC_FUNCTIONS['_sync_%s' % res_name] = make_sync(res_name)
            self.resources[res_name] = res
        self.stubs.Set(sqa_api, 'QUOTA_SYNC_FUNCTIONS', QUOTA_SYNC_FUNCTIONS)
        self.expire = timeutils.utcnow() + datetime.timedelta(seconds=3600)
        self.usages = {}
        self.usages_created = {}
        self.reservations_created = {}
        def fake_get_session():
            return FakeSession()
        def fake_get_quota_usages(context, session, project_id):
            return self.usages.copy()
        def fake_quota_usage_create(context, project_id, resource, in_use,
                                    reserved, until_refresh, session=None,
                                    save=True):
            quota_usage_ref = self._make_quota_usage(
                project_id, resource, in_use, reserved, until_refresh,
                timeutils.utcnow(), timeutils.utcnow())
            self.usages_created[resource] = quota_usage_ref
            return quota_usage_ref
        def fake_reservation_create(context, uuid, usage_id, project_id,
                                    resource, delta, expire, session=None):
            reservation_ref = self._make_reservation(
                uuid, usage_id, project_id, resource, delta, expire,
                timeutils.utcnow(), timeutils.utcnow())
            self.reservations_created[resource] = reservation_ref
            return reservation_ref
        self.stubs.Set(sqa_api, 'get_session', fake_get_session)
        self.stubs.Set(sqa_api, '_get_quota_usages', fake_get_quota_usages)
        self.stubs.Set(sqa_api, '_quota_usage_create', fake_quota_usage_create)
        self.stubs.Set(sqa_api, '_reservation_create', fake_reservation_create)
        patcher = mock.patch.object(timeutils, 'utcnow')
        self.addCleanup(patcher.stop)
        self.mock_utcnow = patcher.start()
        self.mock_utcnow.return_value = datetime.datetime.utcnow()
**** CubicPower OpenStack Study ****
        def make_sync(res_name):
            def fake_sync(context, project_id, volume_type_id=None,
                          volume_type_name=None, session=None):
                self.sync_called.add(res_name)
                if res_name in self.usages:
                    if self.usages[res_name].in_use < 0:
                        return {res_name: 2}
                    else:
                        return {res_name: self.usages[res_name].in_use - 1}
                return {res_name: 0}
            return fake_sync
        self.resources = {}
        QUOTA_SYNC_FUNCTIONS = {}
        for res_name in ('volumes', 'gigabytes'):
            res = quota.ReservableResource(res_name, '_sync_%s' % res_name)
            QUOTA_SYNC_FUNCTIONS['_sync_%s' % res_name] = make_sync(res_name)
            self.resources[res_name] = res
        self.stubs.Set(sqa_api, 'QUOTA_SYNC_FUNCTIONS', QUOTA_SYNC_FUNCTIONS)
        self.expire = timeutils.utcnow() + datetime.timedelta(seconds=3600)
        self.usages = {}
        self.usages_created = {}
        self.reservations_created = {}
**** CubicPower OpenStack Study ****
        def fake_get_session():
            return FakeSession()
**** CubicPower OpenStack Study ****
        def fake_get_quota_usages(context, session, project_id):
            return self.usages.copy()
**** CubicPower OpenStack Study ****
        def fake_quota_usage_create(context, project_id, resource, in_use,
                                    reserved, until_refresh, session=None,
                                    save=True):
            quota_usage_ref = self._make_quota_usage(
                project_id, resource, in_use, reserved, until_refresh,
                timeutils.utcnow(), timeutils.utcnow())
            self.usages_created[resource] = quota_usage_ref
            return quota_usage_ref
**** CubicPower OpenStack Study ****
        def fake_reservation_create(context, uuid, usage_id, project_id,
                                    resource, delta, expire, session=None):
            reservation_ref = self._make_reservation(
                uuid, usage_id, project_id, resource, delta, expire,
                timeutils.utcnow(), timeutils.utcnow())
            self.reservations_created[resource] = reservation_ref
            return reservation_ref
        self.stubs.Set(sqa_api, 'get_session', fake_get_session)
        self.stubs.Set(sqa_api, '_get_quota_usages', fake_get_quota_usages)
        self.stubs.Set(sqa_api, '_quota_usage_create', fake_quota_usage_create)
        self.stubs.Set(sqa_api, '_reservation_create', fake_reservation_create)
        patcher = mock.patch.object(timeutils, 'utcnow')
        self.addCleanup(patcher.stop)
        self.mock_utcnow = patcher.start()
        self.mock_utcnow.return_value = datetime.datetime.utcnow()
**** CubicPower OpenStack Study ****
    def _make_quota_usage(self, project_id, resource, in_use, reserved,
                          until_refresh, created_at, updated_at):
        quota_usage_ref = FakeUsage()
        quota_usage_ref.id = len(self.usages) + len(self.usages_created)
        quota_usage_ref.project_id = project_id
        quota_usage_ref.resource = resource
        quota_usage_ref.in_use = in_use
        quota_usage_ref.reserved = reserved
        quota_usage_ref.until_refresh = until_refresh
        quota_usage_ref.created_at = created_at
        quota_usage_ref.updated_at = updated_at
        quota_usage_ref.deleted_at = None
        quota_usage_ref.deleted = False
        return quota_usage_ref
**** CubicPower OpenStack Study ****
    def init_usage(self, project_id, resource, in_use, reserved,
                   until_refresh=None, created_at=None, updated_at=None):
        if created_at is None:
            created_at = timeutils.utcnow()
        if updated_at is None:
            updated_at = timeutils.utcnow()
        quota_usage_ref = self._make_quota_usage(project_id, resource, in_use,
                                                 reserved, until_refresh,
                                                 created_at, updated_at)
        self.usages[resource] = quota_usage_ref
**** CubicPower OpenStack Study ****
    def compare_usage(self, usage_dict, expected):
        for usage in expected:
            resource = usage['resource']
            for key, value in usage.items():
                actual = getattr(usage_dict[resource], key)
                self.assertEqual(actual, value,
                                 "%s != %s on usage for resource %s" %
                                 (actual, value, resource))
**** CubicPower OpenStack Study ****
    def _make_reservation(self, uuid, usage_id, project_id, resource,
                          delta, expire, created_at, updated_at):
        reservation_ref = sqa_models.Reservation()
        reservation_ref.id = len(self.reservations_created)
        reservation_ref.uuid = uuid
        reservation_ref.usage_id = usage_id
        reservation_ref.project_id = project_id
        reservation_ref.resource = resource
        reservation_ref.delta = delta
        reservation_ref.expire = expire
        reservation_ref.created_at = created_at
        reservation_ref.updated_at = updated_at
        reservation_ref.deleted_at = None
        reservation_ref.deleted = False
        return reservation_ref
**** CubicPower OpenStack Study ****
    def compare_reservation(self, reservations, expected):
        reservations = set(reservations)
        for resv in expected:
            resource = resv['resource']
            resv_obj = self.reservations_created[resource]
            self.assertIn(resv_obj.uuid, reservations)
            reservations.discard(resv_obj.uuid)
            for key, value in resv.items():
                actual = getattr(resv_obj, key)
                self.assertEqual(actual, value,
                                 "%s != %s on reservation for resource %s" %
                                 (actual, value, resource))
        self.assertEqual(len(reservations), 0)
**** CubicPower OpenStack Study ****
    def test_quota_reserve_create_usages(self):
        context = FakeContext('test_project', 'test_class')
        quotas = dict(volumes=5,
                      gigabytes=10 * 1024, )
        deltas = dict(volumes=2,
                      gigabytes=2 * 1024, )
        result = sqa_api.quota_reserve(context, self.resources, quotas,
                                       deltas, self.expire, 0, 0)
        self.assertEqual(self.sync_called, set(['volumes', 'gigabytes']))
        self.compare_usage(self.usages_created,
                           [dict(resource='volumes',
                                 project_id='test_project',
                                 in_use=0,
                                 reserved=2,
                                 until_refresh=None),
                            dict(resource='gigabytes',
                                 project_id='test_project',
                                 in_use=0,
                                 reserved=2 * 1024,
                                 until_refresh=None), ])
        self.compare_reservation(
            result,
            [dict(resource='volumes',
                  usage_id=self.usages_created['volumes'],
                  project_id='test_project',
                  delta=2),
             dict(resource='gigabytes',
                  usage_id=self.usages_created['gigabytes'],
                  delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
    def test_quota_reserve_negative_in_use(self):
        self.init_usage('test_project', 'volumes', -1, 0, until_refresh=1)
        self.init_usage('test_project', 'gigabytes', -1, 0, until_refresh=1)
        context = FakeContext('test_project', 'test_class')
        quotas = dict(volumes=5,
                      gigabytes=10 * 1024, )
        deltas = dict(volumes=2,
                      gigabytes=2 * 1024, )
        result = sqa_api.quota_reserve(context, self.resources, quotas,
                                       deltas, self.expire, 5, 0)
        self.assertEqual(self.sync_called, set(['volumes', 'gigabytes']))
        self.compare_usage(self.usages, [dict(resource='volumes',
                                              project_id='test_project',
                                              in_use=2,
                                              reserved=2,
                                              until_refresh=5),
                                         dict(resource='gigabytes',
                                              project_id='test_project',
                                              in_use=2,
                                              reserved=2 * 1024,
                                              until_refresh=5), ])
        self.assertEqual(self.usages_created, {})
        self.compare_reservation(result,
                                 [dict(resource='volumes',
                                       usage_id=self.usages['volumes'],
                                       project_id='test_project',
                                       delta=2),
                                  dict(resource='gigabytes',
                                       usage_id=self.usages['gigabytes'],
                                       delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
    def test_quota_reserve_until_refresh(self):
        self.init_usage('test_project', 'volumes', 3, 0, until_refresh=1)
        self.init_usage('test_project', 'gigabytes', 3, 0, until_refresh=1)
        context = FakeContext('test_project', 'test_class')
        quotas = dict(volumes=5, gigabytes=10 * 1024, )
        deltas = dict(volumes=2, gigabytes=2 * 1024, )
        result = sqa_api.quota_reserve(context, self.resources, quotas,
                                       deltas, self.expire, 5, 0)
        self.assertEqual(self.sync_called, set(['volumes', 'gigabytes']))
        self.compare_usage(self.usages, [dict(resource='volumes',
                                              project_id='test_project',
                                              in_use=2,
                                              reserved=2,
                                              until_refresh=5),
                                         dict(resource='gigabytes',
                                              project_id='test_project',
                                              in_use=2,
                                              reserved=2 * 1024,
                                              until_refresh=5), ])
        self.assertEqual(self.usages_created, {})
        self.compare_reservation(result,
                                 [dict(resource='volumes',
                                       usage_id=self.usages['volumes'],
                                       project_id='test_project',
                                       delta=2),
                                  dict(resource='gigabytes',
                                       usage_id=self.usages['gigabytes'],
                                       delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
    def test_quota_reserve_max_age(self):
        max_age = 3600
        record_created = (timeutils.utcnow() -
                          datetime.timedelta(seconds=max_age))
        self.init_usage('test_project', 'volumes', 3, 0,
                        created_at=record_created, updated_at=record_created)
        self.init_usage('test_project', 'gigabytes', 3, 0,
                        created_at=record_created, updated_at=record_created)
        context = FakeContext('test_project', 'test_class')
        quotas = dict(volumes=5, gigabytes=10 * 1024, )
        deltas = dict(volumes=2, gigabytes=2 * 1024, )
        result = sqa_api.quota_reserve(context, self.resources, quotas,
                                       deltas, self.expire, 0, max_age)
        self.assertEqual(self.sync_called, set(['volumes', 'gigabytes']))
        self.compare_usage(self.usages, [dict(resource='volumes',
                                              project_id='test_project',
                                              in_use=2,
                                              reserved=2,
                                              until_refresh=None),
                                         dict(resource='gigabytes',
                                              project_id='test_project',
                                              in_use=2,
                                              reserved=2 * 1024,
                                              until_refresh=None), ])
        self.assertEqual(self.usages_created, {})
        self.compare_reservation(result,
                                 [dict(resource='volumes',
                                       usage_id=self.usages['volumes'],
                                       project_id='test_project',
                                       delta=2),
                                  dict(resource='gigabytes',
                                       usage_id=self.usages['gigabytes'],
                                       delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
    def test_quota_reserve_no_refresh(self):
        self.init_usage('test_project', 'volumes', 3, 0)
        self.init_usage('test_project', 'gigabytes', 3, 0)
        context = FakeContext('test_project', 'test_class')
        quotas = dict(volumes=5, gigabytes=10 * 1024, )
        deltas = dict(volumes=2, gigabytes=2 * 1024, )
        result = sqa_api.quota_reserve(context, self.resources, quotas,
                                       deltas, self.expire, 0, 0)
        self.assertEqual(self.sync_called, set([]))
        self.compare_usage(self.usages, [dict(resource='volumes',
                                              project_id='test_project',
                                              in_use=3,
                                              reserved=2,
                                              until_refresh=None),
                                         dict(resource='gigabytes',
                                              project_id='test_project',
                                              in_use=3,
                                              reserved=2 * 1024,
                                              until_refresh=None), ])
        self.assertEqual(self.usages_created, {})
        self.compare_reservation(result,
                                 [dict(resource='volumes',
                                       usage_id=self.usages['volumes'],
                                       project_id='test_project',
                                       delta=2),
                                  dict(resource='gigabytes',
                                       usage_id=self.usages['gigabytes'],
                                       delta=2 * 1024), ])
**** CubicPower OpenStack Study ****
    def test_quota_reserve_unders(self):
        self.init_usage('test_project', 'volumes', 1, 0)
        self.init_usage('test_project', 'gigabytes', 1 * 1024, 0)
        context = FakeContext('test_project', 'test_class')
        quotas = dict(volumes=5, gigabytes=10 * 1024, )
        deltas = dict(volumes=-2, gigabytes=-2 * 1024, )
        result = sqa_api.quota_reserve(context, self.resources, quotas,
                                       deltas, self.expire, 0, 0)
        self.assertEqual(self.sync_called, set([]))
        self.compare_usage(self.usages, [dict(resource='volumes',
                                              project_id='test_project',
                                              in_use=1,
                                              reserved=0,
                                              until_refresh=None),
                                         dict(resource='gigabytes',
                                              project_id='test_project',
                                              in_use=1 * 1024,
                                              reserved=0,
                                              until_refresh=None), ])
        self.assertEqual(self.usages_created, {})
        self.compare_reservation(result,
                                 [dict(resource='volumes',
                                       usage_id=self.usages['volumes'],
                                       project_id='test_project',
                                       delta=-2),
                                  dict(resource='gigabytes',
                                       usage_id=self.usages['gigabytes'],
                                       delta=-2 * 1024), ])
**** CubicPower OpenStack Study ****
    def test_quota_reserve_overs(self):
        self.init_usage('test_project', 'volumes', 4, 0)
        self.init_usage('test_project', 'gigabytes', 10 * 1024, 0)
        context = FakeContext('test_project', 'test_class')
        quotas = dict(volumes=5, gigabytes=10 * 1024, )
        deltas = dict(volumes=2, gigabytes=2 * 1024, )
        self.assertRaises(exception.OverQuota,
                          sqa_api.quota_reserve,
                          context, self.resources, quotas,
                          deltas, self.expire, 0, 0)
        self.assertEqual(self.sync_called, set([]))
        self.compare_usage(self.usages, [dict(resource='volumes',
                                              project_id='test_project',
                                              in_use=4,
                                              reserved=0,
                                              until_refresh=None),
                                         dict(resource='gigabytes',
                                              project_id='test_project',
                                              in_use=10 * 1024,
                                              reserved=0,
                                              until_refresh=None), ])
        self.assertEqual(self.usages_created, {})
        self.assertEqual(self.reservations_created, {})
**** CubicPower OpenStack Study ****
    def test_quota_reserve_reduction(self):
        self.init_usage('test_project', 'volumes', 10, 0)
        self.init_usage('test_project', 'gigabytes', 20 * 1024, 0)
        context = FakeContext('test_project', 'test_class')
        quotas = dict(volumes=5, gigabytes=10 * 1024, )
        deltas = dict(volumes=-2, gigabytes=-2 * 1024, )
        result = sqa_api.quota_reserve(context, self.resources, quotas,
                                       deltas, self.expire, 0, 0)
        self.assertEqual(self.sync_called, set([]))
        self.compare_usage(self.usages, [dict(resource='volumes',
                                              project_id='test_project',
                                              in_use=10,
                                              reserved=0,
                                              until_refresh=None),
                                         dict(resource='gigabytes',
                                              project_id='test_project',
                                              in_use=20 * 1024,
                                              reserved=0,
                                              until_refresh=None), ])
        self.assertEqual(self.usages_created, {})
        self.compare_reservation(result,
                                 [dict(resource='volumes',
                                       usage_id=self.usages['volumes'],
                                       project_id='test_project',
                                       delta=-2),
                                  dict(resource='gigabytes',
                                       usage_id=self.usages['gigabytes'],
                                       project_id='test_project',
                                       delta=-2 * 1024), ])