# Home
# OpenStack Study: test_windows.py
# OpenStack Index
# **** CubicPower OpenStack Study ****

# Copyright 2012 Pedro Navarro Perez

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""

Unit tests for Windows Server 2012 OpenStack Cinder volume driver

"""

import os

import shutil

import tempfile

from oslo.config import cfg

import mox as mox_lib

from mox import IgnoreArg

from mox import stubout

from cinder import test

from cinder.image import image_utils

from cinder.tests.windows import db_fakes

from cinder.volume import configuration as conf

from cinder.volume.drivers.windows import windows

from cinder.volume.drivers.windows import windows_utils

# Global oslo.config object; the tests below read windows_iscsi_lun_path,
# iscsi_target_prefix and image_conversion_dir from it.
CONF = cfg.CONF

**** CubicPower OpenStack Study ****

class TestWindowsDriver(test.TestCase):
    """Exercise windows.WindowsDriver against mox-stubbed WindowsUtils."""

**** CubicPower OpenStack Study ****

    def __init__(self, method):

        super(TestWindowsDriver, self).__init__(method)

**** CubicPower OpenStack Study ****

    def setUp(self):

        self.lun_path_tempdir = tempfile.mkdtemp()

        super(TestWindowsDriver, self).setUp()

        self._mox = mox_lib.Mox()

        self.stubs = stubout.StubOutForTesting()

        self.flags(

            windows_iscsi_lun_path=self.lun_path_tempdir,

        )

        self._setup_stubs()

        configuration = conf.Configuration(None)

        configuration.append_config_values(windows.windows_opts)

        self._driver = windows.WindowsDriver(configuration=configuration)

        self._driver.do_setup({})

**** CubicPower OpenStack Study ****

    def tearDown(self):

        self._mox.UnsetStubs()

        self.stubs.UnsetAll()

        shutil.rmtree(self.lun_path_tempdir)

        super(TestWindowsDriver, self).tearDown()

**** CubicPower OpenStack Study ****

    def _setup_stubs(self):

        def fake_wutils__init__(self):

            pass

        windows_utils.WindowsUtils.__init__ = fake_wutils__init__

**** CubicPower OpenStack Study ****

        def fake_wutils__init__(self):

            pass

        windows_utils.WindowsUtils.__init__ = fake_wutils__init__

**** CubicPower OpenStack Study ****

    def fake_local_path(self, volume):

            return os.path.join(CONF.windows_iscsi_lun_path,

                                str(volume['name']) + ".vhd")

**** CubicPower OpenStack Study ****

    def test_check_for_setup_errors(self):

        mox = self._mox

        drv = self._driver

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'check_for_setup_error')

        windows_utils.WindowsUtils.check_for_setup_error()

        mox.ReplayAll()

        drv.check_for_setup_error()

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_create_volume(self):

        mox = self._mox

        drv = self._driver

        vol = db_fakes.get_fake_volume_info()

        self.stubs.Set(drv, 'local_path', self.fake_local_path)

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'create_volume')

        windows_utils.WindowsUtils.create_volume(self.fake_local_path(vol),

                                                 vol['name'], vol['size'])

        mox.ReplayAll()

        drv.create_volume(vol)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_delete_volume(self):

        """delete_volume simple test case."""

        mox = self._mox

        drv = self._driver

        vol = db_fakes.get_fake_volume_info()

        mox.StubOutWithMock(drv, 'local_path')

        drv.local_path(vol).AndReturn(self.fake_local_path(vol))

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'delete_volume')

        windows_utils.WindowsUtils.delete_volume(vol['name'],

                                                 self.fake_local_path(vol))

        mox.ReplayAll()

        drv.delete_volume(vol)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_create_snapshot(self):

        mox = self._mox

        drv = self._driver

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'create_snapshot')

        volume = db_fakes.get_fake_volume_info()

        snapshot = db_fakes.get_fake_snapshot_info()

        self.stubs.Set(drv, 'local_path', self.fake_local_path(snapshot))

        windows_utils.WindowsUtils.create_snapshot(volume['name'],

                                                   snapshot['name'])

        mox.ReplayAll()

        drv.create_snapshot(snapshot)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_create_volume_from_snapshot(self):

        mox = self._mox

        drv = self._driver

        snapshot = db_fakes.get_fake_snapshot_info()

        volume = db_fakes.get_fake_volume_info()

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'create_volume_from_snapshot')

        windows_utils.WindowsUtils.\

            create_volume_from_snapshot(volume['name'], snapshot['name'])

        mox.ReplayAll()

        drv.create_volume_from_snapshot(volume, snapshot)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_delete_snapshot(self):

        mox = self._mox

        drv = self._driver

        snapshot = db_fakes.get_fake_snapshot_info()

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'delete_snapshot')

        windows_utils.WindowsUtils.delete_snapshot(snapshot['name'])

        mox.ReplayAll()

        drv.delete_snapshot(snapshot)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_create_export(self):

        mox = self._mox

        drv = self._driver

        volume = db_fakes.get_fake_volume_info()

        initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'create_iscsi_target')

        windows_utils.WindowsUtils.create_iscsi_target(initiator_name,

                                                       IgnoreArg())

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'add_disk_to_target')

        windows_utils.WindowsUtils.add_disk_to_target(volume['name'],

                                                      initiator_name)

        mox.ReplayAll()

        export_info = drv.create_export(None, volume)

        mox.VerifyAll()

        self.assertEqual(export_info['provider_location'], initiator_name)

**** CubicPower OpenStack Study ****

    def test_initialize_connection(self):

        mox = self._mox

        drv = self._driver

        volume = db_fakes.get_fake_volume_info()

        initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])

        connector = db_fakes.get_fake_connector_info()

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'associate_initiator_with_iscsi_target')

        windows_utils.WindowsUtils.associate_initiator_with_iscsi_target(

            volume['provider_location'], initiator_name, )

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'get_host_information')

        windows_utils.WindowsUtils.get_host_information(

            volume, volume['provider_location'])

        mox.ReplayAll()

        drv.initialize_connection(volume, connector)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_terminate_connection(self):

        mox = self._mox

        drv = self._driver

        volume = db_fakes.get_fake_volume_info()

        initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])

        connector = db_fakes.get_fake_connector_info()

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'delete_iscsi_target')

        windows_utils.WindowsUtils.delete_iscsi_target(

            initiator_name, volume['provider_location'])

        mox.ReplayAll()

        drv.terminate_connection(volume, connector)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_ensure_export(self):

        mox = self._mox

        drv = self._driver

        volume = db_fakes.get_fake_volume_info()

        initiator_name = "%s%s" % (CONF.iscsi_target_prefix, volume['name'])

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'create_iscsi_target')

        windows_utils.WindowsUtils.create_iscsi_target(initiator_name, True)

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'add_disk_to_target')

        windows_utils.WindowsUtils.add_disk_to_target(volume['name'],

                                                      initiator_name)

        mox.ReplayAll()

        drv.ensure_export(None, volume)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_remove_export(self):

        mox = self._mox

        drv = self._driver

        volume = db_fakes.get_fake_volume_info()

        target_name = volume['provider_location']

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'remove_iscsi_target')

        windows_utils.WindowsUtils.remove_iscsi_target(target_name)

        mox.ReplayAll()

        drv.remove_export(None, volume)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_copy_image_to_volume(self):

        """resize_image common case usage."""

        mox = self._mox

        drv = self._driver

        volume = db_fakes.get_fake_volume_info()

        self.stubs.Set(drv, 'local_path', self.fake_local_path)

        mox.StubOutWithMock(image_utils, 'fetch_to_vhd')

        image_utils.fetch_to_vhd(None, None, None,

                                 self.fake_local_path(volume),

                                 mox_lib.IgnoreArg())

        mox.ReplayAll()

        drv.copy_image_to_volume(None, volume, None, None)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_copy_volume_to_image(self):

        mox = self._mox

        drv = self._driver

        vol = db_fakes.get_fake_volume_info()

        image_meta = db_fakes.get_fake_image_meta()

        self.stubs.Set(drv, 'local_path', self.fake_local_path)

        mox.StubOutWithMock(image_utils, 'upload_volume')

        temp_vhd_path = os.path.join(CONF.image_conversion_dir,

                                     str(image_meta['id']) + ".vhd")

        image_utils.upload_volume(None, None, image_meta, temp_vhd_path, 'vpc')

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'copy_vhd_disk')

        windows_utils.WindowsUtils.copy_vhd_disk(self.fake_local_path(vol),

                                                 temp_vhd_path)

        mox.ReplayAll()

        drv.copy_volume_to_image(None, vol, None, image_meta)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_create_cloned_volume(self):

        mox = self._mox

        drv = self._driver

        volume = db_fakes.get_fake_volume_info()

        volume_cloned = db_fakes.get_fake_volume_info_cloned()

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'create_volume')

        windows_utils.WindowsUtils.create_volume(IgnoreArg(), IgnoreArg(),

                                                 IgnoreArg())

        self._mox.StubOutWithMock(windows_utils.WindowsUtils,

                                  'copy_vhd_disk')

        windows_utils.WindowsUtils.copy_vhd_disk(self.fake_local_path(

            volume_cloned), self.fake_local_path(volume))

        mox.ReplayAll()

        drv.create_cloned_volume(volume, volume_cloned)

        mox.VerifyAll()

**** CubicPower OpenStack Study ****

    def test_extend_volume(self):

        mox = self._mox

        drv = self._driver

        volume = db_fakes.get_fake_volume_info()

        TEST_VOLUME_ADDITIONAL_SIZE_MB = 1024

        TEST_VOLUME_ADDITIONAL_SIZE_GB = 1

        self._mox.StubOutWithMock(windows_utils.WindowsUtils, 'extend')

        windows_utils.WindowsUtils.extend(volume['name'],

                                          TEST_VOLUME_ADDITIONAL_SIZE_MB)

        new_size = volume['size'] + TEST_VOLUME_ADDITIONAL_SIZE_GB

        mox.ReplayAll()

        drv.extend_volume(volume, new_size)

        mox.VerifyAll()