¡@

Home 

OpenStack Study: test_extension_security_group.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright (c) 2012 OpenStack Foundation.

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or

# implied.

# See the License for the specific language governing permissions and

# limitations under the License.

import contextlib

import os

import mock

import webob.exc

from neutron.api.v2 import attributes as attr

from neutron.common import constants as const

from neutron import context

from neutron.db import db_base_plugin_v2

from neutron.db import securitygroups_db

from neutron.extensions import securitygroup as ext_sg

from neutron.tests.unit import test_db_plugin

DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_security_group.'

'SecurityGroupTestPlugin')

ROOTDIR = os.path.dirname(os.path.dirname(__file__))

ETCDIR = os.path.join(ROOTDIR, 'etc')

**** CubicPower OpenStack Study ****

def etcdir(*p):

    return os.path.join(ETCDIR, *p)

**** CubicPower OpenStack Study ****

class SecurityGroupTestExtensionManager(object):

**** CubicPower OpenStack Study ****

    def get_resources(self):

        # Add the resources to the global attribute map

        # This is done here as the setup process won't

        # initialize the main API router which extends

        # the global attribute map

        attr.RESOURCE_ATTRIBUTE_MAP.update(

            ext_sg.RESOURCE_ATTRIBUTE_MAP)

        return ext_sg.Securitygroup.get_resources()

**** CubicPower OpenStack Study ****

    def get_actions(self):

        return []

**** CubicPower OpenStack Study ****

    def get_request_extensions(self):

        return []

**** CubicPower OpenStack Study ****

class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):

**** CubicPower OpenStack Study ****

    def _create_security_group(self, fmt, name, description, **kwargs):

        data = {'security_group': {'name': name,

                                   'tenant_id': kwargs.get('tenant_id',

                                                           'test_tenant'),

                                   'description': description}}

        security_group_req = self.new_create_request('security-groups', data,

                                                     fmt)

        if (kwargs.get('set_context') and 'tenant_id' in kwargs):

            # create a specific auth context for this request

            security_group_req.environ['neutron.context'] = (

                context.Context('', kwargs['tenant_id']))

        return security_group_req.get_response(self.ext_api)

**** CubicPower OpenStack Study ****

    def _build_security_group_rule(self, security_group_id, direction, proto,

                                   port_range_min=None, port_range_max=None,

                                   remote_ip_prefix=None, remote_group_id=None,

                                   tenant_id='test_tenant',

                                   ethertype=const.IPv4):

        data = {'security_group_rule': {'security_group_id': security_group_id,

                                        'direction': direction,

                                        'protocol': proto,

                                        'ethertype': ethertype,

                                        'tenant_id': tenant_id,

                                        'ethertype': ethertype}}

        if port_range_min:

            data['security_group_rule']['port_range_min'] = port_range_min

        if port_range_max:

            data['security_group_rule']['port_range_max'] = port_range_max

        if remote_ip_prefix:

            data['security_group_rule']['remote_ip_prefix'] = remote_ip_prefix

        if remote_group_id:

            data['security_group_rule']['remote_group_id'] = remote_group_id

        return data

**** CubicPower OpenStack Study ****

    def _create_security_group_rule(self, fmt, rules, **kwargs):

        security_group_rule_req = self.new_create_request(

            'security-group-rules', rules, fmt)

        if (kwargs.get('set_context') and 'tenant_id' in kwargs):

            # create a specific auth context for this request

            security_group_rule_req.environ['neutron.context'] = (

                context.Context('', kwargs['tenant_id']))

        return security_group_rule_req.get_response(self.ext_api)

**** CubicPower OpenStack Study ****

    def _make_security_group(self, fmt, name, description, **kwargs):

        res = self._create_security_group(fmt, name, description, **kwargs)

        if res.status_int >= webob.exc.HTTPBadRequest.code:

            raise webob.exc.HTTPClientError(code=res.status_int)

        return self.deserialize(fmt, res)

**** CubicPower OpenStack Study ****

    def _make_security_group_rule(self, fmt, rules, **kwargs):

        res = self._create_security_group_rule(self.fmt, rules)

        if res.status_int >= webob.exc.HTTPBadRequest.code:

            raise webob.exc.HTTPClientError(code=res.status_int)

        return self.deserialize(fmt, res)

    @contextlib.contextmanager

**** CubicPower OpenStack Study ****

    def security_group(self, name='webservers', description='webservers',

                       fmt=None, no_delete=False):

        if not fmt:

            fmt = self.fmt

        security_group = self._make_security_group(fmt, name, description)

        yield security_group

        if not no_delete:

            self._delete('security-groups',

                         security_group['security_group']['id'])

    @contextlib.contextmanager

**** CubicPower OpenStack Study ****

    def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'

                                                    'd1db38eb087',

                            direction='ingress', protocol=const.PROTO_NAME_TCP,

                            port_range_min='22', port_range_max='22',

                            remote_ip_prefix=None, remote_group_id=None,

                            fmt=None, no_delete=False, ethertype=const.IPv4):

        if not fmt:

            fmt = self.fmt

        rule = self._build_security_group_rule(security_group_id,

                                               direction,

                                               protocol, port_range_min,

                                               port_range_max,

                                               remote_ip_prefix,

                                               remote_group_id,

                                               ethertype=ethertype)

        security_group_rule = self._make_security_group_rule(self.fmt, rule)

        yield security_group_rule

        if not no_delete:

            self._delete('security-group-rules',

                         security_group_rule['security_group_rule']['id'])

**** CubicPower OpenStack Study ****

    def _delete_default_security_group_egress_rules(self, security_group_id):

        """Deletes default egress rules given a security group ID."""

        res = self._list(

            'security-group-rules',

            query_params='security_group_id=%s' % security_group_id)

        for r in res['security_group_rules']:

            if (r['direction'] == 'egress' and not r['port_range_max'] and

                    not r['port_range_min'] and not r['protocol']

                    and not r['remote_ip_prefix']):

                self._delete('security-group-rules', r['id'])

