¡@

Home 

OpenStack Study: test_ipsec.py

OpenStack Index

**** CubicPower OpenStack Study ****

# vim: tabstop=4 shiftwidth=4 softtabstop=4

#

# Copyright 2013, Nachi Ueno, NTT I3, Inc.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import copy

import mock

from neutron.openstack.common import uuidutils

from neutron.plugins.common import constants

from neutron.services.vpn.device_drivers import ipsec as ipsec_driver

from neutron.tests import base

_uuid = uuidutils.generate_uuid

FAKE_HOST = 'fake_host'

FAKE_ROUTER_ID = _uuid()

FAKE_VPN_SERVICE = {

'id': _uuid(),

'router_id': FAKE_ROUTER_ID,

'admin_state_up': True,

'status': constants.PENDING_CREATE,

'subnet': {'cidr': '10.0.0.0/24'},

'ipsec_site_connections': [

{'peer_cidrs': ['20.0.0.0/24',

'30.0.0.0/24']},

{'peer_cidrs': ['40.0.0.0/24',

'50.0.0.0/24']}]

}

**** CubicPower OpenStack Study ****

class TestIPsecDeviceDriver(base.BaseTestCase):

**** CubicPower OpenStack Study ****

    def setUp(self, driver=ipsec_driver.OpenSwanDriver):

        super(TestIPsecDeviceDriver, self).setUp()

        for klass in [

            'os.makedirs',

            'os.path.isdir',

            'neutron.agent.linux.utils.replace_file',

            'neutron.openstack.common.rpc.create_connection',

            'neutron.services.vpn.device_drivers.ipsec.'

                'OpenSwanProcess._gen_config_content',

            'shutil.rmtree',

        ]:

            mock.patch(klass).start()

        self.execute = mock.patch(

            'neutron.agent.linux.utils.execute').start()

        self.agent = mock.Mock()

        self.driver = driver(

            self.agent,

            FAKE_HOST)

        self.driver.agent_rpc = mock.Mock()

**** CubicPower OpenStack Study ****

    def test_vpnservice_updated(self):

        with mock.patch.object(self.driver, 'sync') as sync:

            context = mock.Mock()

            self.driver.vpnservice_updated(context)

            sync.assert_called_once_with(context, [])

**** CubicPower OpenStack Study ****

    def test_create_router(self):

        process_id = _uuid()

        process = mock.Mock()

        process.vpnservice = FAKE_VPN_SERVICE

        self.driver.processes = {

            process_id: process}

        self.driver.create_router(process_id)

        process.enable.assert_called_once_with()

**** CubicPower OpenStack Study ****

    def test_destroy_router(self):

        process_id = _uuid()

        process = mock.Mock()

        process.vpnservice = FAKE_VPN_SERVICE

        self.driver.processes = {

            process_id: process}

        self.driver.destroy_router(process_id)

        process.disable.assert_called_once_with()

        self.assertNotIn(process_id, self.driver.processes)

**** CubicPower OpenStack Study ****

    def test_sync_added(self):

        self.driver.agent_rpc.get_vpn_services_on_host.return_value = [

            FAKE_VPN_SERVICE]

        context = mock.Mock()

        process = mock.Mock()

        process.vpnservice = FAKE_VPN_SERVICE

        process.connection_status = {}

        process.status = constants.ACTIVE

        process.updated_pending_status = True

        self.driver.process_status_cache = {}

        self.driver.processes = {

            FAKE_ROUTER_ID: process}

        self.driver.sync(context, [])

        self.agent.assert_has_calls([

            mock.call.add_nat_rule(

                FAKE_ROUTER_ID,

                'POSTROUTING',

                '-s 10.0.0.0/24 -d 20.0.0.0/24 -m policy '

                '--dir out --pol ipsec -j ACCEPT ',

                top=True),

            mock.call.add_nat_rule(

                FAKE_ROUTER_ID,

                'POSTROUTING',

                '-s 10.0.0.0/24 -d 30.0.0.0/24 -m policy '

                '--dir out --pol ipsec -j ACCEPT ',

                top=True),

            mock.call.add_nat_rule(

                FAKE_ROUTER_ID,

                'POSTROUTING',

                '-s 10.0.0.0/24 -d 40.0.0.0/24 -m policy '

                '--dir out --pol ipsec -j ACCEPT ',

                top=True),

            mock.call.add_nat_rule(

                FAKE_ROUTER_ID,

                'POSTROUTING',

                '-s 10.0.0.0/24 -d 50.0.0.0/24 -m policy '

                '--dir out --pol ipsec -j ACCEPT ',

                top=True),

            mock.call.iptables_apply(FAKE_ROUTER_ID)

        ])

        process.update.assert_called_once_with()

        self.driver.agent_rpc.update_status.assert_called_once_with(

            context,

            [{'status': 'ACTIVE',

             'ipsec_site_connections': {},

             'updated_pending_status': True,

             'id': FAKE_VPN_SERVICE['id']}])

