**** CubicPower OpenStack Study ****
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013, Nachi Ueno, NTT I3, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
from neutron.services.vpn.device_drivers import ipsec as ipsec_driver
from neutron.tests import base
_uuid = uuidutils.generate_uuid
FAKE_HOST = 'fake_host'
FAKE_ROUTER_ID = _uuid()
FAKE_VPN_SERVICE = {
'id': _uuid(),
'router_id': FAKE_ROUTER_ID,
'admin_state_up': True,
'status': constants.PENDING_CREATE,
'subnet': {'cidr': '10.0.0.0/24'},
'ipsec_site_connections': [
{'peer_cidrs': ['20.0.0.0/24',
'30.0.0.0/24']},
{'peer_cidrs': ['40.0.0.0/24',
'50.0.0.0/24']}]
}
**** CubicPower OpenStack Study ****
class TestIPsecDeviceDriver(base.BaseTestCase):
**** CubicPower OpenStack Study ****
def setUp(self, driver=ipsec_driver.OpenSwanDriver):
super(TestIPsecDeviceDriver, self).setUp()
for klass in [
'os.makedirs',
'os.path.isdir',
'neutron.agent.linux.utils.replace_file',
'neutron.openstack.common.rpc.create_connection',
'neutron.services.vpn.device_drivers.ipsec.'
'OpenSwanProcess._gen_config_content',
'shutil.rmtree',
]:
mock.patch(klass).start()
self.execute = mock.patch(
'neutron.agent.linux.utils.execute').start()
self.agent = mock.Mock()
self.driver = driver(
self.agent,
FAKE_HOST)
self.driver.agent_rpc = mock.Mock()
**** CubicPower OpenStack Study ****
def test_vpnservice_updated(self):
with mock.patch.object(self.driver, 'sync') as sync:
context = mock.Mock()
self.driver.vpnservice_updated(context)
sync.assert_called_once_with(context, [])
**** CubicPower OpenStack Study ****
def test_create_router(self):
process_id = _uuid()
process = mock.Mock()
process.vpnservice = FAKE_VPN_SERVICE
self.driver.processes = {
process_id: process}
self.driver.create_router(process_id)
process.enable.assert_called_once_with()
**** CubicPower OpenStack Study ****
def test_destroy_router(self):
process_id = _uuid()
process = mock.Mock()
process.vpnservice = FAKE_VPN_SERVICE
self.driver.processes = {
process_id: process}
self.driver.destroy_router(process_id)
process.disable.assert_called_once_with()
self.assertNotIn(process_id, self.driver.processes)
**** CubicPower OpenStack Study ****
def test_sync_added(self):
self.driver.agent_rpc.get_vpn_services_on_host.return_value = [
FAKE_VPN_SERVICE]
context = mock.Mock()
process = mock.Mock()
process.vpnservice = FAKE_VPN_SERVICE
process.connection_status = {}
process.status = constants.ACTIVE
process.updated_pending_status = True
self.driver.process_status_cache = {}
self.driver.processes = {
FAKE_ROUTER_ID: process}
self.driver.sync(context, [])
self.agent.assert_has_calls([
mock.call.add_nat_rule(
FAKE_ROUTER_ID,
'POSTROUTING',
'-s 10.0.0.0/24 -d 20.0.0.0/24 -m policy '
'--dir out --pol ipsec -j ACCEPT ',
top=True),
mock.call.add_nat_rule(
FAKE_ROUTER_ID,
'POSTROUTING',
'-s 10.0.0.0/24 -d 30.0.0.0/24 -m policy '
'--dir out --pol ipsec -j ACCEPT ',
top=True),
mock.call.add_nat_rule(
FAKE_ROUTER_ID,
'POSTROUTING',
'-s 10.0.0.0/24 -d 40.0.0.0/24 -m policy '
'--dir out --pol ipsec -j ACCEPT ',
top=True),
mock.call.add_nat_rule(
FAKE_ROUTER_ID,
'POSTROUTING',
'-s 10.0.0.0/24 -d 50.0.0.0/24 -m policy '
'--dir out --pol ipsec -j ACCEPT ',
top=True),
mock.call.iptables_apply(FAKE_ROUTER_ID)
])
process.update.assert_called_once_with()
self.driver.agent_rpc.update_status.assert_called_once_with(
context,
[{'status': 'ACTIVE',
'ipsec_site_connections': {},
'updated_pending_status': True,
'id': FAKE_VPN_SERVICE['id']}])
**** CubicPower OpenStack Study ****
def fake_ensure_process(self, process_id, vpnservice=None):
process = self.driver.processes.get(process_id)
if not process:
process = mock.Mock()
process.vpnservice = FAKE_VPN_SERVICE
process.connection_status = {}
process.status = constants.ACTIVE
process.updated_pending_status = True
self.driver.processes[process_id] = process
elif vpnservice:
process.vpnservice = vpnservice
process.update_vpnservice(vpnservice)
return process
**** CubicPower OpenStack Study ****
def test_sync_update_vpnservice(self):
with mock.patch.object(self.driver,
'ensure_process') as ensure_process:
ensure_process.side_effect = self.fake_ensure_process
new_vpn_service = FAKE_VPN_SERVICE
updated_vpn_service = copy.deepcopy(new_vpn_service)
updated_vpn_service['ipsec_site_connections'].append(
{'peer_cidrs': ['60.0.0.0/24',
'70.0.0.0/24']})
context = mock.Mock()
self.driver.process_status_cache = {}
self.driver.agent_rpc.get_vpn_services_on_host.return_value = [
new_vpn_service]
self.driver.sync(context, [])
process = self.driver.processes[FAKE_ROUTER_ID]
self.assertEqual(process.vpnservice, new_vpn_service)
self.driver.agent_rpc.get_vpn_services_on_host.return_value = [
updated_vpn_service]
self.driver.sync(context, [])
process = self.driver.processes[FAKE_ROUTER_ID]
process.update_vpnservice.assert_called_once_with(
updated_vpn_service)
self.assertEqual(process.vpnservice, updated_vpn_service)
**** CubicPower OpenStack Study ****
def test_sync_removed(self):
self.driver.agent_rpc.get_vpn_services_on_host.return_value = []
context = mock.Mock()
process_id = _uuid()
process = mock.Mock()
process.vpnservice = FAKE_VPN_SERVICE
self.driver.processes = {
process_id: process}
self.driver.sync(context, [])
process.disable.assert_called_once_with()
self.assertNotIn(process_id, self.driver.processes)
**** CubicPower OpenStack Study ****
def test_sync_removed_router(self):
self.driver.agent_rpc.get_vpn_services_on_host.return_value = []
context = mock.Mock()
process_id = _uuid()
self.driver.sync(context, [{'id': process_id}])
self.assertNotIn(process_id, self.driver.processes)
**** CubicPower OpenStack Study ****
def test_status_updated_on_connection_admin_down(self):
self.driver.process_status_cache = {
'1': {
'status': constants.ACTIVE,
'id': 123,
'updated_pending_status': False,
'ipsec_site_connections': {
'10': {
'status': constants.ACTIVE,
'updated_pending_status': False,
},
'20': {
'status': constants.ACTIVE,
'updated_pending_status': False,
}
}
}
}
# Simulate that there is no longer status for connection '20'
# e.g. connection admin down
new_status = {
'ipsec_site_connections': {
'10': {
'status': constants.ACTIVE,
'updated_pending_status': False
}
}
}
self.driver.update_downed_connections('1', new_status)
existing_conn = new_status['ipsec_site_connections'].get('10')
self.assertIsNotNone(existing_conn)
self.assertEqual(constants.ACTIVE, existing_conn['status'])
missing_conn = new_status['ipsec_site_connections'].get('20')
self.assertIsNotNone(missing_conn)
self.assertEqual(constants.DOWN, missing_conn['status'])
**** CubicPower OpenStack Study ****
def test_status_updated_on_service_admin_down(self):
self.driver.process_status_cache = {
'1': {
'status': constants.ACTIVE,
'id': 123,
'updated_pending_status': False,
'ipsec_site_connections': {
'10': {
'status': constants.ACTIVE,
'updated_pending_status': False,
},
'20': {
'status': constants.ACTIVE,
'updated_pending_status': False,
}
}
}
}
# Simulate that there are no connections now
new_status = {
'ipsec_site_connections': {}
}
self.driver.update_downed_connections('1', new_status)
missing_conn = new_status['ipsec_site_connections'].get('10')
self.assertIsNotNone(missing_conn)
self.assertEqual(constants.DOWN, missing_conn['status'])
missing_conn = new_status['ipsec_site_connections'].get('20')
self.assertIsNotNone(missing_conn)
self.assertEqual(constants.DOWN, missing_conn['status'])