**** CubicPower OpenStack Study ****
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mox
from glance.common import exception
import glance.store
from glance.tests.unit import utils as unit_test_utils
from glance.tests import utils
BASE_URI = 'http://storeurl.com/container'
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
UUID2 = '971ec09a-8067-4bc8-a91f-ae3557f1c4c7'
USER1 = '54492ba0-f4df-4e4e-be62-27f4d76b29cf'
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'
TENANT3 = '228c6da5-29cd-4d67-9457-ed632e083fc0'
**** CubicPower OpenStack Study ****
class ImageRepoStub(object):
**** CubicPower OpenStack Study ****
def add(self, image):
return image
**** CubicPower OpenStack Study ****
def save(self, image):
return image
**** CubicPower OpenStack Study ****
class ImageStub(object):
**** CubicPower OpenStack Study ****
def __init__(self, image_id, status=None, locations=None,
visibility=None):
self.image_id = image_id
self.status = status
self.locations = locations or []
self.visibility = visibility
self.size = 1
**** CubicPower OpenStack Study ****
def delete(self):
self.status = 'deleted'
**** CubicPower OpenStack Study ****
def get_member_repo(self):
return FakeMemberRepo(self, [TENANT1, TENANT2])
**** CubicPower OpenStack Study ****
class ImageFactoryStub(object):
**** CubicPower OpenStack Study ****
def new_image(self, image_id=None, name=None, visibility='private',
min_disk=0, min_ram=0, protected=False, owner=None,
disk_format=None, container_format=None,
extra_properties=None, tags=None, **other_args):
return ImageStub(image_id, visibility=visibility, **other_args)
**** CubicPower OpenStack Study ****
class FakeMemberRepo(object):
**** CubicPower OpenStack Study ****
def __init__(self, image, tenants=None):
self.image = image
self.factory = glance.domain.ImageMemberFactory()
self.tenants = tenants or []
**** CubicPower OpenStack Study ****
def list(self, *args, **kwargs):
return [self.factory.new_image_member(self.image, tenant)
for tenant in self.tenants]
**** CubicPower OpenStack Study ****
def add(self, member):
self.tenants.append(member.member_id)
**** CubicPower OpenStack Study ****
def remove(self, member):
self.tenants.remove(member.member_id)
**** CubicPower OpenStack Study ****
class TestStoreImage(utils.BaseTestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
locations = [{'url': '%s/%s' % (BASE_URI, UUID1),
'metadata': {}}]
self.image_stub = ImageStub(UUID1, 'active', locations)
self.store_api = unit_test_utils.FakeStoreAPI()
super(TestStoreImage, self).setUp()
**** CubicPower OpenStack Study ****
def test_image_delete(self):
image = glance.store.ImageProxy(self.image_stub, {}, self.store_api)
location = image.locations[0]
self.assertEqual(image.status, 'active')
self.store_api.get_from_backend({}, location['url'])
image.delete()
self.assertEqual(image.status, 'deleted')
self.assertRaises(exception.NotFound,
self.store_api.get_from_backend, {}, location['url'])
**** CubicPower OpenStack Study ****
def test_image_get_data(self):
image = glance.store.ImageProxy(self.image_stub, {}, self.store_api)
self.assertEqual(image.get_data(), 'XXX')
**** CubicPower OpenStack Study ****
def test_image_get_data_from_second_location(self):
def fake_get_from_backend(self, context, location):
if UUID1 in location:
raise Exception('not allow download from %s' % location)
else:
return self.data[location]
image1 = glance.store.ImageProxy(self.image_stub, {}, self.store_api)
self.assertEqual(image1.get_data(), 'XXX')
# Multiple location support
context = glance.context.RequestContext(user=USER1)
(image2, image_stub2) = self._add_image(context, UUID2, 'ZZZ', 3)
location_data = image2.locations[0]
image1.locations.append(location_data)
self.assertEqual(len(image1.locations), 2)
self.assertEqual(location_data['url'], UUID2)
self.stubs.Set(unit_test_utils.FakeStoreAPI, 'get_from_backend',
fake_get_from_backend)
self.assertEqual(image1.get_data().fd, 'ZZZ')
image1.locations.pop(0)
self.assertEqual(len(image1.locations), 1)
image2.delete()
**** CubicPower OpenStack Study ****
def fake_get_from_backend(self, context, location):
if UUID1 in location:
raise Exception('not allow download from %s' % location)
else:
return self.data[location]
image1 = glance.store.ImageProxy(self.image_stub, {}, self.store_api)
self.assertEqual(image1.get_data(), 'XXX')
# Multiple location support
context = glance.context.RequestContext(user=USER1)
(image2, image_stub2) = self._add_image(context, UUID2, 'ZZZ', 3)
location_data = image2.locations[0]
image1.locations.append(location_data)
self.assertEqual(len(image1.locations), 2)
self.assertEqual(location_data['url'], UUID2)
self.stubs.Set(unit_test_utils.FakeStoreAPI, 'get_from_backend',
fake_get_from_backend)
self.assertEqual(image1.get_data().fd, 'ZZZ')
image1.locations.pop(0)
self.assertEqual(len(image1.locations), 1)
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_set_data(self):
context = glance.context.RequestContext(user=USER1)
image_stub = ImageStub(UUID2, status='queued', locations=[])
image = glance.store.ImageProxy(image_stub, context, self.store_api)
image.set_data('YYYY', 4)
self.assertEqual(image.size, 4)
#NOTE(markwash): FakeStore returns image_id for location
self.assertEqual(image.locations[0]['url'], UUID2)
self.assertEqual(image.checksum, 'Z')
self.assertEqual(image.status, 'active')
**** CubicPower OpenStack Study ****
def test_image_set_data_location_metadata(self):
context = glance.context.RequestContext(user=USER1)
image_stub = ImageStub(UUID2, status='queued', locations=[])
loc_meta = {'key': 'value5032'}
store_api = unit_test_utils.FakeStoreAPI(store_metadata=loc_meta)
image = glance.store.ImageProxy(image_stub, context, store_api)
image.set_data('YYYY', 4)
self.assertEqual(image.size, 4)
location_data = image.locations[0]
self.assertEqual(location_data['url'], UUID2)
self.assertEqual(location_data['metadata'], loc_meta)
self.assertEqual(image.checksum, 'Z')
self.assertEqual(image.status, 'active')
image.delete()
self.assertEqual(image.status, 'deleted')
self.assertRaises(exception.NotFound,
self.store_api.get_from_backend, {},
image.locations[0]['url'])
**** CubicPower OpenStack Study ****
def test_image_set_data_unknown_size(self):
context = glance.context.RequestContext(user=USER1)
image_stub = ImageStub(UUID2, status='queued', locations=[])
image = glance.store.ImageProxy(image_stub, context, self.store_api)
image.set_data('YYYY', None)
self.assertEqual(image.size, 4)
#NOTE(markwash): FakeStore returns image_id for location
self.assertEqual(image.locations[0]['url'], UUID2)
self.assertEqual(image.checksum, 'Z')
self.assertEqual(image.status, 'active')
image.delete()
self.assertEqual(image.status, 'deleted')
self.assertRaises(exception.NotFound,
self.store_api.get_from_backend, {},
image.locations[0]['url'])
**** CubicPower OpenStack Study ****
def _add_image(self, context, image_id, data, len):
image_stub = ImageStub(image_id, status='queued', locations=[])
image = glance.store.ImageProxy(image_stub,
context, self.store_api)
image.set_data(data, len)
self.assertEqual(image.size, len)
#NOTE(markwash): FakeStore returns image_id for location
location = {'url': image_id, 'metadata': {}}
self.assertEqual(image.locations, [location])
self.assertEqual(image_stub.locations, [location])
self.assertEqual(image.status, 'active')
return (image, image_stub)
**** CubicPower OpenStack Study ****
def test_image_change_append_invalid_location_uri(self):
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
location_bad = {'url': 'unknown://location', 'metadata': {}}
self.assertRaises(exception.BadStoreUri,
image1.locations.append, location_bad)
image1.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_append_invalid_location_metatdata(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
# Using only one test rule here is enough to make sure
# 'store.check_location_metadata()' can be triggered
# in Location proxy layer. Complete test rule for
# 'store.check_location_metadata()' testing please
# check below cases within 'TestStoreMetaDataChecker'.
location_bad = {'url': UUID3, 'metadata': "a invalid metadata"}
self.assertRaises(glance.store.BackendException,
image1.locations.append, location_bad)
image1.delete()
image2.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_append_locations(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image1.locations.append(location3)
self.assertEqual(image_stub1.locations, [location2, location3])
self.assertEqual(image1.locations, [location2, location3])
image1.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_change_pop_location(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image1.locations.append(location3)
self.assertEqual(image_stub1.locations, [location2, location3])
self.assertEqual(image1.locations, [location2, location3])
image1.locations.pop()
self.assertEqual(image_stub1.locations, [location2])
self.assertEqual(image1.locations, [location2])
image1.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_change_extend_invalid_locations_uri(self):
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
location_bad = {'url': 'unknown://location', 'metadata': {}}
self.assertRaises(exception.BadStoreUri,
image1.locations.extend, [location_bad])
image1.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_extend_invalid_locations_metadata(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location_bad = {'url': UUID3, 'metadata': "a invalid metadata"}
self.assertRaises(glance.store.BackendException,
image1.locations.extend, [location_bad])
image1.delete()
image2.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_extend_locations(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image1.locations.extend([location3])
self.assertEqual(image_stub1.locations, [location2, location3])
self.assertEqual(image1.locations, [location2, location3])
image1.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_change_remove_location(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
location_bad = {'url': 'unknown://location', 'metadata': {}}
image1.locations.extend([location3])
image1.locations.remove(location2)
self.assertEqual(image_stub1.locations, [location3])
self.assertEqual(image1.locations, [location3])
self.assertRaises(ValueError,
image1.locations.remove, location_bad)
image1.delete()
image2.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_delete_location(self):
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
del image1.locations[0]
self.assertEqual(image_stub1.locations, [])
self.assertEqual(len(image1.locations), 0)
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
image1.delete()
**** CubicPower OpenStack Study ****
def test_image_change_insert_invalid_location_uri(self):
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
location_bad = {'url': 'unknown://location', 'metadata': {}}
self.assertRaises(exception.BadStoreUri,
image1.locations.insert, 0, location_bad)
image1.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_insert_invalid_location_metadata(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location_bad = {'url': UUID3, 'metadata': "a invalid metadata"}
self.assertRaises(glance.store.BackendException,
image1.locations.insert, 0, location_bad)
image1.delete()
image2.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_insert_location(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image1.locations.insert(0, location3)
self.assertEqual(image_stub1.locations, [location3, location2])
self.assertEqual(image1.locations, [location3, location2])
image1.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_change_delete_locations(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image1.locations.insert(0, location3)
del image1.locations[0:100]
self.assertEqual(image_stub1.locations, [])
self.assertEqual(len(image1.locations), 0)
self.assertRaises(exception.BadStoreUri,
image1.locations.insert, 0, location2)
self.assertRaises(exception.BadStoreUri,
image2.locations.insert, 0, location3)
image1.delete()
image2.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_adding_invalid_location_uri(self):
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
image_stub1 = ImageStub('fake_image_id', status='queued', locations=[])
image1 = glance.store.ImageProxy(image_stub1, context, self.store_api)
location_bad = {'url': 'unknown://location', 'metadata': {}}
self.assertRaises(exception.BadStoreUri,
image1.locations.__iadd__, [location_bad])
self.assertEqual(image_stub1.locations, [])
self.assertEqual(image1.locations, [])
image1.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_adding_invalid_location_metadata(self):
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
image_stub2 = ImageStub('fake_image_id', status='queued', locations=[])
image2 = glance.store.ImageProxy(image_stub2, context, self.store_api)
location_bad = {'url': UUID2, 'metadata': "a invalid metadata"}
self.assertRaises(glance.store.BackendException,
image2.locations.__iadd__, [location_bad])
self.assertEqual(image_stub2.locations, [])
self.assertEqual(image2.locations, [])
image1.delete()
image2.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
**** CubicPower OpenStack Study ****
def test_image_change_adding_locations(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
image_stub3 = ImageStub('fake_image_id', status='queued', locations=[])
image3 = glance.store.ImageProxy(image_stub3, context, self.store_api)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image3.locations += [location2, location3]
self.assertEqual(image_stub3.locations, [location2, location3])
self.assertEqual(image3.locations, [location2, location3])
image3.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image1.delete()
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_get_location_index(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
image_stub3 = ImageStub('fake_image_id', status='queued', locations=[])
image3 = glance.store.ImageProxy(image_stub3, context, self.store_api)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image3.locations += [location2, location3]
self.assertEqual(image_stub3.locations.index(location3), 1)
image3.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image1.delete()
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_get_location_by_index(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
image_stub3 = ImageStub('fake_image_id', status='queued', locations=[])
image3 = glance.store.ImageProxy(image_stub3, context, self.store_api)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image3.locations += [location2, location3]
self.assertEqual(image_stub3.locations.index(location3), 1)
self.assertEqual(image_stub3.locations[0], location2)
image3.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image1.delete()
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_checking_location_exists(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
image_stub3 = ImageStub('fake_image_id', status='queued', locations=[])
image3 = glance.store.ImageProxy(image_stub3, context, self.store_api)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
location_bad = {'url': 'unknown://location', 'metadata': {}}
image3.locations += [location2, location3]
self.assertTrue(location3 in image_stub3.locations)
self.assertFalse(location_bad in image_stub3.locations)
image3.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image1.delete()
image2.delete()
**** CubicPower OpenStack Study ****
def test_image_reverse_locations_order(self):
UUID3 = 'a8a61ec4-d7a3-11e2-8c28-000c29c27581'
self.assertEqual(len(self.store_api.data.keys()), 2)
context = glance.context.RequestContext(user=USER1)
(image1, image_stub1) = self._add_image(context, UUID2, 'XXXX', 4)
(image2, image_stub2) = self._add_image(context, UUID3, 'YYYY', 4)
location2 = {'url': UUID2, 'metadata': {}}
location3 = {'url': UUID3, 'metadata': {}}
image_stub3 = ImageStub('fake_image_id', status='queued', locations=[])
image3 = glance.store.ImageProxy(image_stub3, context, self.store_api)
image3.locations += [location2, location3]
image_stub3.locations.reverse()
self.assertEqual(image_stub3.locations, [location3, location2])
self.assertEqual(image3.locations, [location3, location2])
image3.delete()
self.assertEqual(len(self.store_api.data.keys()), 2)
self.assertFalse(UUID2 in self.store_api.data.keys())
self.assertFalse(UUID3 in self.store_api.data.keys())
image1.delete()
image2.delete()
**** CubicPower OpenStack Study ****
class TestStoreImageRepo(utils.BaseTestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(TestStoreImageRepo, self).setUp()
self.store_api = unit_test_utils.FakeStoreAPI()
self.image_stub = ImageStub(UUID1)
self.image = glance.store.ImageProxy(self.image_stub,
{}, self.store_api)
self.image_repo_stub = ImageRepoStub()
self.image_repo = glance.store.ImageRepoProxy(self.image_repo_stub,
{}, self.store_api)
**** CubicPower OpenStack Study ****
def test_add_updates_acls(self):
self.image_stub.locations = [{'url': 'foo', 'metadata': {}},
{'url': 'bar', 'metadata': {}}]
self.image_stub.visibility = 'public'
self.image_repo.add(self.image)
self.assertTrue(self.store_api.acls['foo']['public'])
self.assertEqual(self.store_api.acls['foo']['read'], [])
self.assertEqual(self.store_api.acls['foo']['write'], [])
self.assertTrue(self.store_api.acls['bar']['public'])
self.assertEqual(self.store_api.acls['bar']['read'], [])
self.assertEqual(self.store_api.acls['bar']['write'], [])
**** CubicPower OpenStack Study ****
def test_add_ignores_acls_if_no_locations(self):
self.image_stub.locations = []
self.image_stub.visibility = 'public'
self.image_repo.add(self.image)
self.assertEqual(len(self.store_api.acls), 0)
**** CubicPower OpenStack Study ****
def test_save_updates_acls(self):
self.image_stub.locations = [{'url': 'foo', 'metadata': {}}]
self.image_repo.save(self.image)
self.assertIn('foo', self.store_api.acls)
**** CubicPower OpenStack Study ****
def test_add_fetches_members_if_private(self):
self.image_stub.locations = [{'url': 'glue', 'metadata': {}}]
self.image_stub.visibility = 'private'
self.image_repo.add(self.image)
self.assertIn('glue', self.store_api.acls)
acls = self.store_api.acls['glue']
self.assertFalse(acls['public'])
self.assertEqual(acls['write'], [])
self.assertEqual(acls['read'], [TENANT1, TENANT2])
**** CubicPower OpenStack Study ****
def test_save_fetches_members_if_private(self):
self.image_stub.locations = [{'url': 'glue', 'metadata': {}}]
self.image_stub.visibility = 'private'
self.image_repo.save(self.image)
self.assertIn('glue', self.store_api.acls)
acls = self.store_api.acls['glue']
self.assertFalse(acls['public'])
self.assertEqual(acls['write'], [])
self.assertEqual(acls['read'], [TENANT1, TENANT2])
**** CubicPower OpenStack Study ****
def test_member_addition_updates_acls(self):
self.image_stub.locations = [{'url': 'glug', 'metadata': {}}]
self.image_stub.visibility = 'private'
member_repo = self.image.get_member_repo()
membership = glance.domain.ImageMembership(
UUID1, TENANT3, None, None, status='accepted')
member_repo.add(membership)
self.assertIn('glug', self.store_api.acls)
acls = self.store_api.acls['glug']
self.assertFalse(acls['public'])
self.assertEqual(acls['write'], [])
self.assertEqual(acls['read'], [TENANT1, TENANT2, TENANT3])
**** CubicPower OpenStack Study ****
def test_member_removal_updates_acls(self):
self.image_stub.locations = [{'url': 'glug', 'metadata': {}}]
self.image_stub.visibility = 'private'
member_repo = self.image.get_member_repo()
membership = glance.domain.ImageMembership(
UUID1, TENANT1, None, None, status='accepted')
member_repo.remove(membership)
self.assertIn('glug', self.store_api.acls)
acls = self.store_api.acls['glug']
self.assertFalse(acls['public'])
self.assertEqual(acls['write'], [])
self.assertEqual(acls['read'], [TENANT2])
**** CubicPower OpenStack Study ****
class TestImageFactory(utils.BaseTestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(TestImageFactory, self).setUp()
self.image_factory = glance.store.ImageFactoryProxy(
ImageFactoryStub(),
glance.context.RequestContext(user=USER1),
unit_test_utils.FakeStoreAPI())
**** CubicPower OpenStack Study ****
def test_new_image(self):
image = self.image_factory.new_image()
self.assertTrue(image.image_id is None)
self.assertTrue(image.status is None)
self.assertEqual(image.visibility, 'private')
self.assertEqual(image.locations, [])
**** CubicPower OpenStack Study ****
def test_new_image_with_location(self):
locations = [{'url': '%s/%s' % (BASE_URI, UUID1),
'metadata': {}}]
image = self.image_factory.new_image(locations=locations)
self.assertEqual(image.locations, locations)
location_bad = {'url': 'unknown://location', 'metadata': {}}
self.assertRaises(exception.BadStoreUri,
self.image_factory.new_image,
locations=[location_bad])
**** CubicPower OpenStack Study ****
class TestStoreMetaDataChecker(utils.BaseTestCase):
**** CubicPower OpenStack Study ****
def test_empty(self):
glance.store.check_location_metadata({})
**** CubicPower OpenStack Study ****
def test_unicode(self):
m = {'key': u'somevalue'}
glance.store.check_location_metadata(m)
**** CubicPower OpenStack Study ****
def test_unicode_list(self):
m = {'key': [u'somevalue', u'2']}
glance.store.check_location_metadata(m)
**** CubicPower OpenStack Study ****
def test_unicode_dict(self):
inner = {'key1': u'somevalue', 'key2': u'somevalue'}
m = {'topkey': inner}
glance.store.check_location_metadata(m)
**** CubicPower OpenStack Study ****
def test_unicode_dict_list(self):
inner = {'key1': u'somevalue', 'key2': u'somevalue'}
m = {'topkey': inner, 'list': [u'somevalue', u'2'], 'u': u'2'}
glance.store.check_location_metadata(m)
**** CubicPower OpenStack Study ****
def test_nested_dict(self):
inner = {'key1': u'somevalue', 'key2': u'somevalue'}
inner = {'newkey': inner}
inner = {'anotherkey': inner}
m = {'topkey': inner}
glance.store.check_location_metadata(m)
**** CubicPower OpenStack Study ****
def test_simple_bad(self):
m = {'key1': object()}
self.assertRaises(glance.store.BackendException,
glance.store.check_location_metadata,
m)
**** CubicPower OpenStack Study ****
def test_list_bad(self):
m = {'key1': [u'somevalue', object()]}
self.assertRaises(glance.store.BackendException,
glance.store.check_location_metadata,
m)
**** CubicPower OpenStack Study ****
def test_nested_dict_bad(self):
inner = {'key1': u'somevalue', 'key2': object()}
inner = {'newkey': inner}
inner = {'anotherkey': inner}
m = {'topkey': inner}
self.assertRaises(glance.store.BackendException,
glance.store.check_location_metadata,
m)
**** CubicPower OpenStack Study ****
class TestStoreAddToBackend(utils.BaseTestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(TestStoreAddToBackend, self).setUp()
self.image_id = "animage"
self.data = "dataandstuff"
self.size = len(self.data)
self.location = "file:///ab/cde/fgh"
self.checksum = "md5"
self.mox = mox.Mox()
**** CubicPower OpenStack Study ****
def tearDown(self):
super(TestStoreAddToBackend, self).tearDown()
self.mox.UnsetStubs()
**** CubicPower OpenStack Study ****
def _bad_metadata(self, in_metadata):
store = self.mox.CreateMockAnything()
store.add(self.image_id, mox.IgnoreArg(), self.size).AndReturn(
(self.location, self.size, self.checksum, in_metadata))
store.__str__ = lambda: "hello"
store.__unicode__ = lambda: "hello"
self.mox.ReplayAll()
self.assertRaises(glance.store.BackendException,
glance.store.store_add_to_backend,
self.image_id,
self.data,
self.size,
store)
self.mox.VerifyAll()
**** CubicPower OpenStack Study ****
def _good_metadata(self, in_metadata):
store = self.mox.CreateMockAnything()
store.add(self.image_id, mox.IgnoreArg(), self.size).AndReturn(
(self.location, self.size, self.checksum, in_metadata))
self.mox.ReplayAll()
(location,
size,
checksum,
metadata) = glance.store.store_add_to_backend(self.image_id,
self.data,
self.size,
store)
self.mox.VerifyAll()
self.assertEqual(self.location, location)
self.assertEqual(self.size, size)
self.assertEqual(self.checksum, checksum)
self.assertEqual(in_metadata, metadata)
**** CubicPower OpenStack Study ****
def test_empty(self):
metadata = {}
self._good_metadata(metadata)
**** CubicPower OpenStack Study ****
def test_string(self):
metadata = {'key': u'somevalue'}
self._good_metadata(metadata)
**** CubicPower OpenStack Study ****
def test_list(self):
m = {'key': [u'somevalue', u'2']}
self._good_metadata(m)
**** CubicPower OpenStack Study ****
def test_unicode_dict(self):
inner = {'key1': u'somevalue', 'key2': u'somevalue'}
m = {'topkey': inner}
self._good_metadata(m)
**** CubicPower OpenStack Study ****
def test_unicode_dict_list(self):
inner = {'key1': u'somevalue', 'key2': u'somevalue'}
m = {'topkey': inner, 'list': [u'somevalue', u'2'], 'u': u'2'}
self._good_metadata(m)
**** CubicPower OpenStack Study ****
def test_nested_dict(self):
inner = {'key1': u'somevalue', 'key2': u'somevalue'}
inner = {'newkey': inner}
inner = {'anotherkey': inner}
m = {'topkey': inner}
self._good_metadata(m)
**** CubicPower OpenStack Study ****
def test_bad_top_level_nonunicode(self):
metadata = {'key': 'a string'}
self._bad_metadata(metadata)
**** CubicPower OpenStack Study ****
def test_bad_nonunicode_dict_list(self):
inner = {'key1': u'somevalue', 'key2': u'somevalue',
'k3': [1, object()]}
m = {'topkey': inner, 'list': [u'somevalue', u'2'], 'u': u'2'}
self._bad_metadata(m)
**** CubicPower OpenStack Study ****
def test_bad_metadata_not_dict(self):
store = self.mox.CreateMockAnything()
store.add(self.image_id, mox.IgnoreArg(), self.size).AndReturn(
(self.location, self.size, self.checksum, []))
store.__str__ = lambda: "hello"
store.__unicode__ = lambda: "hello"
self.mox.ReplayAll()
self.assertRaises(glance.store.BackendException,
glance.store.store_add_to_backend,
self.image_id,
self.data,
self.size,
store)
self.mox.VerifyAll()