**** CubicPower OpenStack Study ****

    def _assert_sg_rule_has_kvs(self, security_group_rule, expected_kvs):

        """Asserts that the sg rule has expected key/value pairs passed

           in as expected_kvs dictionary

        """

        for k, v in expected_kvs.iteritems():

            self.assertEqual(security_group_rule[k], v)

**** CubicPower OpenStack Study ****

class SecurityGroupsTestCaseXML(SecurityGroupsTestCase):

fmt = 'xml'

**** CubicPower OpenStack Study ****

class SecurityGroupTestPlugin(db_base_plugin_v2.NeutronDbPluginV2, securitygroups_db.SecurityGroupDbMixin):

"""Test plugin that implements necessary calls on create/delete port for

associating ports with security groups.

"""

__native_pagination_support = True

__native_sorting_support = True

supported_extension_aliases = ["security-group"]

**** CubicPower OpenStack Study ****

    def create_port(self, context, port):

        tenant_id = self._get_tenant_id_for_create(context, port['port'])

        default_sg = self._ensure_default_security_group(context, tenant_id)

        if not attr.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)):

            port['port'][ext_sg.SECURITYGROUPS] = [default_sg]

        session = context.session

        with session.begin(subtransactions=True):

            sgids = self._get_security_groups_on_port(context, port)

            port = super(SecurityGroupTestPlugin, self).create_port(context,

                                                                    port)

            self._process_port_create_security_group(context, port,

                                                     sgids)

        return port

**** CubicPower OpenStack Study ****

    def update_port(self, context, id, port):

        session = context.session

        with session.begin(subtransactions=True):

            if ext_sg.SECURITYGROUPS in port['port']:

                port['port'][ext_sg.SECURITYGROUPS] = (

                    self._get_security_groups_on_port(context, port))

                # delete the port binding and read it with the new rules

                self._delete_port_security_group_bindings(context, id)

                port['port']['id'] = id

                self._process_port_create_security_group(

                    context, port['port'],

                    port['port'].get(ext_sg.SECURITYGROUPS))

            port = super(SecurityGroupTestPlugin, self).update_port(

                context, id, port)

        return port

**** CubicPower OpenStack Study ****

    def create_network(self, context, network):

        tenant_id = self._get_tenant_id_for_create(context, network['network'])

        self._ensure_default_security_group(context, tenant_id)

        return super(SecurityGroupTestPlugin, self).create_network(context,

                                                                   network)

**** CubicPower OpenStack Study ****

    def get_ports(self, context, filters=None, fields=None,

                  sorts=[], limit=None, marker=None,

                  page_reverse=False):

        neutron_lports = super(SecurityGroupTestPlugin, self).get_ports(

            context, filters, sorts=sorts, limit=limit, marker=marker,

            page_reverse=page_reverse)

        return neutron_lports

**** CubicPower OpenStack Study ****

class SecurityGroupDBTestCase(SecurityGroupsTestCase):

**** CubicPower OpenStack Study ****

    def setUp(self, plugin=None, ext_mgr=None):

        plugin = plugin or DB_PLUGIN_KLASS

        ext_mgr = ext_mgr or SecurityGroupTestExtensionManager()

        super(SecurityGroupDBTestCase,

              self).setUp(plugin=plugin, ext_mgr=ext_mgr)

**** CubicPower OpenStack Study ****

class TestSecurityGroups(SecurityGroupDBTestCase):

**** CubicPower OpenStack Study ****

    def test_create_security_group(self):

        name = 'webservers'

        description = 'my webservers'

        keys = [('name', name,), ('description', description)]

        with self.security_group(name, description) as security_group:

            for k, v, in keys:

                self.assertEqual(security_group['security_group'][k], v)

        # Verify that default egress rules have been created

        sg_rules = security_group['security_group']['security_group_rules']

        self.assertEqual(len(sg_rules), 2)

        v4_rules = [r for r in sg_rules if r['ethertype'] == const.IPv4]

        self.assertEqual(len(v4_rules), 1)

        v4_rule = v4_rules[0]

        expected = {'direction': 'egress',

                    'ethertype': const.IPv4,

                    'remote_group_id': None,

                    'remote_ip_prefix': None,

                    'protocol': None,

                    'port_range_max': None,

                    'port_range_min': None}

        self._assert_sg_rule_has_kvs(v4_rule, expected)

        v6_rules = [r for r in sg_rules if r['ethertype'] == const.IPv6]

        self.assertEqual(len(v6_rules), 1)

        v6_rule = v6_rules[0]

        expected = {'direction': 'egress',

                    'ethertype': const.IPv6,

                    'remote_group_id': None,

                    'remote_ip_prefix': None,

                    'protocol': None,

                    'port_range_max': None,

                    'port_range_min': None}

        self._assert_sg_rule_has_kvs(v6_rule, expected)

**** CubicPower OpenStack Study ****

    def test_update_security_group(self):

        with self.security_group() as sg:

            data = {'security_group': {'name': 'new_name',

                                       'description': 'new_desc'}}

            req = self.new_update_request('security-groups',

                                          data,

                                          sg['security_group']['id'])

            res = self.deserialize(self.fmt, req.get_response(self.ext_api))

            self.assertEqual(res['security_group']['name'],

                             data['security_group']['name'])

            self.assertEqual(res['security_group']['description'],

                             data['security_group']['description'])

