**** CubicPower OpenStack Study ****
# Copyright (c) 2013 Scality
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
"""
Unit tests for the Scality SOFS Volume Driver.
"""
import errno
import os
import shutil
import tempfile
import mox as mox_lib
from cinder import context
from cinder import exception
from cinder.image import image_utils
from cinder import test
from cinder import units
from cinder import utils
from cinder.volume.drivers import scality
**** CubicPower OpenStack Study ****
class ScalityDriverTestCase(test.TestCase):
    """Test case for the Scality driver."""
    TEST_MOUNT = '/tmp/fake_mount'
    TEST_CONFIG = '/tmp/fake_config'
    TEST_VOLDIR = 'volumes'
    TEST_VOLNAME = 'volume_name'
    TEST_VOLSIZE = '0'
    TEST_VOLUME = {
        'name': TEST_VOLNAME,
        'size': TEST_VOLSIZE
    }
    TEST_VOLPATH = os.path.join(TEST_MOUNT,
                                TEST_VOLDIR,
                                TEST_VOLNAME)
    TEST_SNAPNAME = 'snapshot_name'
    TEST_SNAPSHOT = {
        'name': TEST_SNAPNAME,
        'volume_name': TEST_VOLNAME,
        'volume_size': TEST_VOLSIZE
    }
    TEST_SNAPPATH = os.path.join(TEST_MOUNT,
                                 TEST_VOLDIR,
                                 TEST_SNAPNAME)
    TEST_CLONENAME = 'clone_name'
    TEST_CLONE = {
        'name': TEST_CLONENAME,
        'size': TEST_VOLSIZE
    }
    TEST_NEWSIZE = '2'
    TEST_IMAGE_SERVICE = 'image_service'
    TEST_IMAGE_ID = 'image_id'
    TEST_IMAGE_META = 'image_meta'
    
