**** CubicPower OpenStack Study ****
# Copyright 2013 VMware, Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import contextlib
import copy
import webob.exc
from neutron.api.v2 import attributes
from neutron import context
from neutron.extensions import firewall
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as const
from neutron.tests.unit.db.firewall import test_db_firewall
from neutron.tests.unit.vmware.vshield import test_edge_router
_uuid = uuidutils.generate_uuid
FW_PLUGIN_CLASS = "neutron.plugins.vmware.plugin.NsxServicePlugin"
**** CubicPower OpenStack Study ****
class FirewallTestExtensionManager(
test_edge_router.ServiceRouterTestExtensionManager):
**** CubicPower OpenStack Study ****
def get_resources(self):
# If l3 resources have been loaded and updated by main API
# router, update the map in the l3 extension so it will load
# the same attributes as the API router
resources = super(FirewallTestExtensionManager, self).get_resources()
firewall_attr_map = copy.deepcopy(firewall.RESOURCE_ATTRIBUTE_MAP)
for res in firewall.RESOURCE_ATTRIBUTE_MAP.keys():
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP.get(res)
if attr_info:
firewall.RESOURCE_ATTRIBUTE_MAP[res] = attr_info
fw_resources = firewall.Firewall.get_resources()
# restore the original resources once the controllers are created
firewall.RESOURCE_ATTRIBUTE_MAP = firewall_attr_map
resources.extend(fw_resources)
return resources
**** CubicPower OpenStack Study ****
def get_actions(self):
return []
**** CubicPower OpenStack Study ****
def get_request_extensions(self):
return []
**** CubicPower OpenStack Study ****
class FirewallPluginTestCase(test_db_firewall.FirewallPluginDbTestCase,
test_edge_router.ServiceRouterTest):
**** CubicPower OpenStack Study ****
def vcns_firewall_patch(self):
self.vcns_instance.return_value.update_firewall.side_effect = (
self.fc2.update_firewall)
self.vcns_instance.return_value.delete_firewall.side_effect = (
self.fc2.delete_firewall)
self.vcns_instance.return_value.update_firewall_rule.side_effect = (
self.fc2.update_firewall_rule)
self.vcns_instance.return_value.delete_firewall_rule.side_effect = (
self.fc2.delete_firewall_rule)
self.vcns_instance.return_value.add_firewall_rule_above.side_effect = (
self.fc2.add_firewall_rule_above)
self.vcns_instance.return_value.add_firewall_rule.side_effect = (
self.fc2.add_firewall_rule)
self.vcns_instance.return_value.get_firewall.side_effect = (
self.fc2.get_firewall)
self.vcns_instance.return_value.get_firewall_rule.side_effect = (
self.fc2.get_firewall_rule)
**** CubicPower OpenStack Study ****
def setUp(self):
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
super(FirewallPluginTestCase, self).setUp(
ext_mgr=FirewallTestExtensionManager(),
fw_plugin=FW_PLUGIN_CLASS)
self.vcns_firewall_patch()
self.plugin = manager.NeutronManager.get_plugin()
**** CubicPower OpenStack Study ****
def tearDown(self):
super(FirewallPluginTestCase, self).tearDown()
# Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
self.ext_api = None
self.plugin = None
**** CubicPower OpenStack Study ****
def _create_and_get_router(self):
req = self._create_router(self.fmt, self._tenant_id)
res = self.deserialize(self.fmt, req)
return res['router']['id']
**** CubicPower OpenStack Study ****
def _create_firewall(self, fmt, name, description, firewall_policy_id,
admin_state_up=True, expected_res_status=None,
**kwargs):
data = {'firewall': {'name': name,
'description': description,
'firewall_policy_id': firewall_policy_id,
'router_id': kwargs.get('router_id'),
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
firewall_req = self.new_create_request('firewalls', data, fmt)
firewall_res = firewall_req.get_response(self.ext_api)
if expected_res_status:
self.assertEqual(firewall_res.status_int, expected_res_status)
return firewall_res
**** CubicPower OpenStack Study ****
def test_create_firewall(self):
name = "new_fw"
attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
attrs['router_id'] = self._create_and_get_router()
with self.firewall(name=name,
firewall_policy_id=fwp_id,
router_id=attrs['router_id'],
admin_state_up=
test_db_firewall.ADMIN_STATE_UP,
expected_res_status=201) as fw:
attrs = self._replace_firewall_status(
attrs, const.PENDING_CREATE, const.ACTIVE)
for k, v in attrs.iteritems():
self.assertEqual(fw['firewall'][k], v)
**** CubicPower OpenStack Study ****
def test_create_firewall_without_policy(self):
name = "new_fw"
attrs = self._get_test_firewall_attrs(name)
attrs['router_id'] = self._create_and_get_router()
with self.firewall(name=name,
router_id=attrs['router_id'],
admin_state_up=
test_db_firewall.ADMIN_STATE_UP,
expected_res_status=201) as fw:
attrs = self._replace_firewall_status(
attrs, const.PENDING_CREATE, const.ACTIVE)
for k, v in attrs.iteritems():
self.assertEqual(fw['firewall'][k], v)
**** CubicPower OpenStack Study ****
def test_update_firewall(self):
name = "new_fw"
attrs = self._get_test_firewall_attrs(name)
attrs['router_id'] = self._create_and_get_router()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(
firewall_policy_id=fwp_id, router_id=attrs['router_id'],
admin_state_up=test_db_firewall.ADMIN_STATE_UP) as fw:
fw_id = fw['firewall']['id']
new_data = {'firewall': {'name': name}}
req = self.new_update_request('firewalls', new_data, fw_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 200)
res_json = self.deserialize(
self.fmt, res)
attrs = self._replace_firewall_status(
attrs, const.PENDING_CREATE, const.ACTIVE)
for k, v in attrs.iteritems():
self.assertEqual(res_json['firewall'][k], v)
**** CubicPower OpenStack Study ****
def test_delete_firewall(self):
ctx = context.get_admin_context()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(
firewall_policy_id=fwp_id,
router_id=self._create_and_get_router(),
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
no_delete=True) as fw:
fw_id = fw['firewall']['id']
with ctx.session.begin(subtransactions=True):
req = self.new_delete_request('firewalls', fw_id)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
self.assertRaises(
firewall.FirewallNotFound,
self.plugin.get_firewall, ctx, fw_id)
**** CubicPower OpenStack Study ****
def test_show_firewall(self):
name = "firewall1"
attrs = self._get_test_firewall_attrs(name)
attrs['router_id'] = self._create_and_get_router()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(
name=name,
firewall_policy_id=fwp_id, router_id=attrs['router_id'],
admin_state_up=test_db_firewall.ADMIN_STATE_UP) as firewall:
req = self.new_show_request('firewalls',
firewall['firewall']['id'],
fmt=self.fmt)
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
attrs = self._replace_firewall_status(
attrs, const.PENDING_CREATE, const.ACTIVE)
for k, v in attrs.iteritems():
self.assertEqual(res['firewall'][k], v)
**** CubicPower OpenStack Study ****
def test_list_firewalls(self):
keys_list = []
for i in range(3):
keys_list.append({'name': "fw" + str(i),
'router_id': self._create_and_get_router(),
'admin_state_up': True,
'status': "ACTIVE"})
with contextlib.nested(
self.firewall(
name='fw0', router_id=keys_list[0]['router_id'],
admin_state_up=True, description='fw'),
self.firewall(
name='fw1', router_id=keys_list[1]['router_id'],
admin_state_up=True, description='fw'),
self.firewall(
name='fw2', router_id=keys_list[2]['router_id'],
admin_state_up=True, description='fw'),
) as (fw1, fw2, fw3):
self._test_list_resources(
'firewall', (fw1, fw2, fw3),
query_params='description=fw')
req = self.new_list_request('firewalls')
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
self.assertEqual(len(res['firewalls']), 3)
for index in range(len(res['firewalls'])):
for k, v in keys_list[index].items():
self.assertEqual(res['firewalls'][index][k], v)
**** CubicPower OpenStack Study ****
def test_create_firewall_with_rules(self):
ctx = context.get_admin_context()
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request(
'firewall_policies', data, fwp_id)
req.get_response(self.ext_api)
attrs = self._get_test_firewall_attrs()
attrs['firewall_policy_id'] = fwp_id
with self.firewall(
firewall_policy_id=fwp_id,
router_id=self._create_and_get_router(),
admin_state_up=test_db_firewall.ADMIN_STATE_UP) as fw:
rule_list = (
self.plugin._make_firewall_rule_list_by_policy_id(
ctx, fw['firewall']['firewall_policy_id']))
self._compare_firewall_rule_lists(
fwp_id, fr, rule_list)
**** CubicPower OpenStack Study ****
def test_update_firewall_policy_with_no_firewall(self):
name = "new_firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
with self.firewall_policy(shared=test_db_firewall.SHARED,
firewall_rules=None,
audited=test_db_firewall.AUDITED) as fwp:
data = {'firewall_policy': {'name': name}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_policy'][k], v)
**** CubicPower OpenStack Study ****
def test_update_firewall_policy_with_firewall(self):
name = "new_firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
with self.firewall_policy(shared=test_db_firewall.SHARED,
firewall_rules=None,
audited=test_db_firewall.AUDITED) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
router_id=self._create_and_get_router(),
admin_state_up=
test_db_firewall.ADMIN_STATE_UP):
data = {'firewall_policy': {'name': name}}
req = self.new_update_request(
'firewall_policies', data, fwp['firewall_policy']['id'])
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_policy'][k], v)
**** CubicPower OpenStack Study ****
def test_update_firewall_rule_with_no_firewall(self):
name = "new_firewall_rule1"
attrs = self._get_test_firewall_rule_attrs(name)
attrs['source_port'] = '10:20'
attrs['destination_port'] = '30:40'
with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': '10:20',
'destination_port': '30:40'}}
req = self.new_update_request(
'firewall_rules', data, fwr['firewall_rule']['id'])
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_rule'][k], v)
attrs['source_port'] = '10000'
attrs['destination_port'] = '80'
with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': 10000,
'destination_port': 80}}
req = self.new_update_request('firewall_rules', data,
fwr['firewall_rule']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_rule'][k], v)
attrs['source_port'] = None
attrs['destination_port'] = None
with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': None,
'destination_port': None}}
req = self.new_update_request(
'firewall_rules', data, fwr['firewall_rule']['id'])
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_rule'][k], v)
**** CubicPower OpenStack Study ****
def test_update_firewall_rule_with_firewall(self):
name = "new_firewall_rule1"
attrs = self._get_test_firewall_rule_attrs(name)
with self.firewall_rule() as fwr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
router_id=self._create_and_get_router(),
admin_state_up=
test_db_firewall.ADMIN_STATE_UP):
fwr_id = fwr['firewall_rule']['id']
data = {'firewall_policy': {'firewall_rules': [fwr_id]}}
req = self.new_update_request(
'firewall_policies', data,
fwp['firewall_policy']['id'])
req.get_response(self.ext_api)
data = {'firewall_rule': {'name': name}}
req = self.new_update_request(
'firewall_rules', data,
fwr['firewall_rule']['id'])
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
attrs['firewall_policy_id'] = fwp_id
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_rule'][k], v)
**** CubicPower OpenStack Study ****
def test_insert_rule_with_no_firewall(self):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
with contextlib.nested(self.firewall_rule(name='fwr0'),
self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3'),
self.firewall_rule(name='fwr4'),
self.firewall_rule(name='fwr5'),
self.firewall_rule(name='fwr6')) as fwr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
# test insert when rule list is empty
fwr0_id = fwr[0]['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr0_id)
self._rule_action('insert', fwp_id, fwr0_id,
insert_before=None,
insert_after=None,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert at top of rule list, insert_before and
# insert_after not provided
fwr1_id = fwr[1]['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr1_id)
insert_data = {'firewall_rule_id': fwr1_id}
self._rule_action('insert', fwp_id, fwr0_id,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs, body_data=insert_data)
# test insert at top of list above existing rule
fwr2_id = fwr[2]['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr2_id)
self._rule_action('insert', fwp_id, fwr2_id,
insert_before=fwr1_id,
insert_after=None,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert at bottom of list
fwr3_id = fwr[3]['firewall_rule']['id']
attrs['firewall_rules'].append(fwr3_id)
self._rule_action('insert', fwp_id, fwr3_id,
insert_before=None,
insert_after=fwr0_id,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert in the middle of the list using
# insert_before
fwr4_id = fwr[4]['firewall_rule']['id']
attrs['firewall_rules'].insert(1, fwr4_id)
self._rule_action('insert', fwp_id, fwr4_id,
insert_before=fwr1_id,
insert_after=None,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert in the middle of the list using
# insert_after
fwr5_id = fwr[5]['firewall_rule']['id']
attrs['firewall_rules'].insert(1, fwr5_id)
self._rule_action('insert', fwp_id, fwr5_id,
insert_before=None,
insert_after=fwr2_id,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert when both insert_before and
# insert_after are set
fwr6_id = fwr[6]['firewall_rule']['id']
attrs['firewall_rules'].insert(1, fwr6_id)
self._rule_action('insert', fwp_id, fwr6_id,
insert_before=fwr5_id,
insert_after=fwr5_id,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
**** CubicPower OpenStack Study ****
def test_insert_rule_with_firewall(self):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
with contextlib.nested(self.firewall_rule(name='fwr0'),
self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3'),
self.firewall_rule(name='fwr4'),
self.firewall_rule(name='fwr5'),
self.firewall_rule(name='fwr6')) as fwr:
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
with self.firewall(router_id=self._create_and_get_router(),
firewall_policy_id=fwp_id) as fw:
# test insert when rule list is empty
fwr0_id = fwr[0]['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr0_id)
attrs['firewall_list'].insert(0, fw['firewall']['id'])
self._rule_action('insert', fwp_id, fwr0_id,
insert_before=None,
insert_after=None,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert at top of rule list, insert_before and
# insert_after not provided
fwr1_id = fwr[1]['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr1_id)
insert_data = {'firewall_rule_id': fwr1_id}
self._rule_action(
'insert', fwp_id, fwr0_id,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs, body_data=insert_data)
# test insert at top of list above existing rule
fwr2_id = fwr[2]['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr2_id)
self._rule_action('insert', fwp_id, fwr2_id,
insert_before=fwr1_id,
insert_after=None,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert at bottom of list
fwr3_id = fwr[3]['firewall_rule']['id']
attrs['firewall_rules'].append(fwr3_id)
self._rule_action('insert', fwp_id, fwr3_id,
insert_before=None,
insert_after=fwr0_id,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert in the middle of the list using
# insert_before
fwr4_id = fwr[4]['firewall_rule']['id']
attrs['firewall_rules'].insert(1, fwr4_id)
self._rule_action('insert', fwp_id, fwr4_id,
insert_before=fwr1_id,
insert_after=None,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert in the middle of the list using
# insert_after
fwr5_id = fwr[5]['firewall_rule']['id']
attrs['firewall_rules'].insert(1, fwr5_id)
self._rule_action('insert', fwp_id, fwr5_id,
insert_before=None,
insert_after=fwr2_id,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
# test insert when both insert_before and
# insert_after are set
fwr6_id = fwr[6]['firewall_rule']['id']
attrs['firewall_rules'].insert(1, fwr6_id)
self._rule_action('insert', fwp_id, fwr6_id,
insert_before=fwr5_id,
insert_after=fwr5_id,
expected_code=webob.exc.HTTPOk.code,
expected_body=attrs)
**** CubicPower OpenStack Study ****
def test_remove_rule_with_no_firewall(self):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr1:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
attrs['firewall_rules'] = fw_rule_ids[:]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', data,
fwp_id)
req.get_response(self.ext_api)
# test removing a rule from a policy that does not exist
self._rule_action('remove', '123', fw_rule_ids[1],
expected_code=webob.exc.HTTPNotFound.code,
expected_body=None)
# test removing a rule in the middle of the list
attrs['firewall_rules'].remove(fw_rule_ids[1])
self._rule_action('remove', fwp_id, fw_rule_ids[1],
expected_body=attrs)
# test removing a rule at the top of the list
attrs['firewall_rules'].remove(fw_rule_ids[0])
self._rule_action('remove', fwp_id, fw_rule_ids[0],
expected_body=attrs)
# test removing remaining rule in the list
attrs['firewall_rules'].remove(fw_rule_ids[2])
self._rule_action('remove', fwp_id, fw_rule_ids[2],
expected_body=attrs)
# test removing rule that is not associated with the policy
self._rule_action('remove', fwp_id, fw_rule_ids[2],
expected_code=webob.exc.HTTPBadRequest.code,
expected_body=None)
**** CubicPower OpenStack Study ****
def test_remove_rule_with_firewall(self):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
with self.firewall(router_id=self._create_and_get_router(),
firewall_policy_id=fwp_id) as fw:
attrs['firewall_list'].insert(0, fw['firewall']['id'])
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr1:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
attrs['firewall_rules'] = fw_rule_ids[:]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request(
'firewall_policies', data, fwp_id)
req.get_response(self.ext_api)
# test removing a rule from a policy that does not exist
self._rule_action(
'remove', '123',
fw_rule_ids[1],
expected_code=webob.exc.HTTPNotFound.code,
expected_body=None)
# test removing a rule in the middle of the list
attrs['firewall_rules'].remove(fw_rule_ids[1])
self._rule_action('remove', fwp_id, fw_rule_ids[1],
expected_body=attrs)
# test removing a rule at the top of the list
attrs['firewall_rules'].remove(fw_rule_ids[0])
self._rule_action('remove', fwp_id, fw_rule_ids[0],
expected_body=attrs)
# test removing remaining rule in the list
attrs['firewall_rules'].remove(fw_rule_ids[2])
self._rule_action('remove', fwp_id, fw_rule_ids[2],
expected_body=attrs)
# test removing rule that is not
#associated with the policy
self._rule_action(
'remove', fwp_id, fw_rule_ids[2],
expected_code=webob.exc.HTTPBadRequest.code,
expected_body=None)