**** CubicPower OpenStack Study ****

    def test_update_security_group_name_to_default_fail(self):

        with self.security_group() as sg:

            data = {'security_group': {'name': 'default',

                                       'description': 'new_desc'}}

            req = self.new_update_request('security-groups',

                                          data,

                                          sg['security_group']['id'])

            req.environ['neutron.context'] = context.Context('', 'somebody')

            res = req.get_response(self.ext_api)

            self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

    def test_update_default_security_group_name_fail(self):

        with self.network():

            res = self.new_list_request('security-groups')

            sg = self.deserialize(self.fmt, res.get_response(self.ext_api))

            data = {'security_group': {'name': 'new_name',

                                       'description': 'new_desc'}}

            req = self.new_update_request('security-groups',

                                          data,

                                          sg['security_groups'][0]['id'])

            req.environ['neutron.context'] = context.Context('', 'somebody')

            res = req.get_response(self.ext_api)

            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

**** CubicPower OpenStack Study ****

    def test_update_default_security_group_with_description(self):

        with self.network():

            res = self.new_list_request('security-groups')

            sg = self.deserialize(self.fmt, res.get_response(self.ext_api))

            data = {'security_group': {'description': 'new_desc'}}

            req = self.new_update_request('security-groups',

                                          data,

                                          sg['security_groups'][0]['id'])

            res = self.deserialize(self.fmt, req.get_response(self.ext_api))

            self.assertEqual(res['security_group']['description'],

                             data['security_group']['description'])

**** CubicPower OpenStack Study ****

    def test_default_security_group(self):

        with self.network():

            res = self.new_list_request('security-groups')

            groups = self.deserialize(self.fmt, res.get_response(self.ext_api))

            self.assertEqual(len(groups['security_groups']), 1)

**** CubicPower OpenStack Study ****

    def test_create_default_security_group_fail(self):

        name = 'default'

        description = 'my webservers'

        res = self._create_security_group(self.fmt, name, description)

        self.deserialize(self.fmt, res)

        self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

    def test_list_security_groups(self):

        with contextlib.nested(self.security_group(name='sg1',

                                                   description='sg'),

                               self.security_group(name='sg2',

                                                   description='sg'),

                               self.security_group(name='sg3',

                                                   description='sg')

                               ) as security_groups:

            self._test_list_resources('security-group',

                                      security_groups,

                                      query_params='description=sg')

**** CubicPower OpenStack Study ****

    def test_list_security_groups_with_sort(self):

        with contextlib.nested(self.security_group(name='sg1',

                                                   description='sg'),

                               self.security_group(name='sg2',

                                                   description='sg'),

                               self.security_group(name='sg3',

                                                   description='sg')

                               ) as (sg1, sg2, sg3):

            self._test_list_with_sort('security-group',

                                      (sg3, sg2, sg1),

                                      [('name', 'desc')],

                                      query_params='description=sg')

**** CubicPower OpenStack Study ****

    def test_list_security_groups_with_pagination(self):

        with contextlib.nested(self.security_group(name='sg1',

                                                   description='sg'),

                               self.security_group(name='sg2',

                                                   description='sg'),

                               self.security_group(name='sg3',

                                                   description='sg')

                               ) as (sg1, sg2, sg3):

            self._test_list_with_pagination('security-group',

                                            (sg1, sg2, sg3),

                                            ('name', 'asc'), 2, 2,

                                            query_params='description=sg')

