**** CubicPower OpenStack Study ****
# Copyright (C) 2013 Yahoo! Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
import webob
from glance.api import cached_images
from glance.api import policy
from glance.common import exception
from glance import image_cache
**** CubicPower OpenStack Study ****
class FakePolicyEnforcer(policy.Enforcer):
**** CubicPower OpenStack Study ****
def __init__(self):
self.default_rule = ''
self.policy_path = ''
self.policy_file_mtime = None
self.policy_file_contents = None
**** CubicPower OpenStack Study ****
def enforce(self, context, action, target):
return 'pass'
**** CubicPower OpenStack Study ****
def check(rule, target, creds, exc=None, *args, **kwargs):
return 'pass'
**** CubicPower OpenStack Study ****
def _check(self, context, rule, target, *args, **kwargs):
return 'pass'
**** CubicPower OpenStack Study ****
class FakeCache(image_cache.ImageCache):
**** CubicPower OpenStack Study ****
def __init__(self):
self.init_driver()
self.deleted_images = []
**** CubicPower OpenStack Study ****
def init_driver(self):
pass
**** CubicPower OpenStack Study ****
def get_cached_images(self):
return {'id': 'test'}
**** CubicPower OpenStack Study ****
def delete_cached_image(self, image_id):
self.deleted_images.append(image_id)
**** CubicPower OpenStack Study ****
def get_queued_images(self):
return {'test': 'passed'}
**** CubicPower OpenStack Study ****
def queue_image(self, image_id):
return 'pass'
**** CubicPower OpenStack Study ****
def delete_queued_image(self, image_id):
self.deleted_images.append(image_id)
**** CubicPower OpenStack Study ****
class FakeController(cached_images.Controller):
**** CubicPower OpenStack Study ****
def __init__(self):
self.cache = FakeCache()
self.policy = FakePolicyEnforcer()
**** CubicPower OpenStack Study ****
class TestController(testtools.TestCase):
**** CubicPower OpenStack Study ****
def test_initialization_without_conf(self):
self.assertRaises(exception.BadDriverConfiguration,
cached_images.Controller)
**** CubicPower OpenStack Study ****
class TestCachedImages(testtools.TestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(TestCachedImages, self).setUp()
test_controller = FakeController()
self.controller = test_controller
**** CubicPower OpenStack Study ****
def test_get_cached_images(self):
req = webob.Request.blank('')
req.context = 'test'
result = self.controller.get_cached_images(req)
self.assertEqual({'cached_images': {'id': 'test'}}, result)
**** CubicPower OpenStack Study ****
def test_delete_cached_images(self):
req = webob.Request.blank('')
req.context = 'test'
self.controller.delete_cached_image(req, image_id='test')
self.assertEqual(['test'], self.controller.cache.deleted_images)
**** CubicPower OpenStack Study ****
def test_get_queued_images(self):
req = webob.Request.blank('')
req.context = 'test'
result = self.controller.get_queued_images(req)
self.assertEqual({'queued_images': {'test': 'passed'}}, result)
**** CubicPower OpenStack Study ****
def test_queue_image(self):
req = webob.Request.blank('')
req.context = 'test'
self.controller.queue_image(req, image_id='test1')
**** CubicPower OpenStack Study ****
def test_delete_queued_image(self):
req = webob.Request.blank('')
req.context = 'test'
self.controller.delete_queued_image(req, 'deleted_img')
self.assertEqual(['deleted_img'],
self.controller.cache.deleted_images)