# Home
#
# OpenStack Study: test_volume_manager_fc.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

# (c) Copyright 2014 Brocade Communications Systems Inc.

# All Rights Reserved.

#

# Copyright 2014 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

#

"""Unit tests for Volume Manager."""

import mock

from cinder import exception

from cinder import test

from cinder import utils

from cinder.volume import configuration as conf

from cinder.volume import driver

from cinder.volume import manager

from cinder.zonemanager import fc_zone_manager

# Initiator WWN -> list of target WWNs; this is the mapping the zone manager
# is expected to receive in the tests below.
init_target_map = {
    '10008c7cff523b01': ['20240002ac000a50'],
}

# Connection info for a Fibre Channel volume, as returned by the mocked
# driver, including an initiator/target map.
conn_info = {
    'driver_volume_type': 'fibre_channel',
    'data': {
        'target_discovered': True,
        'target_lun': 1,
        'target_wwn': '20240002ac000a50',
        'initiator_target_map': {
            '10008c7cff523b01': ['20240002ac000a50'],
        },
    },
}

# Same connection info but without the 'initiator_target_map' entry.
conn_info_no_init_target_map = {
    'driver_volume_type': 'fibre_channel',
    'data': {
        'target_discovered': True,
        'target_lun': 1,
        'target_wwn': '20240002ac000a50',
    },
}

# **** CubicPower OpenStack Study ****