**** CubicPower OpenStack Study ****

    def test_list_security_groups_with_pagination_reverse(self):

        with contextlib.nested(self.security_group(name='sg1',

                                                   description='sg'),

                               self.security_group(name='sg2',

                                                   description='sg'),

                               self.security_group(name='sg3',

                                                   description='sg')

                               ) as (sg1, sg2, sg3):

            self._test_list_with_pagination_reverse(

                'security-group', (sg1, sg2, sg3), ('name', 'asc'), 2, 2,

                query_params='description=sg')

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_ethertype_invalid_as_number(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            ethertype = 2

            rule = self._build_security_group_rule(

                security_group_id, 'ingress', const.PROTO_NAME_TCP, '22',

                '22', None, None, ethertype=ethertype)

            res = self._create_security_group_rule(self.fmt, rule)

            self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_tcp_protocol_as_number(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            protocol = const.PROTO_NUM_TCP  # TCP

            rule = self._build_security_group_rule(

                security_group_id, 'ingress', protocol, '22', '22')

            res = self._create_security_group_rule(self.fmt, rule)

            self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_protocol_as_number(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            protocol = 2

            rule = self._build_security_group_rule(

                security_group_id, 'ingress', protocol)

            res = self._create_security_group_rule(self.fmt, rule)

            self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_case_insensitive(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            direction = "ingress"

            remote_ip_prefix = "10.0.0.0/24"

            protocol = 'TCP'

            port_range_min = 22

            port_range_max = 22

            ethertype = 'ipV4'

            with self.security_group_rule(security_group_id, direction,

                                          protocol, port_range_min,

                                          port_range_max,

                                          remote_ip_prefix,

                                          ethertype=ethertype) as rule:

                # the lower case value will be return

                self.assertEqual(rule['security_group_rule']['protocol'],

                                 protocol.lower())

                self.assertEqual(rule['security_group_rule']['ethertype'],

                                 const.IPv4)

**** CubicPower OpenStack Study ****

    def test_get_security_group(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            remote_group_id = sg['security_group']['id']

            res = self.new_show_request('security-groups', remote_group_id)

            security_group_id = sg['security_group']['id']

            direction = "ingress"

            remote_ip_prefix = "10.0.0.0/24"

            protocol = const.PROTO_NAME_TCP

            port_range_min = 22

            port_range_max = 22

            keys = [('remote_ip_prefix', remote_ip_prefix),

                    ('security_group_id', security_group_id),

                    ('direction', direction),

                    ('protocol', protocol),

                    ('port_range_min', port_range_min),

                    ('port_range_max', port_range_max)]

            with self.security_group_rule(security_group_id, direction,

                                          protocol, port_range_min,

                                          port_range_max,

                                          remote_ip_prefix):

                group = self.deserialize(

                    self.fmt, res.get_response(self.ext_api))

                sg_rule = group['security_group']['security_group_rules']

                self.assertEqual(group['security_group']['id'],

                                 remote_group_id)

                self.assertEqual(len(sg_rule), 3)

                sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']

                for k, v, in keys:

                    self.assertEqual(sg_rule[0][k], v)

**** CubicPower OpenStack Study ****

    def test_delete_security_group(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description, no_delete=True) as sg:

            remote_group_id = sg['security_group']['id']

            self._delete('security-groups', remote_group_id,

                         webob.exc.HTTPNoContent.code)

**** CubicPower OpenStack Study ****

    def test_delete_default_security_group_admin(self):

        with self.network():

            res = self.new_list_request('security-groups')

            sg = self.deserialize(self.fmt, res.get_response(self.ext_api))

            self._delete('security-groups', sg['security_groups'][0]['id'],

                         webob.exc.HTTPNoContent.code)

**** CubicPower OpenStack Study ****

    def test_delete_default_security_group_nonadmin(self):

        with self.network():

            res = self.new_list_request('security-groups')

            sg = self.deserialize(self.fmt, res.get_response(self.ext_api))

            neutron_context = context.Context('', 'test-tenant')

            self._delete('security-groups', sg['security_groups'][0]['id'],

                         webob.exc.HTTPConflict.code,

                         neutron_context=neutron_context)

**** CubicPower OpenStack Study ****

    def test_security_group_list_creates_default_security_group(self):

        neutron_context = context.Context('', 'test-tenant')

        sg = self._list('security-groups',

                        neutron_context=neutron_context).get('security_groups')

        self.assertEqual(len(sg), 1)

**** CubicPower OpenStack Study ****

    def test_default_security_group_rules(self):

        with self.network():

            res = self.new_list_request('security-groups')

            groups = self.deserialize(self.fmt, res.get_response(self.ext_api))

            self.assertEqual(len(groups['security_groups']), 1)

            security_group_id = groups['security_groups'][0]['id']

            res = self.new_list_request('security-group-rules')

            rules = self.deserialize(self.fmt, res.get_response(self.ext_api))

            self.assertEqual(len(rules['security_group_rules']), 4)

            # Verify default rule for v4 egress

            sg_rules = rules['security_group_rules']

            rules = [

                r for r in sg_rules

                if r['direction'] == 'egress' and r['ethertype'] == const.IPv4

            ]

            self.assertEqual(len(rules), 1)

            v4_egress = rules[0]

            expected = {'direction': 'egress',

                        'ethertype': const.IPv4,

                        'remote_group_id': None,

                        'remote_ip_prefix': None,

                        'protocol': None,

                        'port_range_max': None,

                        'port_range_min': None}

            self._assert_sg_rule_has_kvs(v4_egress, expected)

            # Verify default rule for v6 egress

            rules = [

                r for r in sg_rules

                if r['direction'] == 'egress' and r['ethertype'] == const.IPv6

            ]

            self.assertEqual(len(rules), 1)

            v6_egress = rules[0]

            expected = {'direction': 'egress',

                        'ethertype': const.IPv6,

                        'remote_group_id': None,

                        'remote_ip_prefix': None,

                        'protocol': None,

                        'port_range_max': None,

                        'port_range_min': None}

            self._assert_sg_rule_has_kvs(v6_egress, expected)

            # Verify default rule for v4 ingress

            rules = [

                r for r in sg_rules

                if r['direction'] == 'ingress' and r['ethertype'] == const.IPv4

            ]

            self.assertEqual(len(rules), 1)

            v4_ingress = rules[0]

            expected = {'direction': 'ingress',

                        'ethertype': const.IPv4,

                        'remote_group_id': security_group_id,

                        'remote_ip_prefix': None,

                        'protocol': None,

                        'port_range_max': None,

                        'port_range_min': None}

            self._assert_sg_rule_has_kvs(v4_ingress, expected)

            # Verify default rule for v6 ingress

            rules = [

                r for r in sg_rules

                if r['direction'] == 'ingress' and r['ethertype'] == const.IPv6

            ]

            self.assertEqual(len(rules), 1)

            v6_ingress = rules[0]

            expected = {'direction': 'ingress',

                        'ethertype': const.IPv6,

                        'remote_group_id': security_group_id,

                        'remote_ip_prefix': None,

                        'protocol': None,

                        'port_range_max': None,

                        'port_range_min': None}

            self._assert_sg_rule_has_kvs(v6_ingress, expected)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_remote_ip_prefix(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            direction = "ingress"

            remote_ip_prefix = "10.0.0.0/24"

            protocol = const.PROTO_NAME_TCP

            port_range_min = 22

            port_range_max = 22

            keys = [('remote_ip_prefix', remote_ip_prefix),

                    ('security_group_id', security_group_id),

                    ('direction', direction),

                    ('protocol', protocol),

                    ('port_range_min', port_range_min),

                    ('port_range_max', port_range_max)]

            with self.security_group_rule(security_group_id, direction,

                                          protocol, port_range_min,

                                          port_range_max,

                                          remote_ip_prefix) as rule:

                for k, v, in keys:

                    self.assertEqual(rule['security_group_rule'][k], v)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_group_id(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            with self.security_group(name, description) as sg2:

                security_group_id = sg['security_group']['id']

                direction = "ingress"

                remote_group_id = sg2['security_group']['id']

                protocol = const.PROTO_NAME_TCP

                port_range_min = 22

                port_range_max = 22

                keys = [('remote_group_id', remote_group_id),

                        ('security_group_id', security_group_id),

                        ('direction', direction),

                        ('protocol', protocol),

                        ('port_range_min', port_range_min),

                        ('port_range_max', port_range_max)]

                with self.security_group_rule(security_group_id, direction,

                                              protocol, port_range_min,

                                              port_range_max,

                                              remote_group_id=remote_group_id

                                              ) as rule:

                    for k, v, in keys:

                        self.assertEqual(rule['security_group_rule'][k], v)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_icmp_with_type_and_code(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            direction = "ingress"

            remote_ip_prefix = "10.0.0.0/24"

            protocol = const.PROTO_NAME_ICMP

            # port_range_min (ICMP type) is greater than port_range_max

            # (ICMP code) in order to confirm min <= max port check is

            # not called for ICMP.

            port_range_min = 8

            port_range_max = 5

            keys = [('remote_ip_prefix', remote_ip_prefix),

                    ('security_group_id', security_group_id),

                    ('direction', direction),

                    ('protocol', protocol),

                    ('port_range_min', port_range_min),

                    ('port_range_max', port_range_max)]

            with self.security_group_rule(security_group_id, direction,

                                          protocol, port_range_min,

                                          port_range_max,

                                          remote_ip_prefix) as rule:

                for k, v, in keys:

                    self.assertEqual(rule['security_group_rule'][k], v)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_icmp_with_type_only(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            direction = "ingress"

            remote_ip_prefix = "10.0.0.0/24"

            protocol = const.PROTO_NAME_ICMP

            # ICMP type

            port_range_min = 8

            # ICMP code

            port_range_max = None

            keys = [('remote_ip_prefix', remote_ip_prefix),

                    ('security_group_id', security_group_id),

                    ('direction', direction),

                    ('protocol', protocol),

                    ('port_range_min', port_range_min),

                    ('port_range_max', port_range_max)]

            with self.security_group_rule(security_group_id, direction,

                                          protocol, port_range_min,

                                          port_range_max,

                                          remote_ip_prefix) as rule:

                for k, v, in keys:

                    self.assertEqual(rule['security_group_rule'][k], v)

**** CubicPower OpenStack Study ****

    def test_create_security_group_source_group_ip_and_ip_prefix(self):

        security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"

        direction = "ingress"

        remote_ip_prefix = "10.0.0.0/24"

        protocol = const.PROTO_NAME_TCP

        port_range_min = 22

        port_range_max = 22

        remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"

        rule = self._build_security_group_rule(security_group_id, direction,

                                               protocol, port_range_min,

                                               port_range_max,

                                               remote_ip_prefix,

                                               remote_group_id)

        res = self._create_security_group_rule(self.fmt, rule)

        self.deserialize(self.fmt, res)

        self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_bad_security_group_id(self):

        security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"

        direction = "ingress"

        remote_ip_prefix = "10.0.0.0/24"

        protocol = const.PROTO_NAME_TCP

        port_range_min = 22

        port_range_max = 22

        rule = self._build_security_group_rule(security_group_id, direction,

                                               protocol, port_range_min,

                                               port_range_max,

                                               remote_ip_prefix)

        res = self._create_security_group_rule(self.fmt, rule)

        self.deserialize(self.fmt, res)

        self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_bad_tenant(self):

        with self.security_group() as sg:

            rule = {'security_group_rule':

                    {'security_group_id': sg['security_group']['id'],

                     'direction': 'ingress',

                     'protocol': const.PROTO_NAME_TCP,

                     'port_range_min': '22',

                     'port_range_max': '22',

                     'tenant_id': "bad_tenant"}}

            res = self._create_security_group_rule(self.fmt, rule,

                                                   tenant_id='bad_tenant',

                                                   set_context=True)

            self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_bad_tenant_remote_group_id(self):

        with self.security_group() as sg:

            res = self._create_security_group(self.fmt, 'webservers',

                                              'webservers',

                                              tenant_id='bad_tenant')

            sg2 = self.deserialize(self.fmt, res)

            rule = {'security_group_rule':

                    {'security_group_id': sg2['security_group']['id'],

                     'direction': 'ingress',

                     'protocol': const.PROTO_NAME_TCP,

                     'port_range_min': '22',

                     'port_range_max': '22',

                     'tenant_id': 'bad_tenant',

                     'remote_group_id': sg['security_group']['id']}}

            res = self._create_security_group_rule(self.fmt, rule,

                                                   tenant_id='bad_tenant',

                                                   set_context=True)

            self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_bad_tenant_security_group_rule(self):

        with self.security_group() as sg:

            res = self._create_security_group(self.fmt, 'webservers',

                                              'webservers',

                                              tenant_id='bad_tenant')

            self.deserialize(self.fmt, res)

            rule = {'security_group_rule':

                    {'security_group_id': sg['security_group']['id'],

                     'direction': 'ingress',

                     'protocol': const.PROTO_NAME_TCP,

                     'port_range_min': '22',

                     'port_range_max': '22',

                     'tenant_id': 'bad_tenant'}}

            res = self._create_security_group_rule(self.fmt, rule,

                                                   tenant_id='bad_tenant',

                                                   set_context=True)

            self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_bad_remote_group_id(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            remote_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"

            direction = "ingress"

            protocol = const.PROTO_NAME_TCP

            port_range_min = 22

            port_range_max = 22

        rule = self._build_security_group_rule(security_group_id, direction,

                                               protocol, port_range_min,

                                               port_range_max,

                                               remote_group_id=remote_group_id)

        res = self._create_security_group_rule(self.fmt, rule)

        self.deserialize(self.fmt, res)

        self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_duplicate_rules(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            with self.security_group_rule(security_group_id):

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', '22')

                self._create_security_group_rule(self.fmt, rule)

                res = self._create_security_group_rule(self.fmt, rule)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_min_port_greater_max(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            with self.security_group_rule(security_group_id):

                for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP,

                                 const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]:

                    rule = self._build_security_group_rule(

                        sg['security_group']['id'],

                        'ingress', protocol, '50', '22')

                    res = self._create_security_group_rule(self.fmt, rule)

                    self.deserialize(self.fmt, res)

                    self.assertEqual(res.status_int,

                                     webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_ports_but_no_protocol(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            with self.security_group_rule(security_group_id):

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress', None, '22', '22')

                res = self._create_security_group_rule(self.fmt, rule)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_port_range_min_only(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            with self.security_group_rule(security_group_id):

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', None)

                res = self._create_security_group_rule(self.fmt, rule)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_port_range_max_only(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            with self.security_group_rule(security_group_id):

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, None, '22')

                res = self._create_security_group_rule(self.fmt, rule)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_icmp_type_too_big(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            with self.security_group_rule(security_group_id):

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_ICMP, '256', None)

                res = self._create_security_group_rule(self.fmt, rule)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_icmp_code_too_big(self):

        name = 'webservers'

        description = 'my webservers'

        with self.security_group(name, description) as sg:

            security_group_id = sg['security_group']['id']

            with self.security_group_rule(security_group_id):

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_ICMP, '8', '256')

                res = self._create_security_group_rule(self.fmt, rule)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_list_ports_security_group(self):

        with self.network() as n:

            with self.subnet(n):

                self._create_port(self.fmt, n['network']['id'])

                req = self.new_list_request('ports')

                res = req.get_response(self.api)

                ports = self.deserialize(self.fmt, res)

                port = ports['ports'][0]

                self.assertEqual(len(port[ext_sg.SECURITYGROUPS]), 1)

                self._delete('ports', port['id'])

**** CubicPower OpenStack Study ****

    def test_list_security_group_rules(self):

        with self.security_group(name='sg') as sg:

            security_group_id = sg['security_group']['id']

            with contextlib.nested(self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=22,

                                                            port_range_max=22),

                                   self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=23,

                                                            port_range_max=23),

                                   self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=24,

                                                            port_range_max=24)

                                   ) as (sgr1, sgr2, sgr3):

                # Delete default rules as they would fail the following

                # assertion at the end.

                self._delete_default_security_group_egress_rules(

                    security_group_id)

                q = 'direction=egress&security_group_id=' + security_group_id

                self._test_list_resources('security-group-rule',

                                          [sgr1, sgr2, sgr3],

                                          query_params=q)

**** CubicPower OpenStack Study ****

    def test_list_security_group_rules_with_sort(self):

        with self.security_group(name='sg') as sg:

            security_group_id = sg['security_group']['id']

            with contextlib.nested(self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=22,

                                                            port_range_max=22),

                                   self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=23,

                                                            port_range_max=23),

                                   self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=24,

                                                            port_range_max=24)

                                   ) as (sgr1, sgr2, sgr3):

                # Delete default rules as they would fail the following

                # assertion at the end.

                self._delete_default_security_group_egress_rules(

                    security_group_id)

                q = 'direction=egress&security_group_id=' + security_group_id

                self._test_list_with_sort('security-group-rule',

                                          (sgr3, sgr2, sgr1),

                                          [('port_range_max', 'desc')],

                                          query_params=q)

**** CubicPower OpenStack Study ****

    def test_list_security_group_rules_with_pagination(self):

        with self.security_group(name='sg') as sg:

            security_group_id = sg['security_group']['id']

            with contextlib.nested(self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=22,

                                                            port_range_max=22),

                                   self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=23,

                                                            port_range_max=23),

                                   self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=24,

                                                            port_range_max=24)

                                   ) as (sgr1, sgr2, sgr3):

                # Delete default rules as they would fail the following

                # assertion at the end.

                self._delete_default_security_group_egress_rules(

                    security_group_id)

                q = 'direction=egress&security_group_id=' + security_group_id

                self._test_list_with_pagination(

                    'security-group-rule', (sgr3, sgr2, sgr1),

                    ('port_range_max', 'desc'), 2, 2,

                    query_params=q)

**** CubicPower OpenStack Study ****

    def test_list_security_group_rules_with_pagination_reverse(self):

        with self.security_group(name='sg') as sg:

            security_group_id = sg['security_group']['id']

            with contextlib.nested(self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=22,

                                                            port_range_max=22),

                                   self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=23,

                                                            port_range_max=23),

                                   self.security_group_rule(security_group_id,

                                                            direction='egress',

                                                            port_range_min=24,

                                                            port_range_max=24)

                                   ) as (sgr1, sgr2, sgr3):

                self._test_list_with_pagination_reverse(

                    'security-group-rule', (sgr3, sgr2, sgr1),

                    ('port_range_max', 'desc'), 2, 2,

                    query_params='direction=egress')

**** CubicPower OpenStack Study ****

    def test_update_port_with_security_group(self):

        with self.network() as n:

            with self.subnet(n):

                with self.security_group() as sg:

                    res = self._create_port(self.fmt, n['network']['id'])

                    port = self.deserialize(self.fmt, res)

                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],

                                     'name': port['port']['name'],

                                     ext_sg.SECURITYGROUPS:

                                     [sg['security_group']['id']]}}

                    req = self.new_update_request('ports', data,

                                                  port['port']['id'])

                    res = self.deserialize(self.fmt,

                                           req.get_response(self.api))

                    self.assertEqual(res['port'][ext_sg.SECURITYGROUPS][0],

                                     sg['security_group']['id'])

                    # Test update port without security group

                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],

                                     'name': port['port']['name']}}

                    req = self.new_update_request('ports', data,

                                                  port['port']['id'])

                    res = self.deserialize(self.fmt,

                                           req.get_response(self.api))

                    self.assertEqual(res['port'][ext_sg.SECURITYGROUPS][0],

                                     sg['security_group']['id'])

                    self._delete('ports', port['port']['id'])