**** CubicPower OpenStack Study ****

    def fake_ensure_process(self, process_id, vpnservice=None):

        process = self.driver.processes.get(process_id)

        if not process:

            process = mock.Mock()

            process.vpnservice = FAKE_VPN_SERVICE

            process.connection_status = {}

            process.status = constants.ACTIVE

            process.updated_pending_status = True

            self.driver.processes[process_id] = process

        elif vpnservice:

            process.vpnservice = vpnservice

            process.update_vpnservice(vpnservice)

        return process

**** CubicPower OpenStack Study ****

    def test_sync_update_vpnservice(self):

        with mock.patch.object(self.driver,

                               'ensure_process') as ensure_process:

            ensure_process.side_effect = self.fake_ensure_process

            new_vpn_service = FAKE_VPN_SERVICE

            updated_vpn_service = copy.deepcopy(new_vpn_service)

            updated_vpn_service['ipsec_site_connections'].append(

                {'peer_cidrs': ['60.0.0.0/24',

                                '70.0.0.0/24']})

            context = mock.Mock()

            self.driver.process_status_cache = {}

            self.driver.agent_rpc.get_vpn_services_on_host.return_value = [

                new_vpn_service]

            self.driver.sync(context, [])

            process = self.driver.processes[FAKE_ROUTER_ID]

            self.assertEqual(process.vpnservice, new_vpn_service)

            self.driver.agent_rpc.get_vpn_services_on_host.return_value = [

                updated_vpn_service]

            self.driver.sync(context, [])

            process = self.driver.processes[FAKE_ROUTER_ID]

            process.update_vpnservice.assert_called_once_with(

                updated_vpn_service)

            self.assertEqual(process.vpnservice, updated_vpn_service)

**** CubicPower OpenStack Study ****

    def test_sync_removed(self):

        self.driver.agent_rpc.get_vpn_services_on_host.return_value = []

        context = mock.Mock()

        process_id = _uuid()

        process = mock.Mock()

        process.vpnservice = FAKE_VPN_SERVICE

        self.driver.processes = {

            process_id: process}

        self.driver.sync(context, [])

        process.disable.assert_called_once_with()

        self.assertNotIn(process_id, self.driver.processes)

**** CubicPower OpenStack Study ****

    def test_sync_removed_router(self):

        self.driver.agent_rpc.get_vpn_services_on_host.return_value = []

        context = mock.Mock()

        process_id = _uuid()

        self.driver.sync(context, [{'id': process_id}])

        self.assertNotIn(process_id, self.driver.processes)

**** CubicPower OpenStack Study ****

    def test_status_updated_on_connection_admin_down(self):

        self.driver.process_status_cache = {

            '1': {

                'status': constants.ACTIVE,

                'id': 123,

                'updated_pending_status': False,

                'ipsec_site_connections': {

                    '10': {

                        'status': constants.ACTIVE,

                        'updated_pending_status': False,

                    },

                    '20': {

                        'status': constants.ACTIVE,

                        'updated_pending_status': False,

                    }

                }

            }

        }

        # Simulate that there is no longer status for connection '20'

        # e.g. connection admin down

        new_status = {

            'ipsec_site_connections': {

                '10': {

                    'status': constants.ACTIVE,

                    'updated_pending_status': False

                }

            }

        }

        self.driver.update_downed_connections('1', new_status)

        existing_conn = new_status['ipsec_site_connections'].get('10')

        self.assertIsNotNone(existing_conn)

        self.assertEqual(constants.ACTIVE, existing_conn['status'])

        missing_conn = new_status['ipsec_site_connections'].get('20')

        self.assertIsNotNone(missing_conn)

        self.assertEqual(constants.DOWN, missing_conn['status'])

**** CubicPower OpenStack Study ****

    def test_status_updated_on_service_admin_down(self):

        self.driver.process_status_cache = {

            '1': {

                'status': constants.ACTIVE,

                'id': 123,

                'updated_pending_status': False,

                'ipsec_site_connections': {

                    '10': {

                        'status': constants.ACTIVE,

                        'updated_pending_status': False,

                    },

                    '20': {

                        'status': constants.ACTIVE,

                        'updated_pending_status': False,

                    }

                }

            }

        }

        # Simulate that there are no connections now

        new_status = {

            'ipsec_site_connections': {}

        }

        self.driver.update_downed_connections('1', new_status)

        missing_conn = new_status['ipsec_site_connections'].get('10')

        self.assertIsNotNone(missing_conn)

        self.assertEqual(constants.DOWN, missing_conn['status'])

        missing_conn = new_status['ipsec_site_connections'].get('20')

        self.assertIsNotNone(missing_conn)

        self.assertEqual(constants.DOWN, missing_conn['status'])