**** CubicPower OpenStack Study ****
# Copyright 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslo.config import cfg
import webob
from cinder.api import extensions
from cinder.api.v2 import volume_metadata
from cinder.api.v2 import volumes
from cinder import db
from cinder import exception
from cinder.openstack.common import jsonutils
from cinder import test
from cinder.tests.api import fakes
from cinder.tests.api.v2 import stubs
from cinder.volume import api as volume_api
CONF = cfg.CONF
**** CubicPower OpenStack Study ****
def return_create_volume_metadata_max(context, volume_id, metadata, delete):
return stub_max_volume_metadata()
**** CubicPower OpenStack Study ****
def return_create_volume_metadata(context, volume_id, metadata, delete):
return stub_volume_metadata()
**** CubicPower OpenStack Study ****
def return_new_volume_metadata(context, volume_id, metadata, delete):
return stub_new_volume_metadata()
**** CubicPower OpenStack Study ****
def return_create_volume_metadata_insensitive(context, snapshot_id,
metadata, delete):
return stub_volume_metadata_insensitive()
**** CubicPower OpenStack Study ****
def return_volume_metadata(context, volume_id):
if not isinstance(volume_id, str) or not len(volume_id) == 36:
msg = 'id %s must be a uuid in return volume metadata' % volume_id
raise Exception(msg)
return stub_volume_metadata()
**** CubicPower OpenStack Study ****
def return_empty_volume_metadata(context, volume_id):
return {}
**** CubicPower OpenStack Study ****
def return_empty_container_metadata(context, volume_id, metadata, delete):
return {}
**** CubicPower OpenStack Study ****
def delete_volume_metadata(context, volume_id, key):
pass
**** CubicPower OpenStack Study ****
def stub_volume_metadata():
metadata = {
"key1": "value1",
"key2": "value2",
"key3": "value3",
}
return metadata
**** CubicPower OpenStack Study ****
def stub_new_volume_metadata():
metadata = {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
}
return metadata
**** CubicPower OpenStack Study ****
def stub_volume_metadata_insensitive():
metadata = {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"KEY4": "value4",
}
return metadata
**** CubicPower OpenStack Study ****
def stub_max_volume_metadata():
metadata = {"metadata": {}}
for num in range(CONF.quota_metadata_items):
metadata['metadata']['key%i' % num] = "blah"
return metadata
**** CubicPower OpenStack Study ****
def return_volume(context, volume_id):
return {'id': '0cc3346e-9fef-4445-abe6-5d2b2690ec64',
'name': 'fake',
'metadata': {}}
**** CubicPower OpenStack Study ****
def return_volume_nonexistent(context, volume_id):
raise exception.VolumeNotFound('bogus test message')
**** CubicPower OpenStack Study ****
def fake_update_volume_metadata(self, context, volume, diff):
pass
**** CubicPower OpenStack Study ****
class volumeMetaDataTest(test.TestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(volumeMetaDataTest, self).setUp()
self.volume_api = volume_api.API()
fakes.stub_out_key_pair_funcs(self.stubs)
self.stubs.Set(db, 'volume_get', return_volume)
self.stubs.Set(db, 'volume_metadata_get',
return_volume_metadata)
self.stubs.Set(db, 'service_get_all_by_topic',
stubs.stub_service_get_all_by_topic)
self.stubs.Set(self.volume_api, 'update_volume_metadata',
fake_update_volume_metadata)
self.ext_mgr = extensions.ExtensionManager()
self.ext_mgr.extensions = {}
self.volume_controller = volumes.VolumeController(self.ext_mgr)
self.controller = volume_metadata.Controller()
self.req_id = str(uuid.uuid4())
self.url = '/v2/fake/volumes/%s/metadata' % self.req_id
vol = {"size": 100,
"display_name": "Volume Test Name",
"display_description": "Volume Test Desc",
"availability_zone": "zone1:host1",
"metadata": {}}
body = {"volume": vol}
req = fakes.HTTPRequest.blank('/v2/volumes')
self.volume_controller.create(req, body)
**** CubicPower OpenStack Study ****
def test_index(self):
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.req_id)
expected = {
'metadata': {
'key1': 'value1',
'key2': 'value2',
'key3': 'value3',
},
}
self.assertEqual(expected, res_dict)
**** CubicPower OpenStack Study ****
def test_index_nonexistent_volume(self):
self.stubs.Set(db, 'volume_metadata_get',
return_volume_nonexistent)
req = fakes.HTTPRequest.blank(self.url)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.index, req, self.url)
**** CubicPower OpenStack Study ****
def test_index_no_data(self):
self.stubs.Set(db, 'volume_metadata_get',
return_empty_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
res_dict = self.controller.index(req, self.req_id)
expected = {'metadata': {}}
self.assertEqual(expected, res_dict)
**** CubicPower OpenStack Study ****
def test_show(self):
req = fakes.HTTPRequest.blank(self.url + '/key2')
res_dict = self.controller.show(req, self.req_id, 'key2')
expected = {'meta': {'key2': 'value2'}}
self.assertEqual(expected, res_dict)
**** CubicPower OpenStack Study ****
def test_show_nonexistent_volume(self):
self.stubs.Set(db, 'volume_metadata_get',
return_volume_nonexistent)
req = fakes.HTTPRequest.blank(self.url + '/key2')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.req_id, 'key2')
**** CubicPower OpenStack Study ****
def test_show_meta_not_found(self):
self.stubs.Set(db, 'volume_metadata_get',
return_empty_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key6')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, self.req_id, 'key6')
**** CubicPower OpenStack Study ****
def test_delete(self):
self.stubs.Set(db, 'volume_metadata_get',
return_volume_metadata)
self.stubs.Set(db, 'volume_metadata_delete',
delete_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key2')
req.method = 'DELETE'
res = self.controller.delete(req, self.req_id, 'key2')
self.assertEqual(200, res.status_int)
**** CubicPower OpenStack Study ****
def test_delete_nonexistent_volume(self):
self.stubs.Set(db, 'volume_get',
return_volume_nonexistent)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.req_id, 'key1')
**** CubicPower OpenStack Study ****
def test_delete_meta_not_found(self):
self.stubs.Set(db, 'volume_metadata_get',
return_empty_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key6')
req.method = 'DELETE'
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, req, self.req_id, 'key6')
**** CubicPower OpenStack Study ****
def test_create(self):
self.stubs.Set(db, 'volume_metadata_get',
return_empty_volume_metadata)
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank('/v2/volume_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key1": "value1",
"key2": "value2",
"key3": "value3", }}
req.body = jsonutils.dumps(body)
res_dict = self.controller.create(req, self.req_id, body)
self.assertEqual(body, res_dict)
**** CubicPower OpenStack Study ****
def test_create_with_keys_in_uppercase_and_lowercase(self):
# if the keys in uppercase_and_lowercase, should return the one
# which server added
self.stubs.Set(db, 'volume_metadata_get',
return_empty_volume_metadata)
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata_insensitive)
req = fakes.HTTPRequest.blank('/v2/volume_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key1": "value1",
"KEY1": "value1",
"key2": "value2",
"KEY2": "value2",
"key3": "value3",
"KEY4": "value4"}}
expected = {"metadata": {"key1": "value1",
"key2": "value2",
"key3": "value3",
"KEY4": "value4"}}
req.body = jsonutils.dumps(body)
res_dict = self.controller.create(req, self.req_id, body)
self.assertEqual(expected, res_dict)
**** CubicPower OpenStack Study ****
def test_create_empty_body(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.req_id, None)
**** CubicPower OpenStack Study ****
def test_create_item_empty_key(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.req_id, body)
**** CubicPower OpenStack Study ****
def test_create_item_key_too_long(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create,
req, self.req_id, body)
**** CubicPower OpenStack Study ****
def test_create_nonexistent_volume(self):
self.stubs.Set(db, 'volume_get',
return_volume_nonexistent)
self.stubs.Set(db, 'volume_metadata_get',
return_volume_metadata)
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank('/v2/volume_metadata')
req.method = 'POST'
req.content_type = "application/json"
body = {"metadata": {"key9": "value9"}}
req.body = jsonutils.dumps(body)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.create, req, self.req_id, body)
**** CubicPower OpenStack Study ****
def test_update_all(self):
self.stubs.Set(db, 'volume_metadata_update',
return_new_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {
'metadata': {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
req.body = jsonutils.dumps(expected)
res_dict = self.controller.update_all(req, self.req_id, expected)
self.assertEqual(expected, res_dict)
**** CubicPower OpenStack Study ****
def test_update_all_with_keys_in_uppercase_and_lowercase(self):
self.stubs.Set(db, 'volume_metadata_get',
return_create_volume_metadata)
self.stubs.Set(db, 'volume_metadata_update',
return_new_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
body = {
'metadata': {
'key10': 'value10',
'KEY10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
expected = {
'metadata': {
'key10': 'value10',
'key99': 'value99',
'KEY20': 'value20',
},
}
req.body = jsonutils.dumps(expected)
res_dict = self.controller.update_all(req, self.req_id, body)
self.assertEqual(expected, res_dict)
**** CubicPower OpenStack Study ****
def test_update_all_empty_container(self):
self.stubs.Set(db, 'volume_metadata_update',
return_empty_container_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': {}}
req.body = jsonutils.dumps(expected)
res_dict = self.controller.update_all(req, self.req_id, expected)
self.assertEqual(expected, res_dict)
**** CubicPower OpenStack Study ****
def test_update_all_malformed_container(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'meta': {}}
req.body = jsonutils.dumps(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, self.req_id,
expected)
**** CubicPower OpenStack Study ****
def test_update_all_malformed_data(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
expected = {'metadata': ['asdf']}
req.body = jsonutils.dumps(expected)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update_all, req, self.req_id,
expected)
**** CubicPower OpenStack Study ****
def test_update_all_nonexistent_volume(self):
self.stubs.Set(db, 'volume_get', return_volume_nonexistent)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'PUT'
req.content_type = "application/json"
body = {'metadata': {'key10': 'value10'}}
req.body = jsonutils.dumps(body)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.update_all, req, '100', body)
**** CubicPower OpenStack Study ****
def test_update_item(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
res_dict = self.controller.update(req, self.req_id, 'key1', body)
expected = {'meta': {'key1': 'value1'}}
self.assertEqual(expected, res_dict)
**** CubicPower OpenStack Study ****
def test_update_item_nonexistent_volume(self):
self.stubs.Set(db, 'volume_get',
return_volume_nonexistent)
req = fakes.HTTPRequest.blank('/v2/fake/volumes/asdf/metadata/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.update, req, self.req_id, 'key1',
body)
**** CubicPower OpenStack Study ****
def test_update_item_empty_body(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.req_id, 'key1',
None)
**** CubicPower OpenStack Study ****
def test_update_item_empty_key(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.req_id, '', body)
**** CubicPower OpenStack Study ****
def test_update_item_key_too_long(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {("a" * 260): "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, self.req_id, ("a" * 260), body)
**** CubicPower OpenStack Study ****
def test_update_item_value_too_long(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": ("a" * 260)}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.update,
req, self.req_id, "key1", body)
**** CubicPower OpenStack Study ****
def test_update_item_too_many_keys(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/key1')
req.method = 'PUT'
body = {"meta": {"key1": "value1", "key2": "value2"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.req_id, 'key1',
body)
**** CubicPower OpenStack Study ****
def test_update_item_body_uri_mismatch(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url + '/bad')
req.method = 'PUT'
body = {"meta": {"key1": "value1"}}
req.body = jsonutils.dumps(body)
req.headers["content-type"] = "application/json"
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.update, req, self.req_id, 'bad',
body)
**** CubicPower OpenStack Study ****
def test_invalid_metadata_items_on_create(self):
self.stubs.Set(db, 'volume_metadata_update',
return_create_volume_metadata)
req = fakes.HTTPRequest.blank(self.url)
req.method = 'POST'
req.headers["content-type"] = "application/json"
#test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
#test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
#test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.create, req, self.req_id, data)