**** CubicPower OpenStack Study ****

    def test_update_port_with_multiple_security_groups(self):

        with self.network() as n:

            with self.subnet(n):

                with self.security_group() as sg1:

                    with self.security_group() as sg2:

                        res = self._create_port(

                            self.fmt, n['network']['id'],

                            security_groups=[sg1['security_group']['id'],

                                             sg2['security_group']['id']])

                        port = self.deserialize(self.fmt, res)

                        self.assertEqual(len(

                            port['port'][ext_sg.SECURITYGROUPS]), 2)

                        self._delete('ports', port['port']['id'])

**** CubicPower OpenStack Study ****

    def test_update_port_remove_security_group_empty_list(self):

        with self.network() as n:

            with self.subnet(n):

                with self.security_group() as sg:

                    res = self._create_port(self.fmt, n['network']['id'],

                                            security_groups=(

                                                [sg['security_group']['id']]))

                    port = self.deserialize(self.fmt, res)

                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],

                                     'name': port['port']['name'],

                                     'security_groups': []}}

                    req = self.new_update_request('ports', data,

                                                  port['port']['id'])

                    res = self.deserialize(self.fmt,

                                           req.get_response(self.api))

                    self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS),

                                     [])

                    self._delete('ports', port['port']['id'])

**** CubicPower OpenStack Study ****

    def test_update_port_remove_security_group_none(self):

        with self.network() as n:

            with self.subnet(n):

                with self.security_group() as sg:

                    res = self._create_port(self.fmt, n['network']['id'],

                                            security_groups=(

                                                [sg['security_group']['id']]))

                    port = self.deserialize(self.fmt, res)

                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],

                                     'name': port['port']['name'],

                                     'security_groups': None}}

                    req = self.new_update_request('ports', data,

                                                  port['port']['id'])

                    res = self.deserialize(self.fmt,

                                           req.get_response(self.api))

                    self.assertEqual(res['port'].get(ext_sg.SECURITYGROUPS),

                                     [])

                    self._delete('ports', port['port']['id'])

