# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import os
import mock
from oslo.config import cfg
from testtools import matchers
from testtools import testcase
import webob.exc
import neutron
from neutron.api import api_common
from neutron.api.extensions import PluginAwareExtensionManager
from neutron.api.v2 import attributes
from neutron.api.v2.attributes import ATTR_NOT_SPECIFIED
from neutron.api.v2.router import APIRouter
from neutron.common import config
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common.test_lib import test_config
from neutron import context
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import models_v2
from neutron.manager import NeutronManager
from neutron.openstack.common import importutils
from neutron.tests import base
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
def optional_ctx(obj, fallback):
if not obj:
return fallback()
@contextlib.contextmanager
def context_wrapper():
yield obj
return context_wrapper()
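# Usage sketch (illustrative; mirrors how subnet() and port() below call this
# helper): ``with optional_ctx(subnet, self.subnet) as subnet_to_use`` yields
# the subnet that was passed in, or, when it is None, creates a fresh one via
# the fallback context manager, which then also handles the cleanup.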
def etcdir(*p):
return os.path.join(ETCDIR, *p)
def _fake_get_pagination_helper(self, request):
return api_common.PaginationEmulatedHelper(request, self._primary_key)
def _fake_get_sorting_helper(self, request):
return api_common.SortingEmulatedHelper(request, self._attr_info)
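# The two fakes above are swapped in for Controller._get_pagination_helper
# and Controller._get_sorting_helper via mock.patch in the *_emulated list
# tests below, forcing the API layer to emulate pagination and sorting even
# when the plugin reports native support.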
class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
resource_prefix_map = {}
def setUp(self, plugin=None, service_plugins=None,
ext_mgr=None):
super(NeutronDbPluginV2TestCase, self).setUp()
cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
# Make sure the extensions matching the plugin under test are reloaded for each test
PluginAwareExtensionManager._instance = None
# Save the attributes map in case the plugin will alter it
# loading extensions
# Note(salvatore-orlando): shallow copy is not good enough in
# this case, but copy.deepcopy does not seem to work, since it
# causes test failures
self._attribute_map_bk = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
self._tenant_id = 'test-tenant'
if not plugin:
plugin = DB_PLUGIN_KLASS
# Create the default configurations
args = ['--config-file', etcdir('neutron.conf.test')]
# If test_config specifies any config files, use them as well
for config_file in test_config.get('config_files', []):
args.extend(['--config-file', config_file])
config.parse(args=args)
# Update the plugin
self.setup_coreplugin(plugin)
cfg.CONF.set_override(
'service_plugins',
[test_config.get(key, default)
for key, default in (service_plugins or {}).iteritems()]
)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.set_override('max_dns_nameservers', 2)
cfg.CONF.set_override('max_subnet_host_routes', 2)
cfg.CONF.set_override('allow_pagination', True)
cfg.CONF.set_override('allow_sorting', True)
self.api = APIRouter()
# Set the default status
self.net_create_status = 'ACTIVE'
self.port_create_status = 'ACTIVE'
def _is_native_bulk_supported():
plugin_obj = NeutronManager.get_plugin()
native_bulk_attr_name = ("_%s__native_bulk_support"
% plugin_obj.__class__.__name__)
return getattr(plugin_obj, native_bulk_attr_name, False)
self._skip_native_bulk = not _is_native_bulk_supported()
def _is_native_pagination_support():
native_pagination_attr_name = (
"_%s__native_pagination_support" %
NeutronManager.get_plugin().__class__.__name__)
return (cfg.CONF.allow_pagination and
getattr(NeutronManager.get_plugin(),
native_pagination_attr_name, False))
self._skip_native_pagination = not _is_native_pagination_support()
def _is_native_sorting_support():
native_sorting_attr_name = (
"_%s__native_sorting_support" %
NeutronManager.get_plugin().__class__.__name__)
return (cfg.CONF.allow_sorting and
getattr(NeutronManager.get_plugin(),
native_sorting_attr_name, False))
self._skip_native_sorting = not _is_native_sorting_support()
if ext_mgr:
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
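# NOTE: the _is_native_*_support helpers above rely on Python name mangling:
# a class attribute ``__native_bulk_support`` defined on a class named (for
# illustration) ``FooPlugin`` is stored as ``_FooPlugin__native_bulk_support``,
# which is exactly the attribute name the helpers build before probing it
# with getattr().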
def tearDown(self):
self.api = None
self._deserializers = None
self._skip_native_bulk = None
self._skip_native_pagination = None
self._skip_native_sorting = None
self.ext_api = None
# NOTE(jkoelker) for a 'pluggable' framework, Neutron sure
# doesn't like when the plugin changes ;)
db.clear_db()
# Restore the original attribute map
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
super(NeutronDbPluginV2TestCase, self).tearDown()
def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
action=None, subresource=None, sub_id=None):
fmt = fmt or self.fmt
path = '/%s.%s' % (
'/'.join(p for p in
(resource, id, subresource, sub_id, action) if p),
fmt
)
prefix = self.resource_prefix_map.get(resource)
if prefix:
path = prefix + path
content_type = 'application/%s' % fmt
body = None
if data is not None: # empty dict is valid
body = self.serialize(data)
return testlib_api.create_request(path, body, content_type, method,
query_string=params)
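# For example (illustrative values), self._req('GET', 'ports', id=port_id)
# with the default 'json' format builds the path '/ports/<port_id>.json' and
# wraps it in a webob request via testlib_api.create_request.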
def new_create_request(self, resource, data, fmt=None, id=None,
subresource=None):
return self._req('POST', resource, data, fmt, id=id,
subresource=subresource)
def new_list_request(self, resource, fmt=None, params=None,
subresource=None):
return self._req(
'GET', resource, None, fmt, params=params, subresource=subresource
)
def new_show_request(self, resource, id, fmt=None,
subresource=None, fields=None):
if fields:
params = "&".join(["fields=%s" % x for x in fields])
else:
params = None
return self._req('GET', resource, None, fmt, id=id,
params=params, subresource=subresource)
def new_delete_request(self, resource, id, fmt=None, subresource=None,
sub_id=None):
return self._req(
'DELETE',
resource,
None,
fmt,
id=id,
subresource=subresource,
sub_id=sub_id
)
def new_update_request(self, resource, data, id, fmt=None,
subresource=None):
return self._req(
'PUT', resource, data, fmt, id=id, subresource=subresource
)
def new_action_request(self, resource, data, id, action, fmt=None,
subresource=None):
return self._req(
'PUT',
resource,
data,
fmt,
id=id,
action=action,
subresource=subresource
)
def deserialize(self, content_type, response):
ctype = 'application/%s' % content_type
data = self._deserializers[ctype].deserialize(response.body)['body']
return data
def _create_bulk_from_list(self, fmt, resource, objects, **kwargs):
"""Creates a bulk request from a list of objects."""
collection = "%ss" % resource
req_data = {collection: objects}
req = self.new_create_request(collection, req_data, fmt)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
elif 'context' in kwargs:
req.environ['neutron.context'] = kwargs['context']
return req.get_response(self.api)
def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs):
"""Creates a bulk request for any kind of resource."""
objects = []
collection = "%ss" % resource
for i in range(number):
obj = copy.deepcopy(data)
obj[resource]['name'] = "%s_%s" % (name, i)
if 'override' in kwargs and i in kwargs['override']:
obj[resource].update(kwargs['override'][i])
objects.append(obj)
req_data = {collection: objects}
req = self.new_create_request(collection, req_data, fmt)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
elif 'context' in kwargs:
req.environ['neutron.context'] = kwargs['context']
return req.get_response(self.api)
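# The body produced here uses the pluralized collection as the top-level key,
# e.g. (illustrative, for number=2 and resource='port'):
# {'ports': [{'port': {..., 'name': 'test_0'}}, {'port': {..., 'name': 'test_1'}}]}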
def _create_network(self, fmt, name, admin_state_up,
arg_list=None, **kwargs):
data = {'network': {'name': name,
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
for arg in (('admin_state_up', 'tenant_id', 'shared') +
(arg_list or ())):
# Arg must be present
if arg in kwargs:
data['network'][arg] = kwargs[arg]
network_req = self.new_create_request('networks', data, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
network_req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
return network_req.get_response(self.api)
def _create_network_bulk(self, fmt, number, name,
admin_state_up, **kwargs):
base_data = {'network': {'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'network', base_data, **kwargs)
def _create_subnet(self, fmt, net_id, cidr,
expected_res_status=None, **kwargs):
data = {'subnet': {'network_id': net_id,
'cidr': cidr,
'ip_version': 4,
'tenant_id': self._tenant_id}}
for arg in ('ip_version', 'tenant_id',
'enable_dhcp', 'allocation_pools',
'dns_nameservers', 'host_routes',
'shared', 'ipv6_ra_mode', 'ipv6_address_mode'):
# Arg must be present and not null (but can be false)
if arg in kwargs and kwargs[arg] is not None:
data['subnet'][arg] = kwargs[arg]
if ('gateway_ip' in kwargs and
kwargs['gateway_ip'] is not ATTR_NOT_SPECIFIED):
data['subnet']['gateway_ip'] = kwargs['gateway_ip']
subnet_req = self.new_create_request('subnets', data, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
subnet_req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
subnet_res = subnet_req.get_response(self.api)
if expected_res_status:
self.assertEqual(subnet_res.status_int, expected_res_status)
return subnet_res
def _create_subnet_bulk(self, fmt, number, net_id, name,
ip_version=4, **kwargs):
base_data = {'subnet': {'network_id': net_id,
'ip_version': ip_version,
'tenant_id': self._tenant_id}}
# auto-generate cidrs as they should not overlap
overrides = dict((k, v)
for (k, v) in zip(range(number),
[{'cidr': "10.0.%s.0/24" % num}
for num in range(number)]))
kwargs.update({'override': overrides})
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
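# For number=2 the auto-generated overrides are
# {0: {'cidr': '10.0.0.0/24'}, 1: {'cidr': '10.0.1.0/24'}}, which keeps the
# bulk-created subnets from overlapping.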
def _create_port(self, fmt, net_id, expected_res_status=None,
arg_list=None, **kwargs):
data = {'port': {'network_id': net_id,
'tenant_id': self._tenant_id}}
for arg in (('admin_state_up', 'device_id',
'mac_address', 'name', 'fixed_ips',
'tenant_id', 'device_owner', 'security_groups') +
(arg_list or ())):
# Arg must be present
if arg in kwargs:
data['port'][arg] = kwargs[arg]
port_req = self.new_create_request('ports', data, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
port_req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(port_res.status_int, expected_res_status)
return port_res
def _list_ports(self, fmt, expected_res_status=None,
net_id=None, **kwargs):
query_params = None
if net_id:
query_params = "network_id=%s" % net_id
port_req = self.new_list_request('ports', fmt, query_params)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
port_req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(port_res.status_int, expected_res_status)
return port_res
def _create_port_bulk(self, fmt, number, net_id, name,
admin_state_up, **kwargs):
base_data = {'port': {'network_id': net_id,
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'port', base_data, **kwargs)
def _make_network(self, fmt, name, admin_state_up, **kwargs):
res = self._create_network(fmt, name, admin_state_up, **kwargs)
# TODO(salvatore-orlando): do exception handling in this test module
# in a uniform way (we do it differently for ports, subnets, and nets)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4, enable_dhcp=True,
dns_nameservers=None, host_routes=None, shared=None,
ipv6_ra_mode=None, ipv6_address_mode=None):
res = self._create_subnet(fmt,
net_id=network['network']['id'],
cidr=cidr,
gateway_ip=gateway,
tenant_id=network['network']['tenant_id'],
allocation_pools=allocation_pools,
ip_version=ip_version,
enable_dhcp=enable_dhcp,
dns_nameservers=dns_nameservers,
host_routes=host_routes,
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_port(self, fmt, net_id, expected_res_status=None, **kwargs):
res = self._create_port(fmt, net_id, expected_res_status, **kwargs)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports']:
return self.api
else:
return self.ext_api
def _delete(self, collection, id,
expected_code=webob.exc.HTTPNoContent.code,
neutron_context=None):
req = self.new_delete_request(collection, id)
if neutron_context:
# create a specific auth context for this request
req.environ['neutron.context'] = neutron_context
res = req.get_response(self._api_for_resource(collection))
self.assertEqual(res.status_int, expected_code)
def _show(self, resource, id,
expected_code=webob.exc.HTTPOk.code,
neutron_context=None):
req = self.new_show_request(resource, id)
if neutron_context:
# create a specific auth context for this request
req.environ['neutron.context'] = neutron_context
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize(self.fmt, res)
def _update(self, resource, id, new_data,
expected_code=webob.exc.HTTPOk.code,
neutron_context=None):
req = self.new_update_request(resource, new_data, id)
if neutron_context:
# create a specific auth context for this request
req.environ['neutron.context'] = neutron_context
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize(self.fmt, res)
def _list(self, resource, fmt=None, neutron_context=None,
query_params=None):
fmt = fmt or self.fmt
req = self.new_list_request(resource, fmt, query_params)
if neutron_context:
req.environ['neutron.context'] = neutron_context
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
return self.deserialize(fmt, res)
def _fail_second_call(self, patched_plugin, orig, *args, **kwargs):
"""Invoked by test cases for injecting failures in plugin."""
def second_call(*args, **kwargs):
raise n_exc.NeutronException()
patched_plugin.side_effect = second_call
return orig(*args, **kwargs)
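# Pattern used by the bulk *_plugin_failure tests below: the plugin's create
# method is patched with a side effect that delegates the first call to the
# original method while arming second_call, so the next invocation raises
# NeutronException and the tests can verify that a partially failed bulk
# create leaves no objects behind.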
def _validate_behavior_on_bulk_failure(
self, res, collection,
errcode=webob.exc.HTTPClientError.code):
self.assertEqual(res.status_int, errcode)
req = self.new_list_request(collection)
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
items = self.deserialize(self.fmt, res)
self.assertEqual(len(items[collection]), 0)
def _validate_behavior_on_bulk_success(self, res, collection,
names=['test_0', 'test_1']):
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
items = self.deserialize(self.fmt, res)[collection]
self.assertEqual(len(items), 2)
self.assertEqual(items[0]['name'], 'test_0')
self.assertEqual(items[1]['name'], 'test_1')
def _test_list_resources(self, resource, items, neutron_context=None,
query_params=None):
res = self._list('%ss' % resource,
neutron_context=neutron_context,
query_params=query_params)
resource = resource.replace('-', '_')
self.assertEqual(sorted([i['id'] for i in res['%ss' % resource]]),
sorted([i[resource]['id'] for i in items]))
@contextlib.contextmanager
def network(self, name='net1',
admin_state_up=True,
fmt=None,
do_delete=True,
**kwargs):
network = self._make_network(fmt or self.fmt, name,
admin_state_up, **kwargs)
yield network
if do_delete:
# The do_delete parameter controls whether the network created here is
# deleted again when the context manager exits; tests that need to create
# many networks can pass do_delete=False and handle cleanup themselves.
self._delete('networks', network['network']['id'])
@contextlib.contextmanager
def subnet(self, network=None,
gateway_ip=ATTR_NOT_SPECIFIED,
cidr='10.0.0.0/24',
fmt=None,
ip_version=4,
allocation_pools=None,
enable_dhcp=True,
dns_nameservers=None,
host_routes=None,
shared=None,
do_delete=True,
ipv6_ra_mode=None,
ipv6_address_mode=None):
with optional_ctx(network, self.network) as network_to_use:
subnet = self._make_subnet(fmt or self.fmt,
network_to_use,
gateway_ip,
cidr,
allocation_pools,
ip_version,
enable_dhcp,
dns_nameservers,
host_routes,
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode)
yield subnet
if do_delete:
self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
def port(self, subnet=None, fmt=None, no_delete=False,
**kwargs):
with optional_ctx(subnet, self.subnet) as subnet_to_use:
net_id = subnet_to_use['subnet']['network_id']
port = self._make_port(fmt or self.fmt, net_id, **kwargs)
yield port
if not no_delete:
self._delete('ports', port['port']['id'])
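# The context managers above nest: port() with no subnet creates one through
# subnet(), which in turn may create a network through network(); each level
# deletes its own resource when its context exits (unless told not to).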
def _test_list_with_sort(self, resource,
items, sorts, resources=None, query_params=''):
query_str = query_params
for key, direction in sorts:
query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
direction)
if not resources:
resources = '%ss' % resource
req = self.new_list_request(resources,
params=query_str)
api = self._api_for_resource(resources)
res = self.deserialize(self.fmt, req.get_response(api))
resource = resource.replace('-', '_')
resources = resources.replace('-', '_')
expected_res = [item[resource]['id'] for item in items]
self.assertEqual(sorted([n['id'] for n in res[resources]]),
sorted(expected_res))
def _test_list_with_pagination(self, resource, items, sort,
limit, expected_page_num,
resources=None,
query_params='',
verify_key='id'):
if not resources:
resources = '%ss' % resource
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&sort_key=%s&"
"sort_dir=%s") % (limit, sort[0], sort[1])
req = self.new_list_request(resources, params=query_str)
items_res = []
page_num = 0
api = self._api_for_resource(resources)
resource = resource.replace('-', '_')
resources = resources.replace('-', '_')
while req:
page_num = page_num + 1
res = self.deserialize(self.fmt, req.get_response(api))
self.assertThat(len(res[resources]),
matchers.LessThan(limit + 1))
items_res = items_res + res[resources]
req = None
if '%s_links' % resources in res:
for link in res['%s_links' % resources]:
if link['rel'] == 'next':
content_type = 'application/%s' % self.fmt
req = testlib_api.create_request(link['href'],
'', content_type)
self.assertEqual(len(res[resources]),
limit)
self.assertEqual(page_num, expected_page_num)
self.assertEqual(sorted([n[verify_key] for n in items_res]),
sorted([item[resource][verify_key]
for item in items]))
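# The loop above walks the collection one page at a time: it issues e.g.
# 'limit=2&sort_key=mac_address&sort_dir=asc' (illustrative values), follows
# the 'next' link advertised in '<resources>_links' until none is returned,
# and finally checks both the page count and the full set of returned items.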
def _test_list_with_pagination_reverse(self, resource, items, sort,
limit, expected_page_num,
resources=None,
query_params=''):
if not resources:
resources = '%ss' % resource
resource = resource.replace('-', '_')
api = self._api_for_resource(resources)
marker = items[-1][resource]['id']
query_str = query_params + '&' if query_params else ''
query_str = query_str + ("limit=%s&page_reverse=True&"
"sort_key=%s&sort_dir=%s&"
"marker=%s") % (limit, sort[0], sort[1],
marker)
req = self.new_list_request(resources, params=query_str)
item_res = [items[-1][resource]]
page_num = 0
resources = resources.replace('-', '_')
while req:
page_num = page_num + 1
res = self.deserialize(self.fmt, req.get_response(api))
self.assertThat(len(res[resources]),
matchers.LessThan(limit + 1))
res[resources].reverse()
item_res = item_res + res[resources]
req = None
if '%s_links' % resources in res:
for link in res['%s_links' % resources]:
if link['rel'] == 'previous':
content_type = 'application/%s' % self.fmt
req = testlib_api.create_request(link['href'],
'', content_type)
self.assertEqual(len(res[resources]),
limit)
self.assertEqual(page_num, expected_page_num)
expected_res = [item[resource]['id'] for item in items]
expected_res.reverse()
self.assertEqual(sorted([n['id'] for n in item_res]),
sorted(expected_res))
class TestBasicGet(NeutronDbPluginV2TestCase):
def test_single_get_admin(self):
plugin = neutron.db.db_base_plugin_v2.NeutronDbPluginV2()
with self.network() as network:
net_id = network['network']['id']
ctx = context.get_admin_context()
n = plugin._get_network(ctx, net_id)
self.assertEqual(net_id, n.id)
def test_single_get_tenant(self):
plugin = neutron.db.db_base_plugin_v2.NeutronDbPluginV2()
with self.network() as network:
net_id = network['network']['id']
ctx = context.get_admin_context()
n = plugin._get_network(ctx, net_id)
self.assertEqual(net_id, n.id)
class TestV2HTTPResponse(NeutronDbPluginV2TestCase):
def test_create_returns_201(self):
res = self._create_network(self.fmt, 'net2', True)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
def test_list_returns_200(self):
req = self.new_list_request('networks')
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
def _check_list_with_fields(self, res, field_name):
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
body = self.deserialize(self.fmt, res)
# further checks: exactly 1 network
self.assertEqual(len(body['networks']), 1)
# 1 field in the network record
self.assertEqual(len(body['networks'][0]), 1)
# the requested field is present in the record
self.assertIn(field_name, body['networks'][0])
def test_list_with_fields(self):
self._create_network(self.fmt, 'some_net', True)
req = self.new_list_request('networks', params="fields=name")
res = req.get_response(self.api)
self._check_list_with_fields(res, 'name')
def test_list_with_fields_noadmin(self):
tenant_id = 'some_tenant'
self._create_network(self.fmt,
'some_net',
True,
tenant_id=tenant_id,
set_context=True)
req = self.new_list_request('networks', params="fields=name")
req.environ['neutron.context'] = context.Context('', tenant_id)
res = req.get_response(self.api)
self._check_list_with_fields(res, 'name')
def test_list_with_fields_noadmin_and_policy_field(self):
"""If a field used by policy is selected, do not duplicate it.
Verifies that if the field parameter explicitly specifies a field
which is used by the policy engine, then it is not duplicated
in the response.
"""
tenant_id = 'some_tenant'
self._create_network(self.fmt,
'some_net',
True,
tenant_id=tenant_id,
set_context=True)
req = self.new_list_request('networks', params="fields=tenant_id")
req.environ['neutron.context'] = context.Context('', tenant_id)
res = req.get_response(self.api)
self._check_list_with_fields(res, 'tenant_id')
def test_show_returns_200(self):
with self.network() as net:
req = self.new_show_request('networks', net['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
def test_delete_returns_204(self):
res = self._create_network(self.fmt, 'net1', True)
net = self.deserialize(self.fmt, res)
req = self.new_delete_request('networks', net['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
def test_update_returns_200(self):
with self.network() as net:
req = self.new_update_request('networks',
{'network': {'name': 'steve'}},
net['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
def test_update_invalid_json_400(self):
with self.network() as net:
req = self.new_update_request('networks',
'{{"name": "aaa"}}',
net['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
def test_bad_route_404(self):
req = self.new_list_request('doohickeys')
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_json(self):
keys = [('admin_state_up', True), ('status', self.port_create_status)]
with self.port(name='myname') as port:
for k, v in keys:
self.assertEqual(port['port'][k], v)
self.assertIn('mac_address', port['port'])
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual('myname', port['port']['name'])
def test_create_port_as_admin(self):
with self.network(do_delete=False) as network:
self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id='bad_tenant_id',
device_id='fake_device',
device_owner='fake_owner',
fixed_ips=[],
set_context=False)
def test_create_port_bad_tenant(self):
with self.network() as network:
self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPNotFound.code,
tenant_id='bad_tenant_id',
device_id='fake_device',
device_owner='fake_owner',
fixed_ips=[],
set_context=True)
def test_create_port_public_network(self):
keys = [('admin_state_up', True), ('status', self.port_create_status)]
with self.network(shared=True) as network:
port_res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id='another_tenant',
set_context=True)
port = self.deserialize(self.fmt, port_res)
for k, v in keys:
self.assertEqual(port['port'][k], v)
self.assertIn('mac_address', port['port'])
self._delete('ports', port['port']['id'])
def test_create_port_public_network_with_ip(self):
with self.network(shared=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
keys = [('admin_state_up', True),
('status', self.port_create_status),
('fixed_ips', [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}])]
port_res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id='another_tenant',
set_context=True)
port = self.deserialize(self.fmt, port_res)
for k, v in keys:
self.assertEqual(port['port'][k], v)
self.assertIn('mac_address', port['port'])
self._delete('ports', port['port']['id'])
def test_create_ports_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk port create")
with self.network() as net:
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True)
self._validate_behavior_on_bulk_success(res, 'ports')
for p in self.deserialize(self.fmt, res)['ports']:
self._delete('ports', p['id'])
def test_create_ports_bulk_emulated(self):
real_has_attr = hasattr
# ensure the API chooses the emulated bulk code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.network() as net:
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True)
self._validate_behavior_on_bulk_success(res, 'ports')
for p in self.deserialize(self.fmt, res)['ports']:
self._delete('ports', p['id'])
def test_create_ports_bulk_wrong_input(self):
with self.network() as net:
overrides = {1: {'admin_state_up': 'doh'}}
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True,
override=overrides)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
req = self.new_list_request('ports')
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
ports = self.deserialize(self.fmt, res)
self.assertEqual(len(ports['ports']), 0)
def test_create_ports_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
# ensure the API chooses the emulated bulk code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
orig = NeutronManager.get_plugin().create_port
with mock.patch.object(NeutronManager.get_plugin(),
'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_port_bulk(self.fmt, 2,
net['network']['id'],
'test',
True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(
res, 'ports', webob.exc.HTTPServerError.code
)
def test_create_ports_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk port create")
ctx = context.get_admin_context()
with self.network() as net:
orig = NeutronManager._instance.plugin.create_port
with mock.patch.object(NeutronManager._instance.plugin,
'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True, context=ctx)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(
res, 'ports', webob.exc.HTTPServerError.code)
def test_list_ports(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(),
self.port(),
self.port()) as ports:
self._test_list_resources('port', ports)
def test_list_ports_filtered_by_fixed_ip(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(), self.port()) as (port1, port2):
fixed_ips = port1['port']['fixed_ips'][0]
query_params = """
fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
""".strip() % (fixed_ips['ip_address'],
'192.168.126.5',
fixed_ips['subnet_id'])
self._test_list_resources('port', [port1],
query_params=query_params)
def test_list_ports_public_network(self):
with self.network(shared=True) as network:
with self.subnet(network) as subnet:
with contextlib.nested(self.port(subnet, tenant_id='tenant_1'),
self.port(subnet, tenant_id='tenant_2')
) as (port1, port2):
# Admin request - must return both ports
self._test_list_resources('port', [port1, port2])
# Tenant_1 request - must return single port
q_context = context.Context('', 'tenant_1')
self._test_list_resources('port', [port1],
neutron_context=q_context)
# Tenant_2 request - must return single port
q_context = context.Context('', 'tenant_2')
self._test_list_resources('port', [port2],
neutron_context=q_context)
def test_list_ports_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(admin_state_up='True',
mac_address='00:00:00:00:00:01'),
self.port(admin_state_up='False',
mac_address='00:00:00:00:00:02'),
self.port(admin_state_up='False',
mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_sort('port', (port3, port2, port1),
[('admin_state_up', 'asc'),
('mac_address', 'desc')])
def test_list_ports_with_sort_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_sorting_helper',
new=_fake_get_sorting_helper)
helper_patcher.start()
try:
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(admin_state_up='True',
mac_address='00:00:00:00:00:01'),
self.port(admin_state_up='False',
mac_address='00:00:00:00:00:02'),
self.port(admin_state_up='False',
mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_sort('port', (port3, port2, port1),
[('admin_state_up', 'asc'),
('mac_address', 'desc')])
finally:
helper_patcher.stop()
def test_list_ports_with_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
self.port(mac_address='00:00:00:00:00:02'),
self.port(mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_pagination('port',
(port1, port2, port3),
('mac_address', 'asc'), 2, 2)
def test_list_ports_with_pagination_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
self.port(mac_address='00:00:00:00:00:02'),
self.port(mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_pagination('port',
(port1, port2, port3),
('mac_address', 'asc'), 2, 2)
finally:
helper_patcher.stop()
def test_list_ports_with_pagination_reverse_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
self.port(mac_address='00:00:00:00:00:02'),
self.port(mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_pagination_reverse('port',
(port1, port2, port3),
('mac_address', 'asc'),
2, 2)
def test_list_ports_with_pagination_reverse_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
cfg.CONF.set_default('allow_overlapping_ips', True)
with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
self.port(mac_address='00:00:00:00:00:02'),
self.port(mac_address='00:00:00:00:00:03')
) as (port1, port2, port3):
self._test_list_with_pagination_reverse('port',
(port1, port2, port3),
('mac_address', 'asc'),
2, 2)
finally:
helper_patcher.stop()
def test_show_port(self):
with self.port() as port:
req = self.new_show_request('ports', port['port']['id'], self.fmt)
sport = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(port['port']['id'], sport['port']['id'])
def test_delete_port(self):
with self.port(no_delete=True) as port:
self._delete('ports', port['port']['id'])
self._show('ports', port['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
def test_delete_port_public_network(self):
with self.network(shared=True) as network:
port_res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id='another_tenant',
set_context=True)
port = self.deserialize(self.fmt, port_res)
self._delete('ports', port['port']['id'])
self._show('ports', port['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
def test_update_port(self):
with self.port() as port:
data = {'port': {'admin_state_up': False}}
req = self.new_update_request('ports', data, port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
def test_update_port_not_admin(self):
res = self._create_network(self.fmt, 'net1', True,
tenant_id='not_admin',
set_context=True)
net1 = self.deserialize(self.fmt, res)
res = self._create_port(self.fmt, net1['network']['id'],
tenant_id='not_admin', set_context=True)
port = self.deserialize(self.fmt, res)
data = {'port': {'admin_state_up': False}}
neutron_context = context.Context('', 'not_admin')
port = self._update('ports', port['port']['id'], data,
neutron_context=neutron_context)
self.assertEqual(port['port']['admin_state_up'], False)
def test_update_device_id_null(self):
with self.port() as port:
data = {'port': {'device_id': None}}
req = self.new_update_request('ports', data, port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
def test_delete_network_if_port_exists(self):
with self.port() as port:
req = self.new_delete_request('networks',
port['port']['network_id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_delete_network_port_exists_owned_by_network(self):
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
network_id = network['network']['id']
self._create_port(self.fmt, network_id,
device_owner=constants.DEVICE_OWNER_DHCP)
req = self.new_delete_request('networks', network_id)
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
def test_update_port_delete_ip(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': []}}
req = self.new_update_request('ports',
data, port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
self.assertEqual(res['port']['fixed_ips'],
data['port']['fixed_ips'])
def test_no_more_port_exception(self):
with self.subnet(cidr='10.0.0.0/32') as subnet:
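# A /32 cidr leaves no addresses available for allocation, so creating a
# port on this network must fail with IpAddressGenerationFailure, which the
# API maps to the 409 asserted below.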
id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, id)
data = self.deserialize(self.fmt, res)
msg = str(n_exc.IpAddressGenerationFailure(net_id=id))
self.assertEqual(data['NeutronError']['message'], msg)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_update_port_update_ip(self):
"""Test update of port IP.
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.10')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
def test_update_port_update_ip_address_only(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"},
{'ip_address': "10.0.0.2"}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self.assertEqual(ips[1]['ip_address'], '10.0.0.10')
self.assertEqual(ips[1]['subnet_id'], subnet['subnet']['id'])
def test_update_port_update_ips(self):
"""Update IP and associate new IP on port.
Check a port update with the specified subnet_ids. An IP address
will be allocated for each subnet_id.
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': '10.0.0.3'}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.3')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
def test_update_port_add_additional_ip(self):
"""Test update of port with additional IP."""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id']},
{'subnet_id':
subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
self.assertEqual(ips[0]['ip_address'], '10.0.0.3')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self.assertEqual(ips[1]['ip_address'], '10.0.0.4')
self.assertEqual(ips[1]['subnet_id'], subnet['subnet']['id'])
def test_requested_duplicate_mac(self):
with self.port() as port:
mac = port['port']['mac_address']
# check that MAC address matches base MAC
base_mac = cfg.CONF.base_mac[0:2]
self.assertTrue(mac.startswith(base_mac))
kwargs = {"mac_address": mac}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_mac_generation(self):
cfg.CONF.set_override('base_mac', "12:34:56:00:00:00")
with self.port() as port:
mac = port['port']['mac_address']
self.assertTrue(mac.startswith("12:34:56"))
def test_mac_generation_4octet(self):
cfg.CONF.set_override('base_mac', "12:34:56:78:00:00")
with self.port() as port:
mac = port['port']['mac_address']
self.assertTrue(mac.startswith("12:34:56:78"))
def test_bad_mac_format(self):
cfg.CONF.set_override('base_mac', "bad_mac")
try:
self.plugin._check_base_mac_format()
except Exception:
return
self.fail("No exception for illegal base_mac format")
def test_mac_exhaustion(self):
# Rather than actually consuming every MAC address (which would take a
# very long time), just raise the exception that exhaustion would produce.
@staticmethod
def fake_gen_mac(context, net_id):
raise n_exc.MacAddressGenerationFailure(net_id=net_id)
with mock.patch.object(neutron.db.db_base_plugin_v2.NeutronDbPluginV2,
'_generate_mac', new=fake_gen_mac):
res = self._create_network(fmt=self.fmt, name='net1',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
net_id = network['network']['id']
res = self._create_port(self.fmt, net_id=net_id)
self.assertEqual(res.status_int,
webob.exc.HTTPServiceUnavailable.code)
def test_requested_duplicate_ip(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Check configuring of duplicate IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': ips[0]['ip_address']}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_requested_subnet_delete(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
req = self.new_delete_request('subnet',
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
def test_requested_subnet_id(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Request an IP from a specific subnet
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
ips = port2['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.3')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id'])
def test_requested_subnet_id_not_on_network(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
# Create new network
res = self._create_network(fmt=self.fmt, name='net2',
admin_state_up=True)
network2 = self.deserialize(self.fmt, res)
subnet2 = self._make_subnet(self.fmt, network2, "1.1.1.1",
"1.1.1.0/24", ip_version=4)
net_id = port['port']['network_id']
# Request an IP from a specific subnet
kwargs = {"fixed_ips": [{'subnet_id':
subnet2['subnet']['id']}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
def test_overlapping_subnets(self):
with self.subnet() as subnet:
tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
res = self._create_subnet(self.fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='10.0.0.225/28',
ip_version=4,
gateway_ip=ATTR_NOT_SPECIFIED)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
def test_requested_subnet_id_v4_and_v6(self):
with self.subnet() as subnet:
# Get an IPv4 and an IPv6 address
tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
res = self._create_subnet(self.fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='2607:f0d0:1002:51::/124',
ip_version=6,
gateway_ip=ATTR_NOT_SPECIFIED)
subnet2 = self.deserialize(self.fmt, res)
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet2['subnet']['id']}]}
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port3 = self.deserialize(self.fmt, res)
ips = port3['port']['fixed_ips']
self.assertEqual(len(ips), 2)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self.assertEqual(ips[1]['ip_address'], '2607:f0d0:1002:51::2')
self.assertEqual(ips[1]['subnet_id'], subnet2['subnet']['id'])
res = self._create_port(self.fmt, net_id=net_id)
port4 = self.deserialize(self.fmt, res)
# Check that a v4 and a v6 address are allocated
ips = port4['port']['fixed_ips']
self.assertEqual(len(ips), 2)
self.assertEqual(ips[0]['ip_address'], '10.0.0.3')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self.assertEqual(ips[1]['ip_address'], '2607:f0d0:1002:51::3')
self.assertEqual(ips[1]['subnet_id'], subnet2['subnet']['id'])
self._delete('ports', port3['port']['id'])
self._delete('ports', port4['port']['id'])
def test_range_allocation(self):
with self.subnet(gateway_ip='10.0.0.3',
cidr='10.0.0.0/29') as subnet:
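# A /29 provides six usable host addresses; with 10.0.0.3 reserved as the
# gateway, the five addresses requested below must come from the remaining
# set listed in alloc.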
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 5)
alloc = ['10.0.0.1', '10.0.0.2', '10.0.0.4', '10.0.0.5',
'10.0.0.6']
for ip in ips:
self.assertIn(ip['ip_address'], alloc)
self.assertEqual(ip['subnet_id'],
subnet['subnet']['id'])
alloc.remove(ip['ip_address'])
self.assertEqual(len(alloc), 0)
self._delete('ports', port['port']['id'])
with self.subnet(gateway_ip='11.0.0.6',
cidr='11.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 5)
alloc = ['11.0.0.1', '11.0.0.2', '11.0.0.3', '11.0.0.4',
'11.0.0.5']
for ip in ips:
self.assertIn(ip['ip_address'], alloc)
self.assertEqual(ip['subnet_id'],
subnet['subnet']['id'])
alloc.remove(ip['ip_address'])
self.assertEqual(len(alloc), 0)
self._delete('ports', port['port']['id'])
def test_requested_invalid_fixed_ips(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Test invalid subnet_id
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id':
'00000000-ffff-ffff-ffff-000000000000'}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
# Test invalid IP address on specified subnet_id
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id'],
'ip_address': '1.1.1.1'}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
# Test invalid addresses: IPs not on the subnet, the network
# address, or the broadcast address
bad_ips = ['1.1.1.1', '10.0.0.0', '10.0.0.255']
net_id = port['port']['network_id']
for ip in bad_ips:
kwargs = {"fixed_ips": [{'ip_address': ip}]}
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
# Enable allocation of gateway address
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.1'}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
ips = port2['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.1')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id'])
**** CubicPower OpenStack Study ****
def test_invalid_ip(self):
with self.subnet() as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '1011.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_requested_split(self):
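# After 10.0.0.5 is allocated explicitly, ports created without fixed_ips
# should receive the surrounding free addresses.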
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ports_to_delete = []
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.5'}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.5')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Auto-allocation should fill in the addresses around 10.0.0.5
allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6']
for a in allocated:
res = self._create_port(self.fmt, net_id=net_id)
port2 = self.deserialize(self.fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], a)
self.assertEqual(ips[0]['subnet_id'],
subnet['subnet']['id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
**** CubicPower OpenStack Study ****
def test_duplicate_ips(self):
with self.subnet() as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.5'},
{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_fixed_ip_invalid_subnet_id(self):
with self.subnet() as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': 'i am invalid',
'ip_address': '10.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_fixed_ip_invalid_ip(self):
with self.subnet() as subnet:
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.55555'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_requested_ips_only(self):
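# Request each address explicitly (deliberately out of order) and verify the
# exact IP is returned on the created port.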
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21',
'10.0.0.3', '10.0.0.17', '10.0.0.19']
ports_to_delete = []
for i in ips_only:
kwargs = {"fixed_ips": [{'ip_address': i}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ports_to_delete.append(port)
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], i)
self.assertEqual(ips[0]['subnet_id'],
subnet['subnet']['id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
**** CubicPower OpenStack Study ****
def test_invalid_admin_state(self):
with self.network() as network:
data = {'port': {'network_id': network['network']['id'],
'tenant_id': network['network']['tenant_id'],
'admin_state_up': 7,
'fixed_ips': []}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_invalid_mac_address(self):
with self.network() as network:
data = {'port': {'network_id': network['network']['id'],
'tenant_id': network['network']['tenant_id'],
'admin_state_up': 1,
'mac_address': 'mac',
'fixed_ips': []}}
port_req = self.new_create_request('ports', data)
res = port_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_max_fixed_ips_exceeded(self):
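# Six fixed_ips entries should exceed the per-port limit; this assumes the
# default max_fixed_ips_per_port setting of 5 is in effect.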
with self.subnet(gateway_ip='10.0.0.3',
cidr='10.0.0.0/24') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_max_fixed_ips_exceeded(self):
with self.subnet(gateway_ip='10.0.0.3',
cidr='10.0.0.0/24') as subnet:
with self.port(subnet) as port:
data = {'port': {'fixed_ips':
[{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'},
{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.4'},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_delete_ports_by_device_id(self):
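# delete_ports_by_device_id should remove only the ports whose device_id
# matches 'owner1' and leave the 'owner2' port in place.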
plugin = NeutronManager.get_plugin()
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
self.port(subnet=subnet, device_id='owner1', no_delete=True),
self.port(subnet=subnet, device_id='owner1', no_delete=True),
self.port(subnet=subnet, device_id='owner2'),
) as (p1, p2, p3):
network_id = subnet['subnet']['network_id']
plugin.delete_ports_by_device_id(ctx, 'owner1',
network_id)
self._show('ports', p1['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
self._show('ports', p2['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
self._show('ports', p3['port']['id'],
expected_code=webob.exc.HTTPOk.code)
**** CubicPower OpenStack Study ****
def _test_delete_ports_by_device_id_second_call_failure(self, plugin):
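# Make delete_port fail on its second invocation and verify that only the
# first port was removed while the remaining ports survive the error.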
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
self.port(subnet=subnet, device_id='owner1', no_delete=True),
self.port(subnet=subnet, device_id='owner1'),
self.port(subnet=subnet, device_id='owner2'),
) as (p1, p2, p3):
orig = plugin.delete_port
with mock.patch.object(plugin, 'delete_port') as del_port:
def side_effect(*args, **kwargs):
return self._fail_second_call(del_port, orig,
*args, **kwargs)
del_port.side_effect = side_effect
network_id = subnet['subnet']['network_id']
self.assertRaises(n_exc.NeutronException,
plugin.delete_ports_by_device_id,
ctx, 'owner1', network_id)
self._show('ports', p1['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
self._show('ports', p2['port']['id'],
expected_code=webob.exc.HTTPOk.code)
self._show('ports', p3['port']['id'],
expected_code=webob.exc.HTTPOk.code)
**** CubicPower OpenStack Study ****
def test_delete_ports_by_device_id_second_call_failure(self):
plugin = NeutronManager.get_plugin()
self._test_delete_ports_by_device_id_second_call_failure(plugin)
**** CubicPower OpenStack Study ****
def _test_delete_ports_ignores_port_not_found(self, plugin):
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
self.port(subnet=subnet, device_id='owner1'),
mock.patch.object(plugin, 'delete_port')
) as (p, del_port):
del_port.side_effect = n_exc.PortNotFound(
port_id=p['port']['id']
)
network_id = subnet['subnet']['network_id']
try:
plugin.delete_ports_by_device_id(ctx, 'owner1',
network_id)
except n_exc.PortNotFound:
self.fail("delete_ports_by_device_id unexpectedly raised "
"a PortNotFound exception. It should ignore "
"this exception because it is often called at "
"the same time other concurrent operations are "
"deleting some of the same ports.")
**** CubicPower OpenStack Study ****
def test_delete_ports_ignores_port_not_found(self):
plugin = NeutronManager.get_plugin()
self._test_delete_ports_ignores_port_not_found(plugin)
**** CubicPower OpenStack Study ****
class TestNetworksV2(NeutronDbPluginV2TestCase):
# NOTE(cerberus): successful network update and delete are
# effectively tested above
**** CubicPower OpenStack Study ****
def test_create_network(self):
name = 'net1'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', self.net_create_status), ('shared', False)]
with self.network(name=name) as net:
for k, v in keys:
self.assertEqual(net['network'][k], v)
**** CubicPower OpenStack Study ****
def test_create_public_network(self):
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', self.net_create_status), ('shared', True)]
with self.network(name=name, shared=True) as net:
for k, v in keys:
self.assertEqual(net['network'][k], v)
**** CubicPower OpenStack Study ****
def test_create_public_network_no_admin_tenant(self):
name = 'public_net'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name,
shared=True,
tenant_id="another_tenant",
set_context=True):
pass
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPForbidden.code)
**** CubicPower OpenStack Study ****
def test_update_network(self):
with self.network() as network:
data = {'network': {'name': 'a_brand_new_name'}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['network']['name'],
data['network']['name'])
**** CubicPower OpenStack Study ****
def test_update_shared_network_noadmin_returns_403(self):
with self.network(shared=True) as network:
data = {'network': {'name': 'a_brand_new_name'}}
req = self.new_update_request('networks',
data,
network['network']['id'])
req.environ['neutron.context'] = context.Context('', 'somebody')
res = req.get_response(self.api)
# The API layer always returns 404 on updates in place of 403
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
**** CubicPower OpenStack Study ****
def test_update_network_set_shared(self):
with self.network(shared=False) as network:
data = {'network': {'shared': True}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertTrue(res['network']['shared'])
**** CubicPower OpenStack Study ****
def test_update_network_set_shared_owner_returns_404(self):
with self.network(shared=False) as network:
net_owner = network['network']['tenant_id']
data = {'network': {'shared': True}}
req = self.new_update_request('networks',
data,
network['network']['id'])
req.environ['neutron.context'] = context.Context('u', net_owner)
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)
**** CubicPower OpenStack Study ****
def test_update_network_with_subnet_set_shared(self):
with self.network(shared=False) as network:
with self.subnet(network=network) as subnet:
data = {'network': {'shared': True}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertTrue(res['network']['shared'])
# must query db to see whether subnet's shared attribute
# has been updated or not
ctx = context.Context('', '', is_admin=True)
subnet_db = NeutronManager.get_plugin()._get_subnet(
ctx, subnet['subnet']['id'])
self.assertEqual(subnet_db['shared'], True)
**** CubicPower OpenStack Study ****
def test_update_network_set_not_shared_single_tenant(self):
with self.network(shared=True) as network:
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertFalse(res['network']['shared'])
port1 = self.deserialize(self.fmt, res1)
self._delete('ports', port1['port']['id'])
**** CubicPower OpenStack Study ****
def test_update_network_set_not_shared_other_tenant_returns_409(self):
with self.network(shared=True) as network:
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id='somebody_else',
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int,
webob.exc.HTTPConflict.code)
port1 = self.deserialize(self.fmt, res1)
self._delete('ports', port1['port']['id'])
**** CubicPower OpenStack Study ****
def test_update_network_set_not_shared_multi_tenants_returns_409(self):
with self.network(shared=True) as network:
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id='somebody_else',
set_context=True)
res2 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int,
webob.exc.HTTPConflict.code)
port1 = self.deserialize(self.fmt, res1)
port2 = self.deserialize(self.fmt, res2)
self._delete('ports', port1['port']['id'])
self._delete('ports', port2['port']['id'])
**** CubicPower OpenStack Study ****
def test_update_network_set_not_shared_multi_tenants2_returns_409(self):
with self.network(shared=True) as network:
res1 = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
tenant_id='somebody_else',
set_context=True)
self._create_subnet(self.fmt,
network['network']['id'],
'10.0.0.0/24',
webob.exc.HTTPCreated.code,
tenant_id=network['network']['tenant_id'],
set_context=True)
data = {'network': {'shared': False}}
req = self.new_update_request('networks',
data,
network['network']['id'])
self.assertEqual(req.get_response(self.api).status_int,
webob.exc.HTTPConflict.code)
port1 = self.deserialize(self.fmt, res1)
self._delete('ports', port1['port']['id'])
**** CubicPower OpenStack Study ****
def test_create_networks_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
res = self._create_network_bulk(self.fmt, 2, 'test', True)
self._validate_behavior_on_bulk_success(res, 'networks')
**** CubicPower OpenStack Study ****
def test_create_networks_bulk_native_quotas(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
quota = 4
cfg.CONF.set_override('quota_network', quota, group='QUOTAS')
res = self._create_network_bulk(self.fmt, quota + 1, 'test', True)
self._validate_behavior_on_bulk_failure(
res, 'networks',
errcode=webob.exc.HTTPConflict.code)
**** CubicPower OpenStack Study ****
def test_create_networks_bulk_tenants_and_quotas(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
quota = 2
cfg.CONF.set_override('quota_network', quota, group='QUOTAS')
networks = [{'network': {'name': 'n1',
'tenant_id': self._tenant_id}},
{'network': {'name': 'n2',
'tenant_id': self._tenant_id}},
{'network': {'name': 'n1',
'tenant_id': 't1'}},
{'network': {'name': 'n2',
'tenant_id': 't1'}}]
res = self._create_bulk_from_list(self.fmt, 'network', networks)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
**** CubicPower OpenStack Study ****
def test_create_networks_bulk_tenants_and_quotas_fail(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
quota = 2
cfg.CONF.set_override('quota_network', quota, group='QUOTAS')
networks = [{'network': {'name': 'n1',
'tenant_id': self._tenant_id}},
{'network': {'name': 'n2',
'tenant_id': self._tenant_id}},
{'network': {'name': 'n1',
'tenant_id': 't1'}},
{'network': {'name': 'n3',
'tenant_id': self._tenant_id}},
{'network': {'name': 'n2',
'tenant_id': 't1'}}]
res = self._create_bulk_from_list(self.fmt, 'network', networks)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
**** CubicPower OpenStack Study ****
def test_create_networks_bulk_emulated(self):
real_has_attr = hasattr
# Ensures the API chooses the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
res = self._create_network_bulk(self.fmt, 2, 'test', True)
self._validate_behavior_on_bulk_success(res, 'networks')
**** CubicPower OpenStack Study ****
def test_create_networks_bulk_wrong_input(self):
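# An invalid admin_state_up on the second network of the bulk request should
# fail the whole operation and leave no networks behind.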
res = self._create_network_bulk(self.fmt, 2, 'test', True,
override={1:
{'admin_state_up': 'doh'}})
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
req = self.new_list_request('networks')
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
nets = self.deserialize(self.fmt, res)
self.assertEqual(len(nets['networks']), 0)
**** CubicPower OpenStack Study ****
def test_create_networks_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
orig = NeutronManager.get_plugin().create_network
# Ensures the API chooses the emulation code path
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with mock.patch.object(NeutronManager.get_plugin(),
'create_network') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_network_bulk(self.fmt, 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(
res, 'networks', webob.exc.HTTPServerError.code
)
**** CubicPower OpenStack Study ****
def test_create_networks_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
orig = NeutronManager.get_plugin().create_network
with mock.patch.object(NeutronManager.get_plugin(),
'create_network') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_network_bulk(self.fmt, 2, 'test', True)
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(
res, 'networks', webob.exc.HTTPServerError.code
)
**** CubicPower OpenStack Study ****
def test_list_networks(self):
with contextlib.nested(self.network(),
self.network(),
self.network()) as networks:
self._test_list_resources('network', networks)
**** CubicPower OpenStack Study ****
def test_list_networks_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.network(admin_status_up=True,
name='net1'),
self.network(admin_status_up=False,
name='net2'),
self.network(admin_status_up=False,
name='net3')
) as (net1, net2, net3):
self._test_list_with_sort('network', (net3, net2, net1),
[('admin_state_up', 'asc'),
('name', 'desc')])
**** CubicPower OpenStack Study ****
def test_list_networks_with_sort_extended_attr_native_returns_400(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.network(admin_status_up=True,
name='net1'),
self.network(admin_status_up=False,
name='net2'),
self.network(admin_status_up=False,
name='net3')
):
req = self.new_list_request(
'networks',
params='sort_key=provider:segmentation_id&sort_dir=asc')
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
**** CubicPower OpenStack Study ****
def test_list_networks_with_sort_remote_key_native_returns_400(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.network(admin_status_up=True,
name='net1'),
self.network(admin_status_up=False,
name='net2'),
self.network(admin_status_up=False,
name='net3')
):
req = self.new_list_request(
'networks', params='sort_key=subnets&sort_dir=asc')
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
**** CubicPower OpenStack Study ****
def test_list_networks_with_sort_emulated(self):
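# Patch the controller to use the emulated sorting helper so sorting happens
# in the API layer instead of being delegated to the plugin.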
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_sorting_helper',
new=_fake_get_sorting_helper)
helper_patcher.start()
try:
with contextlib.nested(self.network(admin_status_up=True,
name='net1'),
self.network(admin_status_up=False,
name='net2'),
self.network(admin_status_up=False,
name='net3')
) as (net1, net2, net3):
self._test_list_with_sort('network', (net3, net2, net1),
[('admin_state_up', 'asc'),
('name', 'desc')])
finally:
helper_patcher.stop()
**** CubicPower OpenStack Study ****
def test_list_networks_with_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2)
**** CubicPower OpenStack Study ****
def test_list_networks_with_pagination_emulated(self):
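# Same listing checks as the native case, but with pagination emulated in
# the API layer via the patched pagination helper.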
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2)
finally:
helper_patcher.stop()
**** CubicPower OpenStack Study ****
def test_list_networks_without_pk_in_fields_pagination_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.network(name='net1',
shared=True),
self.network(name='net2',
shared=False),
self.network(name='net3',
shared=True)
) as (net1, net2, net3):
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2,
query_params="fields=name",
verify_key='name')
finally:
helper_patcher.stop()
**** CubicPower OpenStack Study ****
def test_list_networks_without_pk_in_fields_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination('network',
(net1, net2, net3),
('name', 'asc'), 2, 2,
query_params="fields=shared",
verify_key='shared')
**** CubicPower OpenStack Study ****
def test_list_networks_with_pagination_reverse_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination_reverse('network',
(net1, net2, net3),
('name', 'asc'), 2, 2)
**** CubicPower OpenStack Study ****
def test_list_networks_with_pagination_reverse_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.network(name='net1'),
self.network(name='net2'),
self.network(name='net3')
) as (net1, net2, net3):
self._test_list_with_pagination_reverse('network',
(net1, net2, net3),
('name', 'asc'), 2, 2)
finally:
helper_patcher.stop()
**** CubicPower OpenStack Study ****
def test_list_networks_with_parameters(self):
with contextlib.nested(self.network(name='net1',
admin_state_up=False),
self.network(name='net2')) as (net1, net2):
query_params = 'admin_state_up=False'
self._test_list_resources('network', [net1],
query_params=query_params)
query_params = 'admin_state_up=True'
self._test_list_resources('network', [net2],
query_params=query_params)
**** CubicPower OpenStack Study ****
def test_list_networks_with_fields(self):
with self.network(name='net1') as net1:
req = self.new_list_request('networks',
params='fields=name')
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(1, len(res['networks']))
self.assertEqual(res['networks'][0]['name'],
net1['network']['name'])
self.assertIsNone(res['networks'][0].get('id'))
**** CubicPower OpenStack Study ****
def test_list_networks_with_parameters_invalid_values(self):
with contextlib.nested(self.network(name='net1',
admin_state_up=False),
self.network(name='net2')) as (net1, net2):
req = self.new_list_request('networks',
params='admin_state_up=fake')
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
**** CubicPower OpenStack Study ****
def test_list_shared_networks_with_non_admin_user(self):
with contextlib.nested(self.network(shared=False,
name='net1',
tenant_id='tenant1'),
self.network(shared=True,
name='net2',
tenant_id='another_tenant'),
self.network(shared=False,
name='net3',
tenant_id='another_tenant')
) as (net1, net2, net3):
ctx = context.Context(user_id='non_admin',
tenant_id='tenant1',
is_admin=False)
self._test_list_resources('network', (net1, net2), ctx)
**** CubicPower OpenStack Study ****
def test_show_network(self):
with self.network(name='net1') as net:
req = self.new_show_request('networks', net['network']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['network']['name'],
net['network']['name'])
**** CubicPower OpenStack Study ****
def test_show_network_with_subnet(self):
with self.network(name='net1') as net:
with self.subnet(net) as subnet:
req = self.new_show_request('networks', net['network']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['network']['subnets'][0],
subnet['subnet']['id'])
**** CubicPower OpenStack Study ****
def test_invalid_admin_status(self):
value = [[7, False, webob.exc.HTTPClientError.code],
[True, True, webob.exc.HTTPCreated.code],
["True", True, webob.exc.HTTPCreated.code],
["true", True, webob.exc.HTTPCreated.code],
[1, True, webob.exc.HTTPCreated.code],
["False", False, webob.exc.HTTPCreated.code],
[False, False, webob.exc.HTTPCreated.code],
["false", False, webob.exc.HTTPCreated.code],
["7", False, webob.exc.HTTPClientError.code]]
for v in value:
data = {'network': {'name': 'net',
'admin_state_up': v[0],
'tenant_id': self._tenant_id}}
network_req = self.new_create_request('networks', data)
req = network_req.get_response(self.api)
self.assertEqual(req.status_int, v[2])
if v[2] == webob.exc.HTTPCreated.code:
res = self.deserialize(self.fmt, req)
self.assertEqual(res['network']['admin_state_up'], v[1])
**** CubicPower OpenStack Study ****
class TestSubnetsV2(NeutronDbPluginV2TestCase):
**** CubicPower OpenStack Study ****
def _test_create_subnet(self, network=None, expected=None, **kwargs):
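# Shared helper: create a subnet from **kwargs (defaulting to an IPv4
# 10.0.0.0/24 subnet with DHCP enabled) and assert that both the requested
# keys and any caller-supplied 'expected' values appear in the response,
# e.g. self._test_create_subnet(gateway_ip='10.0.0.1', cidr='10.0.0.0/24').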
keys = kwargs.copy()
keys.setdefault('cidr', '10.0.0.0/24')
keys.setdefault('ip_version', 4)
keys.setdefault('enable_dhcp', True)
with self.subnet(network=network, **keys) as subnet:
# verify the response has each key with the correct value
for k in keys:
self.assertIn(k, subnet['subnet'])
if isinstance(keys[k], list):
self.assertEqual(sorted(subnet['subnet'][k]),
sorted(keys[k]))
else:
self.assertEqual(subnet['subnet'][k], keys[k])
# verify any caller-supplied expected values (e.g. derived defaults)
if expected:
for k in expected:
self.assertIn(k, subnet['subnet'])
if isinstance(expected[k], list):
self.assertEqual(sorted(subnet['subnet'][k]),
sorted(expected[k]))
else:
self.assertEqual(subnet['subnet'][k], expected[k])
return subnet
**** CubicPower OpenStack Study ****
def test_create_subnet(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
subnet = self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr)
self.assertEqual(4, subnet['subnet']['ip_version'])
self.assertIn('name', subnet['subnet'])
**** CubicPower OpenStack Study ****
def test_create_two_subnets(self):
gateway_ips = ['10.0.0.1', '10.0.1.1']
cidrs = ['10.0.0.0/24', '10.0.1.0/24']
with self.network() as network:
with self.subnet(network=network,
gateway_ip=gateway_ips[0],
cidr=cidrs[0]):
with self.subnet(network=network,
gateway_ip=gateway_ips[1],
cidr=cidrs[1]):
net_req = self.new_show_request('networks',
network['network']['id'])
raw_res = net_req.get_response(self.api)
net_res = self.deserialize(self.fmt, raw_res)
for subnet_id in net_res['network']['subnets']:
sub_req = self.new_show_request('subnets', subnet_id)
raw_res = sub_req.get_response(self.api)
sub_res = self.deserialize(self.fmt, raw_res)
self.assertIn(sub_res['subnet']['cidr'], cidrs)
self.assertIn(sub_res['subnet']['gateway_ip'],
gateway_ips)
**** CubicPower OpenStack Study ****
def test_create_two_subnets_same_cidr_returns_400(self):
gateway_ip_1 = '10.0.0.1'
cidr_1 = '10.0.0.0/24'
gateway_ip_2 = '10.0.0.10'
cidr_2 = '10.0.0.0/24'
with self.network() as network:
with self.subnet(network=network,
gateway_ip=gateway_ip_1,
cidr=cidr_1):
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with self.subnet(network=network,
gateway_ip=gateway_ip_2,
cidr=cidr_2):
pass
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_V4_cidr(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0',
'ip_version': '4',
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_V6_cidr(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'fe80::',
'ip_version': '6',
'tenant_id': network['network']['tenant_id'],
'gateway_ip': 'fe80::1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_2_subnets_overlapping_cidr_allowed_returns_200(self):
cidr_1 = '10.0.0.0/23'
cidr_2 = '10.0.0.0/24'
cfg.CONF.set_override('allow_overlapping_ips', True)
with contextlib.nested(self.subnet(cidr=cidr_1),
self.subnet(cidr=cidr_2)):
pass
**** CubicPower OpenStack Study ****
def test_create_2_subnets_overlapping_cidr_not_allowed_returns_400(self):
cidr_1 = '10.0.0.0/23'
cidr_2 = '10.0.0.0/24'
cfg.CONF.set_override('allow_overlapping_ips', False)
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with contextlib.nested(self.subnet(cidr=cidr_1),
self.subnet(cidr=cidr_2)):
pass
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnets_bulk_native(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
with self.network() as net:
res = self._create_subnet_bulk(self.fmt, 2, net['network']['id'],
'test')
self._validate_behavior_on_bulk_success(res, 'subnets')
**** CubicPower OpenStack Study ****
def test_create_subnets_bulk_emulated(self):
real_has_attr = hasattr
# Ensures the API chooses the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
with self.network() as net:
res = self._create_subnet_bulk(self.fmt, 2,
net['network']['id'],
'test')
self._validate_behavior_on_bulk_success(res, 'subnets')
**** CubicPower OpenStack Study ****
def test_create_subnets_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
# Ensures the API chooses the emulation code path
def fakehasattr(item, attr):
if attr.endswith('__native_bulk_support'):
return False
return real_has_attr(item, attr)
with mock.patch('__builtin__.hasattr',
new=fakehasattr):
orig = NeutronManager.get_plugin().create_subnet
with mock.patch.object(NeutronManager.get_plugin(),
'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_subnet_bulk(self.fmt, 2,
net['network']['id'],
'test')
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(
res, 'subnets', webob.exc.HTTPServerError.code
)
**** CubicPower OpenStack Study ****
def test_create_subnets_bulk_native_plugin_failure(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
orig = NeutronManager._instance.plugin.create_subnet
with mock.patch.object(NeutronManager._instance.plugin,
'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig,
*args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
res = self._create_subnet_bulk(self.fmt, 2,
net['network']['id'],
'test')
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(
res, 'subnets', webob.exc.HTTPServerError.code
)
**** CubicPower OpenStack Study ****
def test_delete_subnet(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
# Create new network
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
subnet = self._make_subnet(self.fmt, network, gateway_ip,
cidr, ip_version=4)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
**** CubicPower OpenStack Study ****
def test_delete_subnet_port_exists_owned_by_network(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
# Create new network
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
subnet = self._make_subnet(self.fmt, network, gateway_ip,
cidr, ip_version=4)
self._create_port(self.fmt,
network['network']['id'],
device_owner=constants.DEVICE_OWNER_DHCP)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
**** CubicPower OpenStack Study ****
def test_delete_subnet_port_exists_owned_by_other(self):
with self.subnet() as subnet:
with self.port(subnet=subnet):
id = subnet['subnet']['id']
req = self.new_delete_request('subnets', id)
res = req.get_response(self.api)
data = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
msg = str(n_exc.SubnetInUse(subnet_id=id))
self.assertEqual(data['NeutronError']['message'], msg)
**** CubicPower OpenStack Study ****
def test_delete_subnet_with_other_subnet_on_network_still_in_use(self):
with self.network() as network:
with contextlib.nested(
self.subnet(network=network),
self.subnet(network=network, cidr='10.0.1.0/24',
do_delete=False)) as (subnet1, subnet2):
subnet1_id = subnet1['subnet']['id']
subnet2_id = subnet2['subnet']['id']
with self.port(
subnet=subnet1,
fixed_ips=[{'subnet_id': subnet1_id}]):
req = self.new_delete_request('subnets', subnet2_id)
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPNoContent.code)
**** CubicPower OpenStack Study ****
def test_delete_network(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
# Create new network
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
self._make_subnet(self.fmt, network, gateway_ip, cidr, ip_version=4)
req = self.new_delete_request('networks', network['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_tenant(self):
with self.network() as network:
self._create_subnet(self.fmt,
network['network']['id'],
'10.0.2.0/24',
webob.exc.HTTPNotFound.code,
ip_version=4,
tenant_id='bad_tenant_id',
gateway_ip='10.0.2.1',
device_owner='fake_owner',
set_context=True)
**** CubicPower OpenStack Study ****
def test_create_subnet_as_admin(self):
with self.network(do_delete=False) as network:
self._create_subnet(self.fmt,
network['network']['id'],
'10.0.2.0/24',
webob.exc.HTTPCreated.code,
ip_version=4,
tenant_id='bad_tenant_id',
gateway_ip='10.0.2.1',
device_owner='fake_owner',
set_context=False)
**** CubicPower OpenStack Study ****
def test_create_subnet_nonzero_cidr(self):
with contextlib.nested(
self.subnet(cidr='10.129.122.5/8'),
self.subnet(cidr='11.129.122.5/15'),
self.subnet(cidr='12.129.122.5/16'),
self.subnet(cidr='13.129.122.5/18'),
self.subnet(cidr='14.129.122.5/22'),
self.subnet(cidr='15.129.122.5/24'),
self.subnet(cidr='16.129.122.5/28'),
self.subnet(cidr='17.129.122.5/32')
) as subs:
# the API should accept and correct these for users
self.assertEqual(subs[0]['subnet']['cidr'], '10.0.0.0/8')
self.assertEqual(subs[1]['subnet']['cidr'], '11.128.0.0/15')
self.assertEqual(subs[2]['subnet']['cidr'], '12.129.0.0/16')
self.assertEqual(subs[3]['subnet']['cidr'], '13.129.64.0/18')
self.assertEqual(subs[4]['subnet']['cidr'], '14.129.120.0/22')
self.assertEqual(subs[5]['subnet']['cidr'], '15.129.122.0/24')
self.assertEqual(subs[6]['subnet']['cidr'], '16.129.122.0/28')
self.assertEqual(subs[7]['subnet']['cidr'], '17.129.122.5/32')
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_ip_version(self):
with self.network() as network:
# Check bad IP version
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 'abc',
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_ip_version_null(self):
with self.network() as network:
# Check bad IP version
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': None,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_uuid(self):
with self.network() as network:
# Check invalid UUID
data = {'subnet': {'network_id': None,
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_boolean(self):
with self.network() as network:
# Check invalid boolean
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': '4',
'enable_dhcp': None,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_pools(self):
with self.network() as network:
# Check that invalid allocation pools are rejected
allocation_pools = [[{'end': '10.0.0.254'}],
[{'start': '10.0.0.254'}],
[{'start': '1000.0.0.254'}],
[{'start': '10.0.0.2', 'end': '10.0.0.254'},
{'end': '10.0.0.254'}],
None,
[{'start': '10.0.0.2', 'end': '10.0.0.3'},
{'start': '10.0.0.2', 'end': '10.0.0.3'}]]
tenant_id = network['network']['tenant_id']
for pool in allocation_pools:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': '4',
'tenant_id': tenant_id,
'gateway_ip': '10.0.2.1',
'allocation_pools': pool}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_nameserver(self):
with self.network() as network:
# Check that invalid DNS nameservers are rejected
nameserver_pools = [['1100.0.0.2'],
['1.1.1.2', '1.1000.1.3'],
['1.1.1.2', '1.1.1.2']]
tenant_id = network['network']['tenant_id']
for nameservers in nameserver_pools:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': '4',
'tenant_id': tenant_id,
'gateway_ip': '10.0.2.1',
'dns_nameservers': nameservers}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_bad_hostroutes(self):
with self.network() as network:
# Check that invalid host routes are rejected
hostroute_pools = [[{'destination': '100.0.0.0/24'}],
[{'nexthop': '10.0.2.20'}],
[{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'},
{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'}]]
tenant_id = network['network']['tenant_id']
for hostroutes in hostroute_pools:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': '4',
'tenant_id': tenant_id,
'gateway_ip': '10.0.2.1',
'host_routes': hostroutes}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_defaults(self):
gateway = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.254'}]
enable_dhcp = True
subnet = self._test_create_subnet()
# verify cidr & gw have been correctly generated
self.assertEqual(subnet['subnet']['cidr'], cidr)
self.assertEqual(subnet['subnet']['gateway_ip'], gateway)
self.assertEqual(subnet['subnet']['enable_dhcp'], enable_dhcp)
self.assertEqual(subnet['subnet']['allocation_pools'],
allocation_pools)
**** CubicPower OpenStack Study ****
def test_create_subnet_gw_values(self):
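# Exercise gateway placement: outside the CIDR, the last address in the
# range, and the first address in the range, checking the allocation pools
# generated for each case.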
# Gateway not in subnet
gateway = '100.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.254'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
self._test_create_subnet(expected=expected, gateway_ip=gateway)
# Gateway is last IP in range
gateway = '10.0.0.254'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.253'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
self._test_create_subnet(expected=expected, gateway_ip=gateway)
# Gateway is first in subnet
gateway = '10.0.0.1'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.254'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
self._test_create_subnet(expected=expected,
gateway_ip=gateway)
**** CubicPower OpenStack Study ****
def test_create_subnet_gw_outside_cidr_force_on_returns_400(self):
cfg.CONF.set_override('force_gateway_on_subnet', True)
with self.network() as network:
self._create_subnet(self.fmt,
network['network']['id'],
'10.0.0.0/24',
webob.exc.HTTPClientError.code,
gateway_ip='100.0.0.1')
**** CubicPower OpenStack Study ****
def test_create_subnet_gw_of_network_force_on_returns_400(self):
cfg.CONF.set_override('force_gateway_on_subnet', True)
with self.network() as network:
self._create_subnet(self.fmt,
network['network']['id'],
'10.0.0.0/24',
webob.exc.HTTPClientError.code,
gateway_ip='10.0.0.0')
**** CubicPower OpenStack Study ****
def test_create_subnet_gw_bcast_force_on_returns_400(self):
cfg.CONF.set_override('force_gateway_on_subnet', True)
with self.network() as network:
self._create_subnet(self.fmt,
network['network']['id'],
'10.0.0.0/24',
webob.exc.HTTPClientError.code,
gateway_ip='10.0.0.255')
**** CubicPower OpenStack Study ****
def test_create_subnet_with_allocation_pool(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_none_gateway(self):
cidr = '10.0.0.0/24'
self._test_create_subnet(gateway_ip=None,
cidr=cidr)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_none_gateway_fully_allocated(self):
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.254'}]
self._test_create_subnet(gateway_ip=None,
cidr=cidr,
allocation_pools=allocation_pools)
**** CubicPower OpenStack Study ****
def test_subnet_with_allocation_range(self):
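# Addresses outside the configured allocation pool (including the gateway)
# can still be assigned explicitly through fixed_ips.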
with self.network() as network:
net_id = network['network']['id']
data = {'subnet': {'network_id': net_id,
'cidr': '10.0.0.0/24',
'ip_version': 4,
'gateway_ip': '10.0.0.1',
'tenant_id': network['network']['tenant_id'],
'allocation_pools': [{'start': '10.0.0.100',
'end': '10.0.0.120'}]}}
subnet_req = self.new_create_request('subnets', data)
subnet = self.deserialize(self.fmt,
subnet_req.get_response(self.api))
# Check fixed IP not in allocation range
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.10'}]}
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
port = self.deserialize(self.fmt, res)
# delete the port
self._delete('ports', port['port']['id'])
# Check when fixed IP is gateway
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.1'}]}
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
port = self.deserialize(self.fmt, res)
# delete the port
self._delete('ports', port['port']['id'])
**** CubicPower OpenStack Study ****
def test_create_subnet_with_none_gateway_allocation_pool(self):
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
self._test_create_subnet(gateway_ip=None,
cidr=cidr,
allocation_pools=allocation_pools)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_v6_allocation_pool(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
allocation_pools = [{'start': 'fe80::2',
'end': 'fe80::ffff:fffa:ffff'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
allocation_pools=allocation_pools)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_large_allocation_pool(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/8'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'},
{'start': '10.1.0.0',
'end': '10.200.0.100'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
**** CubicPower OpenStack Study ****
def test_create_subnet_multiple_allocation_pools(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'},
{'start': '10.0.0.110',
'end': '10.0.0.150'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_dhcp_disabled(self):
enable_dhcp = False
self._test_create_subnet(enable_dhcp=enable_dhcp)
**** CubicPower OpenStack Study ****
def test_create_subnet_default_gw_conflict_allocation_pool_returns_409(
self):
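# The implicit default gateway (10.0.0.1) falls inside the requested
# allocation pool, so the create is expected to be rejected with a conflict.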
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.5'}]
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPConflict.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_gateway_in_allocation_pool_returns_409(self):
gateway_ip = '10.0.0.50'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.100'}]
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPConflict.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_overlapping_allocation_pools_returns_409(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.150'},
{'start': '10.0.0.140',
'end': '10.0.0.180'}]
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPConflict.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_invalid_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.256'}]
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_out_of_range_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.1.6'}]
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_shared_returns_400(self):
cidr = '10.0.0.0/24'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(cidr=cidr,
shared=True)
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_inconsistent_ipv6_cidrv4(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 6,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_inconsistent_ipv4_cidrv6(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'fe80::0/80',
'ip_version': 4,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_inconsistent_ipv4_gatewayv6(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'gateway_ip': 'fe80::1',
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_inconsistent_ipv6_gatewayv4(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'fe80::0/80',
'ip_version': 6,
'gateway_ip': '192.168.0.1',
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_inconsistent_ipv6_dns_v4(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'fe80::0/80',
'ip_version': 6,
'dns_nameservers': ['192.168.0.1'],
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_inconsistent_ipv4_hostroute_dst_v6(self):
host_routes = [{'destination': 'fe80::0/48',
'nexthop': '10.0.2.20'}]
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'host_routes': host_routes,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_inconsistent_ipv4_hostroute_np_v6(self):
host_routes = [{'destination': '172.16.0.0/24',
'nexthop': 'fe80::1'}]
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'host_routes': host_routes,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
@testcase.skip("Skipped until bug 1304093 is fixed")
**** CubicPower OpenStack Study ****
def test_create_subnet_ipv6_attributes(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
for mode in constants.IPV6_MODES:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
ipv6_ra_mode=mode,
ipv6_address_mode=mode)
@testcase.skip("Skipped until bug 1304093 is fixed")
**** CubicPower OpenStack Study ****
def test_create_subnet_ipv6_attributes_no_dhcp_enabled(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
for mode in constants.IPV6_MODES:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
enable_dhcp=False,
ipv6_ra_mode=mode,
ipv6_address_mode=mode)
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_invalid_ipv6_ra_mode(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
ipv6_ra_mode='foo',
ipv6_address_mode='slaac')
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_invalid_ipv6_address_mode(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
ipv6_ra_mode='slaac',
ipv6_address_mode='baz')
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_invalid_ipv6_combination(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
ipv6_ra_mode='stateful',
ipv6_address_mode='stateless')
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
@testcase.skip("Skipped until bug 1304093 is fixed")
**** CubicPower OpenStack Study ****
def test_create_subnet_ipv6_single_attribute_set(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
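# Each iteration below sets only one of ipv6_ra_mode / ipv6_address_mode;
# the create is expected to succeed with the other attribute left as None.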
for mode in constants.IPV6_MODES:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
ipv6_ra_mode=None,
ipv6_address_mode=mode)
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
ipv6_ra_mode=mode,
ipv6_address_mode=None)
**** CubicPower OpenStack Study ****
def test_update_subnet_no_gateway(self):
with self.subnet() as subnet:
data = {'subnet': {'gateway_ip': '11.0.0.1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['subnet']['gateway_ip'],
data['subnet']['gateway_ip'])
data = {'subnet': {'gateway_ip': None}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertIsNone(res['subnet']['gateway_ip'])
**** CubicPower OpenStack Study ****
def test_update_subnet(self):
with self.subnet() as subnet:
data = {'subnet': {'gateway_ip': '11.0.0.1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['subnet']['gateway_ip'],
data['subnet']['gateway_ip'])
**** CubicPower OpenStack Study ****
def test_update_subnet_adding_additional_host_routes_and_dns(self):
host_routes = [{'destination': '172.16.0.0/24',
'nexthop': '10.0.2.2'}]
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'dns_nameservers': ['192.168.0.1'],
'host_routes': host_routes,
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = self.deserialize(self.fmt, subnet_req.get_response(self.api))
host_routes = [{'destination': '172.16.0.0/24',
'nexthop': '10.0.2.2'},
{'destination': '192.168.0.0/24',
'nexthop': '10.0.2.3'}]
dns_nameservers = ['192.168.0.1', '192.168.0.2']
data = {'subnet': {'host_routes': host_routes,
'dns_nameservers': dns_nameservers}}
req = self.new_update_request('subnets', data,
res['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(sorted(res['subnet']['host_routes']),
sorted(host_routes))
self.assertEqual(sorted(res['subnet']['dns_nameservers']),
sorted(dns_nameservers))
**** CubicPower OpenStack Study ****
def test_update_subnet_shared_returns_400(self):
with self.network(shared=True) as network:
with self.subnet(network=network) as subnet:
data = {'subnet': {'shared': True}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_gw_outside_cidr_force_on_returns_400(self):
cfg.CONF.set_override('force_gateway_on_subnet', True)
with self.network() as network:
with self.subnet(network=network) as subnet:
data = {'subnet': {'gateway_ip': '100.0.0.1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_gw_ip_in_use_returns_409(self):
with self.network() as network:
with self.subnet(
network=network,
allocation_pools=[{'start': '10.0.0.100',
'end': '10.0.0.253'}]) as subnet:
subnet_data = subnet['subnet']
with self.port(
subnet=subnet,
fixed_ips=[{'subnet_id': subnet_data['id'],
'ip_address': subnet_data['gateway_ip']}]):
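# A port now holds the subnet's gateway IP, so moving the gateway to
# another address must be rejected with 409 Conflict.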
data = {'subnet': {'gateway_ip': '10.0.0.99'}}
req = self.new_update_request('subnets', data,
subnet_data['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 409)
**** CubicPower OpenStack Study ****
def test_update_subnet_inconsistent_ipv4_gatewayv6(self):
with self.network() as network:
with self.subnet(network=network) as subnet:
data = {'subnet': {'gateway_ip': 'fe80::1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_inconsistent_ipv6_gatewayv4(self):
with self.network() as network:
with self.subnet(network=network,
ip_version=6, cidr='fe80::/48') as subnet:
data = {'subnet': {'gateway_ip': '10.1.1.1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_inconsistent_ipv4_dns_v6(self):
dns_nameservers = ['fe80::1']
with self.network() as network:
with self.subnet(network=network) as subnet:
data = {'subnet': {'dns_nameservers': dns_nameservers}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_inconsistent_ipv6_hostroute_dst_v4(self):
host_routes = [{'destination': 'fe80::0/48',
'nexthop': '10.0.2.20'}]
with self.network() as network:
with self.subnet(network=network,
ip_version=6, cidr='fe80::/48') as subnet:
data = {'subnet': {'host_routes': host_routes}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_inconsistent_ipv6_hostroute_np_v4(self):
host_routes = [{'destination': '172.16.0.0/24',
'nexthop': 'fe80::1'}]
with self.network() as network:
with self.subnet(network=network,
ip_version=6, cidr='fe80::/48') as subnet:
data = {'subnet': {'host_routes': host_routes}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_gateway_in_allocation_pool_returns_409(self):
allocation_pools = [{'start': '10.0.0.2', 'end': '10.0.0.254'}]
with self.network() as network:
with self.subnet(network=network,
allocation_pools=allocation_pools,
cidr='10.0.0.0/24') as subnet:
data = {'subnet': {'gateway_ip': '10.0.0.50'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPConflict.code)
@testcase.skip("Skipped until bug 1304093 is fixed")
**** CubicPower OpenStack Study ****
def test_update_subnet_ipv6_attributes(self):
with self.subnet(ip_version=6, cidr='fe80::/80',
ipv6_ra_mode=constants.IPV6_SLAAC,
ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
data = {'subnet': {'ipv6_ra_mode': constants.DHCPV6_STATEFUL,
'ipv6_address_mode': constants.DHCPV6_STATEFUL}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['subnet']['ipv6_ra_mode'],
data['subnet']['ipv6_ra_mode'])
self.assertEqual(res['subnet']['ipv6_address_mode'],
data['subnet']['ipv6_address_mode'])
@testcase.skip("Skipped until bug 1304093 is fixed")
**** CubicPower OpenStack Study ****
def test_update_subnet_ipv6_inconsistent_ra_attribute(self):
with self.subnet(ip_version=6, cidr='fe80::/80',
ipv6_ra_mode=constants.IPV6_SLAAC,
ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
data = {'subnet': {'ipv6_ra_mode': constants.DHCPV6_STATEFUL}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
@testcase.skip("Skipped until bug 1304093 is fixed")
**** CubicPower OpenStack Study ****
def test_update_subnet_ipv6_inconsistent_address_attribute(self):
with self.subnet(ip_version=6, cidr='fe80::/80',
ipv6_ra_mode=constants.IPV6_SLAAC,
ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
data = {'subnet': {'ipv6_address_mode': constants.DHCPV6_STATEFUL}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
@testcase.skip("Skipped until bug 1304093 is fixed")
**** CubicPower OpenStack Study ****
def test_update_subnet_ipv6_inconsistent_enable_dhcp(self):
with self.subnet(ip_version=6, cidr='fe80::/80',
ipv6_ra_mode=constants.IPV6_SLAAC,
ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
data = {'subnet': {'enable_dhcp': False}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_show_subnet(self):
with self.network() as network:
with self.subnet(network=network) as subnet:
req = self.new_show_request('subnets',
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['subnet']['id'],
subnet['subnet']['id'])
self.assertEqual(res['subnet']['network_id'],
network['network']['id'])
**** CubicPower OpenStack Study ****
def test_list_subnets(self):
with self.network() as network:
with contextlib.nested(self.subnet(network=network,
gateway_ip='10.0.0.1',
cidr='10.0.0.0/24'),
self.subnet(network=network,
gateway_ip='10.0.1.1',
cidr='10.0.1.0/24'),
self.subnet(network=network,
gateway_ip='10.0.2.1',
cidr='10.0.2.0/24')) as subnets:
self._test_list_resources('subnet', subnets)
**** CubicPower OpenStack Study ****
def test_list_subnets_shared(self):
with self.network(shared=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
with self.subnet(cidr='10.0.1.0/24') as priv_subnet:
# normal user should see only 1 subnet
req = self.new_list_request('subnets')
req.environ['neutron.context'] = context.Context(
'', 'some_tenant')
res = self.deserialize(self.fmt,
req.get_response(self.api))
self.assertEqual(len(res['subnets']), 1)
self.assertEqual(res['subnets'][0]['cidr'],
subnet['subnet']['cidr'])
# admin will see both subnets
admin_req = self.new_list_request('subnets')
admin_res = self.deserialize(
self.fmt, admin_req.get_response(self.api))
self.assertEqual(len(admin_res['subnets']), 2)
cidrs = [sub['cidr'] for sub in admin_res['subnets']]
self.assertIn(subnet['subnet']['cidr'], cidrs)
self.assertIn(priv_subnet['subnet']['cidr'], cidrs)
**** CubicPower OpenStack Study ****
def test_list_subnets_with_parameter(self):
with self.network() as network:
with contextlib.nested(self.subnet(network=network,
gateway_ip='10.0.0.1',
cidr='10.0.0.0/24'),
self.subnet(network=network,
gateway_ip='10.0.1.1',
cidr='10.0.1.0/24')
) as subnets:
query_params = 'ip_version=4&ip_version=6'
self._test_list_resources('subnet', subnets,
query_params=query_params)
query_params = 'ip_version=6'
self._test_list_resources('subnet', [],
query_params=query_params)
**** CubicPower OpenStack Study ****
def test_list_subnets_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.subnet(enable_dhcp=True,
cidr='10.0.0.0/24'),
self.subnet(enable_dhcp=False,
cidr='11.0.0.0/24'),
self.subnet(enable_dhcp=False,
cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_sort('subnet', (subnet3, subnet2, subnet1),
[('enable_dhcp', 'asc'),
('cidr', 'desc')])
**** CubicPower OpenStack Study ****
def test_list_subnets_with_sort_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_sorting_helper',
new=_fake_get_sorting_helper)
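# Patch the controller to use the emulated (in-API) sorting helper so the
# test exercises sorting even when the plugin has no native DB sorting.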
helper_patcher.start()
try:
with contextlib.nested(self.subnet(enable_dhcp=True,
cidr='10.0.0.0/24'),
self.subnet(enable_dhcp=False,
cidr='11.0.0.0/24'),
self.subnet(enable_dhcp=False,
cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_sort('subnet', (subnet3,
subnet2,
subnet1),
[('enable_dhcp', 'asc'),
('cidr', 'desc')])
finally:
helper_patcher.stop()
**** CubicPower OpenStack Study ****
def test_list_subnets_with_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='11.0.0.0/24'),
self.subnet(cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_pagination('subnet',
(subnet1, subnet2, subnet3),
('cidr', 'asc'), 2, 2)
**** CubicPower OpenStack Study ****
def test_list_subnets_with_pagination_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='11.0.0.0/24'),
self.subnet(cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_pagination('subnet',
(subnet1, subnet2, subnet3),
('cidr', 'asc'), 2, 2)
finally:
helper_patcher.stop()
**** CubicPower OpenStack Study ****
def test_list_subnets_with_pagination_reverse_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='11.0.0.0/24'),
self.subnet(cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_pagination_reverse('subnet',
(subnet1, subnet2,
subnet3),
('cidr', 'asc'), 2, 2)
**** CubicPower OpenStack Study ****
def test_list_subnets_with_pagination_reverse_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=_fake_get_pagination_helper)
helper_patcher.start()
try:
with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
self.subnet(cidr='11.0.0.0/24'),
self.subnet(cidr='12.0.0.0/24')
) as (subnet1, subnet2, subnet3):
self._test_list_with_pagination_reverse('subnet',
(subnet1, subnet2,
subnet3),
('cidr', 'asc'), 2, 2)
finally:
helper_patcher.stop()
**** CubicPower OpenStack Study ****
def test_invalid_ip_version(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 7,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_invalid_subnet(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': 'invalid',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_invalid_ip_address(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': 'ipaddress'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_invalid_uuid(self):
with self.network() as network:
data = {'subnet': {'network_id': 'invalid-uuid',
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.0.1'}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_one_dns(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
dns_nameservers = ['1.2.3.4']
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools,
dns_nameservers=dns_nameservers)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_two_dns(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
dns_nameservers = ['1.2.3.4', '4.3.2.1']
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools,
dns_nameservers=dns_nameservers)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_too_many_dns(self):
with self.network() as network:
dns_list = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.0.1',
'dns_nameservers': dns_list}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_one_host_route(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools,
host_routes=host_routes)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_two_host_routes(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'},
{'destination': '12.0.0.0/8',
'nexthop': '4.3.2.1'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools,
host_routes=host_routes)
**** CubicPower OpenStack Study ****
def test_create_subnet_with_too_many_routes(self):
with self.network() as network:
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'},
{'destination': '12.0.0.0/8',
'nexthop': '4.3.2.1'},
{'destination': '141.212.0.0/16',
'nexthop': '2.2.2.2'}]
data = {'subnet': {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.0.1',
'host_routes': host_routes}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_dns(self):
with self.subnet() as subnet:
data = {'subnet': {'dns_nameservers': ['11.0.0.1']}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['subnet']['dns_nameservers'],
data['subnet']['dns_nameservers'])
**** CubicPower OpenStack Study ****
def test_update_subnet_dns_to_None(self):
with self.subnet(dns_nameservers=['11.0.0.1']) as subnet:
data = {'subnet': {'dns_nameservers': None}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual([], res['subnet']['dns_nameservers'])
data = {'subnet': {'dns_nameservers': ['11.0.0.3']}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(data['subnet']['dns_nameservers'],
res['subnet']['dns_nameservers'])
**** CubicPower OpenStack Study ****
def test_update_subnet_dns_with_too_many_entries(self):
with self.subnet() as subnet:
dns_list = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
data = {'subnet': {'dns_nameservers': dns_list}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_update_subnet_route(self):
with self.subnet() as subnet:
data = {'subnet': {'host_routes':
[{'destination': '12.0.0.0/8', 'nexthop': '1.2.3.4'}]}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(res['subnet']['host_routes'],
data['subnet']['host_routes'])
**** CubicPower OpenStack Study ****
def test_update_subnet_route_to_None(self):
with self.subnet(host_routes=[{'destination': '12.0.0.0/8',
'nexthop': '1.2.3.4'}]) as subnet:
data = {'subnet': {'host_routes': None}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual([], res['subnet']['host_routes'])
data = {'subnet': {'host_routes': [{'destination': '12.0.0.0/8',
'nexthop': '1.2.3.4'}]}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(data['subnet']['host_routes'],
res['subnet']['host_routes'])
**** CubicPower OpenStack Study ****
def test_update_subnet_route_with_too_many_entries(self):
with self.subnet() as subnet:
data = {'subnet': {'host_routes': [
{'destination': '12.0.0.0/8', 'nexthop': '1.2.3.4'},
{'destination': '13.0.0.0/8', 'nexthop': '1.2.3.5'},
{'destination': '14.0.0.0/8', 'nexthop': '1.2.3.6'}]}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
**** CubicPower OpenStack Study ****
def test_delete_subnet_with_dns(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
dns_nameservers = ['1.2.3.4']
# Create new network
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
subnet = self._make_subnet(self.fmt, network, gateway_ip,
cidr, ip_version=4,
dns_nameservers=dns_nameservers)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
**** CubicPower OpenStack Study ****
def test_delete_subnet_with_route(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'}]
# Create new network
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
subnet = self._make_subnet(self.fmt, network, gateway_ip,
cidr, ip_version=4,
host_routes=host_routes)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
**** CubicPower OpenStack Study ****
def test_delete_subnet_with_dns_and_route(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
dns_nameservers = ['1.2.3.4']
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'}]
# Create new network
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
subnet = self._make_subnet(self.fmt, network, gateway_ip,
cidr, ip_version=4,
dns_nameservers=dns_nameservers,
host_routes=host_routes)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
**** CubicPower OpenStack Study ****
def _helper_test_validate_subnet(self, option, exception):
cfg.CONF.set_override(option, 0)
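# With the limit option overridden to 0, the single DNS nameserver and
# host route in the subnet below are guaranteed to exceed it and trigger
# the expected exception.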
with self.network() as network:
subnet = {'network_id': network['network']['id'],
'cidr': '10.0.2.0/24',
'ip_version': 4,
'tenant_id': network['network']['tenant_id'],
'gateway_ip': '10.0.2.1',
'dns_nameservers': ['8.8.8.8'],
'host_routes': [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'}]}
plugin = NeutronManager.get_plugin()
e = self.assertRaises(exception,
plugin._validate_subnet,
context.get_admin_context(
load_admin_roles=False),
subnet)
self.assertThat(
str(e),
matchers.Not(matchers.Contains('built-in function id')))
**** CubicPower OpenStack Study ****
def test_validate_subnet_dns_nameservers_exhausted(self):
self._helper_test_validate_subnet(
'max_dns_nameservers',
n_exc.DNSNameServersExhausted)
**** CubicPower OpenStack Study ****
def test_validate_subnet_host_routes_exhausted(self):
self._helper_test_validate_subnet(
'max_subnet_host_routes',
n_exc.HostRoutesExhausted)
**** CubicPower OpenStack Study ****
class DbModelTestCase(base.BaseTestCase):
"""DB model tests."""
**** CubicPower OpenStack Study ****
def test_repr(self):
"""testing the string representation of 'model' classes."""
network = models_v2.Network(name="net_net", status="OK",
admin_state_up=True)
actual_repr_output = repr(network)
exp_start_with = " exp_middle = "[object at %x]" % id(network) exp_end_with = (" {tenant_id=None, id=None, "
"name='net_net', status='OK', "
"admin_state_up=True, shared=None}>")
final_exp = exp_start_with + exp_middle + exp_end_with
self.assertEqual(actual_repr_output, final_exp)
**** CubicPower OpenStack Study ****
class TestNeutronDbPluginV2(base.BaseTestCase):
"""Unit Tests for NeutronDbPluginV2 IPAM Logic."""
**** CubicPower OpenStack Study ****
def test_generate_ip(self):
with mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2,
'_try_generate_ip') as generate:
with mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2,
'_rebuild_availability_ranges') as rebuild:
db_base_plugin_v2.NeutronDbPluginV2._generate_ip('c', 's')
generate.assert_called_once_with('c', 's')
self.assertEqual(0, rebuild.call_count)
**** CubicPower OpenStack Study ****
def test_generate_ip_exhausted_pool(self):
with mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2,
'_try_generate_ip') as generate:
with mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2,
'_rebuild_availability_ranges') as rebuild:
exception = n_exc.IpAddressGenerationFailure(net_id='n')
generate.side_effect = exception
# I want the side_effect to throw an exception once but I
# didn't see a way to do this. So, let it throw twice and
# catch the second one. Check below to ensure that
# _try_generate_ip was called twice.
try:
db_base_plugin_v2.NeutronDbPluginV2._generate_ip('c', 's')
except n_exc.IpAddressGenerationFailure:
pass
self.assertEqual(2, generate.call_count)
rebuild.assert_called_once_with('c', 's')
**** CubicPower OpenStack Study ****
def test_rebuild_availability_ranges(self):
pools = [{'id': 'a',
'first_ip': '192.168.1.3',
'last_ip': '192.168.1.10'},
{'id': 'b',
'first_ip': '192.168.1.100',
'last_ip': '192.168.1.120'}]
allocations = [{'ip_address': '192.168.1.3'},
{'ip_address': '192.168.1.78'},
{'ip_address': '192.168.1.7'},
{'ip_address': '192.168.1.110'},
{'ip_address': '192.168.1.11'},
{'ip_address': '192.168.1.4'},
{'ip_address': '192.168.1.111'}]
ip_qry = mock.Mock()
ip_qry.with_lockmode.return_value = ip_qry
ip_qry.filter_by.return_value = allocations
pool_qry = mock.Mock()
pool_qry.options.return_value = pool_qry
pool_qry.with_lockmode.return_value = pool_qry
pool_qry.filter_by.return_value = pools
def return_queries_side_effect(*args, **kwargs):
if args[0] == models_v2.IPAllocation:
return ip_qry
if args[0] == models_v2.IPAllocationPool:
return pool_qry
context = mock.Mock()
context.session.query.side_effect = return_queries_side_effect
subnets = [mock.MagicMock()]
db_base_plugin_v2.NeutronDbPluginV2._rebuild_availability_ranges(
context, subnets)
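# Expected rebuild: pool 'a' (192.168.1.3-10) minus allocations .3, .4
# and .7 leaves .5-.6 and .8-.10; pool 'b' (192.168.1.100-120) minus
# .110 and .111 leaves .100-.109 and .112-.120. Allocations .11 and .78
# fall outside both pools and are ignored.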
actual = [[args[0].allocation_pool_id,
args[0].first_ip, args[0].last_ip]
for _name, args, _kwargs in context.session.add.mock_calls]
self.assertEqual([['a', '192.168.1.5', '192.168.1.6'],
['a', '192.168.1.8', '192.168.1.10'],
['b', '192.168.1.100', '192.168.1.109'],
['b', '192.168.1.112', '192.168.1.120']], actual)
**** CubicPower OpenStack Study ****
class NeutronDbPluginV2AsMixinTestCase(base.BaseTestCase):
"""Tests for NeutronDbPluginV2 as Mixin.
While NeutronDbPluginV2TestCase checks NeutronDbPlugin and all plugins as
a complete plugin, this test case verifies abilities of NeutronDbPlugin
which are provided to other plugins (e.g. DB operations). This test case
may include tests only for NeutronDbPlugin, so this should not be used in
unit tests for other plugins.
"""
**** CubicPower OpenStack Study ****
def setUp(self):
super(NeutronDbPluginV2AsMixinTestCase, self).setUp()
self.plugin = importutils.import_object(DB_PLUGIN_KLASS)
self.context = context.get_admin_context()
self.net_data = {'network': {'id': 'fake-id',
'name': 'net1',
'admin_state_up': True,
'tenant_id': 'test-tenant',
'shared': False}}
self.addCleanup(db.clear_db)
**** CubicPower OpenStack Study ****
def test_create_network_with_default_status(self):
net = self.plugin.create_network(self.context, self.net_data)
default_net_create_status = 'ACTIVE'
expected = [('id', 'fake-id'), ('name', 'net1'),
('admin_state_up', True), ('tenant_id', 'test-tenant'),
('shared', False), ('status', default_net_create_status)]
for k, v in expected:
self.assertEqual(net[k], v)
**** CubicPower OpenStack Study ****
def test_create_network_with_status_BUILD(self):
self.net_data['network']['status'] = 'BUILD'
net = self.plugin.create_network(self.context, self.net_data)
self.assertEqual(net['status'], 'BUILD')
**** CubicPower OpenStack Study ****
class TestBasicGetXML(TestBasicGet):
fmt = 'xml'
**** CubicPower OpenStack Study ****
class TestNetworksV2XML(TestNetworksV2):
fmt = 'xml'
**** CubicPower OpenStack Study ****
class TestPortsV2XML(TestPortsV2):
fmt = 'xml'
**** CubicPower OpenStack Study ****
class TestSubnetsV2XML(TestSubnetsV2):
fmt = 'xml'
**** CubicPower OpenStack Study ****
class TestV2HTTPResponseXML(TestV2HTTPResponse):
fmt = 'xml'