class TestVolumeManager(manager.VolumeManager, test.TestCase):
    """Tests for the Fibre Channel zoning calls made by ``VolumeManager``.

    The class mixes ``manager.VolumeManager`` into the test case so the
    manager methods under test are invoked directly on ``self``.  The
    attributes assigned in ``setUp`` (``configuration``, ``driver``, ``db``
    and ``zonemanager``) are test doubles or locally built objects.

    Throughout these tests the second argument of
    ``_add_or_delete_fc_connection`` is ``1`` for "add" and ``0`` for
    "delete".
    """

    def setUp(self):
        """Build the configuration, mocked driver/db and zone manager."""
        super(TestVolumeManager, self).setUp()

        # Zoning configuration: one fabric name, zoning mode 'fabric'.
        self.configuration = conf.Configuration(None)
        self.configuration.set_default('fc_fabric_names', 'BRCD_FAB_4',
                                       'fc-zone-manager')
        self.configuration.zoning_mode = 'fabric'

        # The driver is a spec'd mock that hands back the FC conn_info for
        # both initialize and terminate.
        self.driver = mock.Mock(driver.VolumeDriver)
        self.driver.initialize_connection.return_value = conn_info
        self.driver.terminate_connection.return_value = conn_info
        self.driver.create_export.return_value = None

        self.db = mock.Mock()
        self.db.volume_get.return_value = {'volume_type_id': None}
        self.db.volume_admin_metadata_get.return_value = {}

        self.context_mock = mock.Mock()
        self.context_mock.elevated.return_value = None

        self.zonemanager = fc_zone_manager.ZoneManager(
            configuration=self.configuration)

    def tearDown(self):
        """Run the base teardown, then drop references set up in setUp."""
        super(TestVolumeManager, self).tearDown()
        self.configuration = None
        self.db = None
        self.driver = None
        self.zonemanager = None

    def __init__(self, *args, **kwargs):
        """Initialise only the ``TestCase`` side of the class.

        ``VolumeManager.__init__`` is not called here; the attributes the
        tests rely on are assigned in ``setUp`` instead.
        """
        test.TestCase.__init__(self, *args, **kwargs)

    @mock.patch.object(utils, 'require_driver_initialized')
    def test_initialize_connection_voltype_fc_mode_fabric(self,
                                                          utils_mock):
        """initialize_connection requests an FC 'add' with the conn_info."""
        utils_mock.return_value = True
        with mock.patch.object(
                manager.VolumeManager,
                '_add_or_delete_fc_connection') as add_del_conn_mock:
            self.initialize_connection(self.context_mock, None, None)
            add_del_conn_mock.assert_called_once_with(conn_info, 1)

    @mock.patch.object(utils, 'require_driver_initialized')
    def test_initialize_connection_voltype_fc_mode_none(self,
                                                        utils_mock):
        """No FC connection change when there is no zone manager."""
        utils_mock.return_value = True
        with mock.patch.object(
                manager.VolumeManager,
                '_add_or_delete_fc_connection') as add_del_conn_mock:
            self.configuration.zoning_mode = 'none'
            self.zonemanager = None
            self.initialize_connection(self.context_mock, None, None)
            # assertFalse (not a bare assert) so the check survives -O.
            self.assertFalse(add_del_conn_mock.called)

    def test_terminate_connection_exception(self):
        """A ZoneManagerException surfaces as VolumeBackendAPIException."""
        with mock.patch.object(
                manager.VolumeManager,
                '_add_or_delete_fc_connection') as add_del_conn_mock:
            add_del_conn_mock.side_effect = exception.ZoneManagerException
            self.assertRaises(exception.VolumeBackendAPIException,
                              self.terminate_connection, None, None, None,
                              False)

    @mock.patch.object(utils, 'require_driver_initialized')
    def test_terminate_connection_voltype_fc_mode_fabric(self,
                                                         utils_mock):
        """terminate_connection requests an FC 'delete' with the conn_info."""
        utils_mock.return_value = True
        with mock.patch.object(
                manager.VolumeManager,
                '_add_or_delete_fc_connection') as add_del_conn_mock:
            self.terminate_connection(self.context_mock, None, None, False)
            add_del_conn_mock.assert_called_once_with(conn_info, 0)

    @mock.patch.object(utils, 'require_driver_initialized')
    def test_terminate_connection_mode_none(self,
                                            utils_mock):
        """No FC connection change on terminate without a zone manager."""
        utils_mock.return_value = True
        with mock.patch.object(
                manager.VolumeManager,
                '_add_or_delete_fc_connection') as add_del_conn_mock:
            self.configuration.zoning_mode = 'none'
            self.zonemanager = None
            self.terminate_connection(self.context_mock, None, None, False)
            self.assertFalse(add_del_conn_mock.called)

    @mock.patch.object(utils, 'require_driver_initialized')
    def test_terminate_connection_conn_info_none(self,
                                                 utils_mock):
        """No FC connection change when the driver returns no conn_info."""
        utils_mock.return_value = True
        self.driver.terminate_connection.return_value = None
        with mock.patch.object(
                manager.VolumeManager,
                '_add_or_delete_fc_connection') as add_del_conn_mock:
            self.terminate_connection(self.context_mock, None, None, False)
            self.assertFalse(add_del_conn_mock.called)

    @mock.patch.object(fc_zone_manager.ZoneManager, 'add_connection')
    def test__add_or_delete_connection_add(self,
                                           add_connection_mock):
        """Flag 1 forwards the initiator/target map to add_connection."""
        self._add_or_delete_fc_connection(conn_info, 1)
        add_connection_mock.assert_called_once_with(init_target_map)

    @mock.patch.object(fc_zone_manager.ZoneManager, 'delete_connection')
    def test__add_or_delete_connection_delete(self,
                                              delete_connection_mock):
        """Flag 0 forwards the initiator/target map to delete_connection."""
        self._add_or_delete_fc_connection(conn_info, 0)
        delete_connection_mock.assert_called_once_with(init_target_map)

    @mock.patch.object(fc_zone_manager.ZoneManager, 'delete_connection')
    def test__add_or_delete_connection_no_init_target_map(self,
                                                          del_conn_mock):
        """Without an initiator/target map the zone manager is not called."""
        self._add_or_delete_fc_connection(conn_info_no_init_target_map, 0)
        self.assertFalse(del_conn_mock.called)