**** CubicPower OpenStack Study ****

    def test_create_port_with_bad_security_group(self):

        with self.network() as n:

            with self.subnet(n):

                res = self._create_port(self.fmt, n['network']['id'],

                                        security_groups=['bad_id'])

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_delete_security_group_port_in_use(self):

        with self.network() as n:

            with self.subnet(n):

                with self.security_group() as sg:

                    res = self._create_port(self.fmt, n['network']['id'],

                                            security_groups=(

                                                [sg['security_group']['id']]))

                    port = self.deserialize(self.fmt, res)

                    self.assertEqual(port['port'][ext_sg.SECURITYGROUPS][0],

                                     sg['security_group']['id'])

                    # try to delete security group that's in use

                    res = self._delete('security-groups',

                                       sg['security_group']['id'],

                                       webob.exc.HTTPConflict.code)

                    # delete the blocking port

                    self._delete('ports', port['port']['id'])

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_bulk_native(self):

        if self._skip_native_bulk:

            self.skipTest("Plugin does not support native bulk "

                          "security_group_rule create")

        with self.security_group() as sg:

            rule1 = self._build_security_group_rule(sg['security_group']['id'],

                                                    'ingress',

                                                    const.PROTO_NAME_TCP, '22',

                                                    '22', '10.0.0.1/24')

            rule2 = self._build_security_group_rule(sg['security_group']['id'],

                                                    'ingress',

                                                    const.PROTO_NAME_TCP, '23',

                                                    '23', '10.0.0.1/24')

            rules = {'security_group_rules': [rule1['security_group_rule'],

                                              rule2['security_group_rule']]}

            res = self._create_security_group_rule(self.fmt, rules)

            self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_bulk_emulated(self):

        real_has_attr = hasattr

        #ensures the API choose the emulation code path

        def fakehasattr(item, attr):

            if attr.endswith('__native_bulk_support'):

                return False

            return real_has_attr(item, attr)

        with mock.patch('__builtin__.hasattr',

                        new=fakehasattr):

            with self.security_group() as sg:

                rule1 = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')

                rule2 = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')

                rules = {'security_group_rules': [rule1['security_group_rule'],

                                                  rule2['security_group_rule']]

                         }

                res = self._create_security_group_rule(self.fmt, rules)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