**** CubicPower OpenStack Study ****
    def _makedirs(self, path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
**** CubicPower OpenStack Study ****
    def _create_fake_config(self):
        open(self.TEST_CONFIG, "w+").close()
**** CubicPower OpenStack Study ****
    def _create_fake_mount(self):
        self._makedirs(os.path.join(self.TEST_MOUNT, 'sys'))
        self._makedirs(os.path.join(self.TEST_MOUNT, self.TEST_VOLDIR))
**** CubicPower OpenStack Study ****
    def _remove_fake_config(self):
        try:
            os.unlink(self.TEST_CONFIG)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
**** CubicPower OpenStack Study ****
    def _configure_driver(self):
        scality.CONF.scality_sofs_config = self.TEST_CONFIG
        scality.CONF.scality_sofs_mount_point = self.TEST_MOUNT
        scality.CONF.scality_sofs_volume_dir = self.TEST_VOLDIR
        scality.CONF.volume_dd_blocksize = '1M'
**** CubicPower OpenStack Study ****
    def _execute_wrapper(self, cmd, *args, **kwargs):
        try:
            kwargs.pop('run_as_root')
        except KeyError:
            pass
        utils.execute(cmd, *args, **kwargs)
**** CubicPower OpenStack Study ****
    def _set_access_wrapper(self, is_visible):
        def _access_wrapper(path, flags):
            if path == '/sbin/mount.sofs':
                return is_visible
            else:
                return os.access(path, flags)
        self.stubs.Set(os, 'access', _access_wrapper)
**** CubicPower OpenStack Study ****
        def _access_wrapper(path, flags):
            if path == '/sbin/mount.sofs':
                return is_visible
            else:
                return os.access(path, flags)
        self.stubs.Set(os, 'access', _access_wrapper)
**** CubicPower OpenStack Study ****
    def setUp(self):
        super(ScalityDriverTestCase, self).setUp()
        self.tempdir = tempfile.mkdtemp()
        self.TEST_MOUNT = self.tempdir
        self.TEST_VOLPATH = os.path.join(self.TEST_MOUNT,
                                         self.TEST_VOLDIR,
                                         self.TEST_VOLNAME)
        self.TEST_SNAPPATH = os.path.join(self.TEST_MOUNT,
                                          self.TEST_VOLDIR,
                                          self.TEST_SNAPNAME)
        self.TEST_CLONEPATH = os.path.join(self.TEST_MOUNT,
                                           self.TEST_VOLDIR,
                                           self.TEST_CLONENAME)
        self._driver = scality.ScalityDriver()
        self._driver.set_execute(self._execute_wrapper)
        self._mox = mox_lib.Mox()
        self._create_fake_mount()
        self._create_fake_config()
        self._configure_driver()
**** CubicPower OpenStack Study ****
    def tearDown(self):
        shutil.rmtree(self.tempdir)
        self._remove_fake_config()
        super(ScalityDriverTestCase, self).tearDown()
**** CubicPower OpenStack Study ****
    def test_setup_no_config(self):
        """Missing SOFS configuration shall raise an error."""
        scality.CONF.scality_sofs_config = None
        self.assertRaises(exception.VolumeBackendAPIException,
                          self._driver.do_setup, None)
**** CubicPower OpenStack Study ****
    def test_setup_missing_config(self):
        """Non-existent SOFS configuration file shall raise an error."""
        scality.CONF.scality_sofs_config = 'nonexistent.conf'
        self.assertRaises(exception.VolumeBackendAPIException,
                          self._driver.do_setup, None)
**** CubicPower OpenStack Study ****
    def test_setup_no_mount_helper(self):
        """SOFS must be installed to use the driver."""
        self._set_access_wrapper(False)
        self.assertRaises(exception.VolumeBackendAPIException,
                          self._driver.do_setup, None)
**** CubicPower OpenStack Study ****
    def test_setup_make_voldir(self):
        """The directory for volumes shall be created automatically."""
        self._set_access_wrapper(True)
        voldir_path = os.path.join(self.TEST_MOUNT, self.TEST_VOLDIR)
        os.rmdir(voldir_path)
        self._driver.do_setup(None)
        self.assertTrue(os.path.isdir(voldir_path))
**** CubicPower OpenStack Study ****
    def test_local_path(self):
        """Expected behaviour for local_path."""
        self.assertEqual(self._driver.local_path(self.TEST_VOLUME),
                         self.TEST_VOLPATH)
**** CubicPower OpenStack Study ****
    def test_create_volume(self):
        """Expected behaviour for create_volume."""
        ret = self._driver.create_volume(self.TEST_VOLUME)
        self.assertEqual(ret['provider_location'],
                         os.path.join(self.TEST_VOLDIR,
                                      self.TEST_VOLNAME))
        self.assertTrue(os.path.isfile(self.TEST_VOLPATH))
        self.assertEqual(os.stat(self.TEST_VOLPATH).st_size,
                         100 * units.MiB)
**** CubicPower OpenStack Study ****
    def test_delete_volume(self):
        """Expected behaviour for delete_volume."""
        self._driver.create_volume(self.TEST_VOLUME)
        self._driver.delete_volume(self.TEST_VOLUME)
        self.assertFalse(os.path.isfile(self.TEST_VOLPATH))
**** CubicPower OpenStack Study ****
    def test_create_snapshot(self):
        """Expected behaviour for create_snapshot."""
        mox = self._mox
        vol_size = self._driver._size_bytes(self.TEST_VOLSIZE)
        mox.StubOutWithMock(self._driver, '_create_file')
        self._driver._create_file(self.TEST_SNAPPATH, vol_size)
        mox.StubOutWithMock(self._driver, '_copy_file')
        self._driver._copy_file(self.TEST_VOLPATH, self.TEST_SNAPPATH)
        mox.ReplayAll()
        self._driver.create_snapshot(self.TEST_SNAPSHOT)
        mox.UnsetStubs()
        mox.VerifyAll()
**** CubicPower OpenStack Study ****
    def test_delete_snapshot(self):
        """Expected behaviour for delete_snapshot."""
        mox = self._mox
        mox.StubOutWithMock(os, 'remove')
        os.remove(self.TEST_SNAPPATH)
        mox.ReplayAll()
        self._driver.delete_snapshot(self.TEST_SNAPSHOT)
        mox.UnsetStubs()
        mox.VerifyAll()
**** CubicPower OpenStack Study ****
    def test_initialize_connection(self):
        """Expected behaviour for initialize_connection."""
        ret = self._driver.initialize_connection(self.TEST_VOLUME, None)
        self.assertEqual(ret['driver_volume_type'], 'scality')
        self.assertEqual(ret['data']['sofs_path'],
                         os.path.join(self.TEST_VOLDIR,
                                      self.TEST_VOLNAME))
**** CubicPower OpenStack Study ****
    def test_copy_image_to_volume(self):
        """Expected behaviour for copy_image_to_volume."""
        self.mox.StubOutWithMock(image_utils, 'fetch_to_raw')
        image_utils.fetch_to_raw(context,
                                 self.TEST_IMAGE_SERVICE,
                                 self.TEST_IMAGE_ID,
                                 self.TEST_VOLPATH,
                                 mox_lib.IgnoreArg(),
                                 size=self.TEST_VOLSIZE)
        self.mox.ReplayAll()
        self._driver.copy_image_to_volume(context,
                                          self.TEST_VOLUME,
                                          self.TEST_IMAGE_SERVICE,
                                          self.TEST_IMAGE_ID)
**** CubicPower OpenStack Study ****
    def test_copy_volume_to_image(self):
        """Expected behaviour for copy_volume_to_image."""
        self.mox.StubOutWithMock(image_utils, 'upload_volume')
        image_utils.upload_volume(context,
                                  self.TEST_IMAGE_SERVICE,
                                  self.TEST_IMAGE_META,
                                  self.TEST_VOLPATH)
        self.mox.ReplayAll()
        self._driver.copy_volume_to_image(context,
                                          self.TEST_VOLUME,
                                          self.TEST_IMAGE_SERVICE,
                                          self.TEST_IMAGE_META)
**** CubicPower OpenStack Study ****
    def test_create_cloned_volume(self):
        """Expected behaviour for create_cloned_volume."""
        self.mox.StubOutWithMock(self._driver, '_create_file')
        self.mox.StubOutWithMock(self._driver, '_copy_file')
        vol_size = self._driver._size_bytes(self.TEST_VOLSIZE)
        self._driver._create_file(self.TEST_CLONEPATH, vol_size)
        self._driver._copy_file(self.TEST_VOLPATH, self.TEST_CLONEPATH)
        self.mox.ReplayAll()
        self._driver.create_cloned_volume(self.TEST_CLONE, self.TEST_VOLUME)
**** CubicPower OpenStack Study ****
    def test_extend_volume(self):
        """Expected behaviour for extend_volume."""
        self.mox.StubOutWithMock(self._driver, '_create_file')
        new_size = self._driver._size_bytes(self.TEST_NEWSIZE)
        self._driver._create_file(self.TEST_VOLPATH, new_size)
        self.mox.ReplayAll()
        self._driver.extend_volume(self.TEST_VOLUME, self.TEST_NEWSIZE)