**** CubicPower OpenStack Study ****

        def fakehasattr(item, attr):

            if attr.endswith('__native_bulk_support'):

                return False

            return real_has_attr(item, attr)

        with mock.patch('__builtin__.hasattr',

                        new=fakehasattr):

            with self.security_group() as sg:

                rule1 = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')

                rule2 = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')

                rules = {'security_group_rules': [rule1['security_group_rule'],

                                                  rule2['security_group_rule']]

                         }

                res = self._create_security_group_rule(self.fmt, rules)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_allow_all_ipv4(self):

        with self.security_group() as sg:

            rule = {'security_group_id': sg['security_group']['id'],

                    'direction': 'ingress',

                    'ethertype': 'IPv4',

                    'tenant_id': 'test_tenant'}

            res = self._create_security_group_rule(

                self.fmt, {'security_group_rule': rule})

            rule = self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_allow_all_ipv4_v6_bulk(self):

        if self._skip_native_bulk:

            self.skipTest("Plugin does not support native bulk "

                          "security_group_rule create")

        with self.security_group() as sg:

            rule_v4 = {'security_group_id': sg['security_group']['id'],

                       'direction': 'ingress',

                       'ethertype': 'IPv4',

                       'tenant_id': 'test_tenant'}

            rule_v6 = {'security_group_id': sg['security_group']['id'],

                       'direction': 'ingress',

                       'ethertype': 'IPv6',

                       'tenant_id': 'test_tenant'}

            rules = {'security_group_rules': [rule_v4, rule_v6]}

            res = self._create_security_group_rule(self.fmt, rules)

            self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_duplicate_rule_in_post(self):

        if self._skip_native_bulk:

            self.skipTest("Plugin does not support native bulk "

                          "security_group_rule create")

        with self.security_group() as sg:

            rule = self._build_security_group_rule(sg['security_group']['id'],

                                                   'ingress',

                                                   const.PROTO_NAME_TCP, '22',

                                                   '22', '10.0.0.1/24')

            rules = {'security_group_rules': [rule['security_group_rule'],

                                              rule['security_group_rule']]}

            res = self._create_security_group_rule(self.fmt, rules)

            rule = self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):

        real_has_attr = hasattr

        #ensures the API choose the emulation code path

        def fakehasattr(item, attr):

            if attr.endswith('__native_bulk_support'):

                return False

            return real_has_attr(item, attr)

        with mock.patch('__builtin__.hasattr',

                        new=fakehasattr):

            with self.security_group() as sg:

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')

                rules = {'security_group_rules': [rule['security_group_rule'],

                                                  rule['security_group_rule']]}

                res = self._create_security_group_rule(self.fmt, rules)

                rule = self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

        def fakehasattr(item, attr):

            if attr.endswith('__native_bulk_support'):

                return False

            return real_has_attr(item, attr)

        with mock.patch('__builtin__.hasattr',

                        new=fakehasattr):

            with self.security_group() as sg:

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')

                rules = {'security_group_rules': [rule['security_group_rule'],

                                                  rule['security_group_rule']]}

                res = self._create_security_group_rule(self.fmt, rules)

                rule = self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_duplicate_rule_db(self):

        if self._skip_native_bulk:

            self.skipTest("Plugin does not support native bulk "

                          "security_group_rule create")

        with self.security_group() as sg:

            rule = self._build_security_group_rule(sg['security_group']['id'],

                                                   'ingress',

                                                   const.PROTO_NAME_TCP, '22',

                                                   '22', '10.0.0.1/24')

            rules = {'security_group_rules': [rule]}

            self._create_security_group_rule(self.fmt, rules)

            res = self._create_security_group_rule(self.fmt, rules)

            rule = self.deserialize(self.fmt, res)

            self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_duplicate_rule_db_emulated(self):

        real_has_attr = hasattr

        #ensures the API choose the emulation code path

        def fakehasattr(item, attr):

            if attr.endswith('__native_bulk_support'):

                return False

            return real_has_attr(item, attr)

        with mock.patch('__builtin__.hasattr',

                        new=fakehasattr):

            with self.security_group() as sg:

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')

                rules = {'security_group_rules': [rule]}

                self._create_security_group_rule(self.fmt, rules)

                res = self._create_security_group_rule(self.fmt, rule)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

        def fakehasattr(item, attr):

            if attr.endswith('__native_bulk_support'):

                return False

            return real_has_attr(item, attr)

        with mock.patch('__builtin__.hasattr',

                        new=fakehasattr):

            with self.security_group() as sg:

                rule = self._build_security_group_rule(

                    sg['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')

                rules = {'security_group_rules': [rule]}

                self._create_security_group_rule(self.fmt, rules)

                res = self._create_security_group_rule(self.fmt, rule)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_different_security_group_ids(self):

        if self._skip_native_bulk:

            self.skipTest("Plugin does not support native bulk "

                          "security_group_rule create")

        with self.security_group() as sg1:

            with self.security_group() as sg2:

                rule1 = self._build_security_group_rule(

                    sg1['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '22', '22', '10.0.0.1/24')

                rule2 = self._build_security_group_rule(

                    sg2['security_group']['id'], 'ingress',

                    const.PROTO_NAME_TCP, '23', '23', '10.0.0.1/24')

                rules = {'security_group_rules': [rule1['security_group_rule'],

                                                  rule2['security_group_rule']]

                         }

                res = self._create_security_group_rule(self.fmt, rules)

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_with_invalid_ethertype(self):

        security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"

        direction = "ingress"

        remote_ip_prefix = "10.0.0.0/24"

        protocol = const.PROTO_NAME_TCP

        port_range_min = 22

        port_range_max = 22

        remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"

        rule = self._build_security_group_rule(security_group_id, direction,

                                               protocol, port_range_min,

                                               port_range_max,

                                               remote_ip_prefix,

                                               remote_group_id,

                                               ethertype='IPv5')

        res = self._create_security_group_rule(self.fmt, rule)

        self.deserialize(self.fmt, res)

        self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_security_group_rule_with_invalid_protocol(self):

        security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"

        direction = "ingress"

        remote_ip_prefix = "10.0.0.0/24"

        protocol = 'tcp/ip'

        port_range_min = 22

        port_range_max = 22

        remote_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"

        rule = self._build_security_group_rule(security_group_id, direction,

                                               protocol, port_range_min,

                                               port_range_max,

                                               remote_ip_prefix,

                                               remote_group_id)

        res = self._create_security_group_rule(self.fmt, rule)

        self.deserialize(self.fmt, res)

        self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

    def test_create_port_with_non_uuid(self):

        with self.network() as n:

            with self.subnet(n):

                res = self._create_port(self.fmt, n['network']['id'],

                                        security_groups=['not_valid'])

                self.deserialize(self.fmt, res)

                self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)

**** CubicPower OpenStack Study ****

class TestSecurityGroupsXML(TestSecurityGroups):

fmt = 'xml'