¡@

Home 

OpenStack Study: test_admin_actions.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import ast
import shutil
import tempfile

from oslo.config import cfg
import webob

from cinder.brick.local_dev import lvm as brick_lvm
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import jsonutils
from cinder.openstack.common import timeutils
from cinder import test
from cinder.tests.api import fakes
from cinder.tests.api.v2 import stubs
from cinder.tests import cast_as_call
from cinder.volume import api as volume_api

# Shared oslo.config object; read below for CONF.volume_topic when the
# migration helpers register fake volume services.
CONF = cfg.CONF

**** CubicPower OpenStack Study ****

def app():
    """Return the WSGI application exercised by these tests.

    The API router is mounted under '/v2' in a URL map. No auth layer is
    added, so the environ['cinder.context'] set by a test passes through.
    """
    router = fakes.router.APIRouter()
    url_map = fakes.urlmap.URLMap()
    url_map['/v2'] = router
    return url_map

**** CubicPower OpenStack Study ****

class AdminActionsTest(test.TestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(AdminActionsTest, self).setUp()

        self.tempdir = tempfile.mkdtemp()

        self.flags(rpc_backend='cinder.openstack.common.rpc.impl_fake')

        self.flags(lock_path=self.tempdir,

                   disable_process_locking=True)

        self.volume_api = volume_api.API()

        cast_as_call.mock_cast_as_call(self.volume_api.volume_rpcapi.client)

        cast_as_call.mock_cast_as_call(self.volume_api.scheduler_rpcapi.client)

        self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)

**** CubicPower OpenStack Study ****

    def test_reset_status_as_admin(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available'})

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # request status of 'error'

        req.body = jsonutils.dumps({'os-reset_status': {'status': 'error'}})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # request is accepted

        self.assertEqual(resp.status_int, 202)

        volume = db.volume_get(ctx, volume['id'])

        # status changed to 'error'

        self.assertEqual(volume['status'], 'error')

**** CubicPower OpenStack Study ****

    def test_reset_status_as_non_admin(self):

        # current status is 'error'

        volume = db.volume_create(context.get_admin_context(),

                                  {'status': 'error', 'size': 1})

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # request changing status to available

        req.body = jsonutils.dumps({'os-reset_status': {'status':

                                                        'available'}})

        # non-admin context

        req.environ['cinder.context'] = context.RequestContext('fake', 'fake')

        resp = req.get_response(app())

        # request is not authorized

        self.assertEqual(resp.status_int, 403)

        volume = db.volume_get(context.get_admin_context(), volume['id'])

        # status is still 'error'

        self.assertEqual(volume['status'], 'error')

**** CubicPower OpenStack Study ****

    def test_malformed_reset_status_body(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'size': 1})

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # malformed request body

        req.body = jsonutils.dumps({'os-reset_status': {'x-status': 'bad'}})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # bad request

        self.assertEqual(resp.status_int, 400)

        volume = db.volume_get(ctx, volume['id'])

        # status is still 'available'

        self.assertEqual(volume['status'], 'available')

**** CubicPower OpenStack Study ****

    def test_invalid_status_for_volume(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'size': 1})

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # 'invalid' is not a valid status

        req.body = jsonutils.dumps({'os-reset_status': {'status': 'invalid'}})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # bad request

        self.assertEqual(resp.status_int, 400)

        volume = db.volume_get(ctx, volume['id'])

        # status is still 'available'

        self.assertEqual(volume['status'], 'available')

**** CubicPower OpenStack Study ****

    def test_reset_status_for_missing_volume(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # missing-volume-id

        req = webob.Request.blank('/v2/fake/volumes/%s/action' %

                                  'missing-volume-id')

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # malformed request body

        req.body = jsonutils.dumps({'os-reset_status': {'status':

                                                        'available'}})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # not found

        self.assertEqual(resp.status_int, 404)

        self.assertRaises(exception.NotFound, db.volume_get, ctx,

                          'missing-volume-id')

**** CubicPower OpenStack Study ****

    def test_reset_attached_status(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1,

                                        'attach_status': 'attached'})

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # request update attach_status to detached

        body = {'os-reset_status': {'status': 'available',

                                    'attach_status': 'detached'}}

        req.body = jsonutils.dumps(body)

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # request is accepted

        self.assertEqual(resp.status_int, 202)

        volume = db.volume_get(ctx, volume['id'])

        # attach_status changed to 'detached'

        self.assertEqual(volume['attach_status'], 'detached')

        # status un-modified

        self.assertEqual(volume['status'], 'available')

**** CubicPower OpenStack Study ****

    def test_invalid_reset_attached_status(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1,

                                        'attach_status': 'detached'})

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # 'invalid' is not a valid attach_status

        body = {'os-reset_status': {'status': 'available',

                                    'attach_status': 'invalid'}}

        req.body = jsonutils.dumps(body)

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # bad request

        self.assertEqual(resp.status_int, 400)

        volume = db.volume_get(ctx, volume['id'])

        # status and attach_status un-modified

        self.assertEqual(volume['status'], 'available')

        self.assertEqual(volume['attach_status'], 'detached')

**** CubicPower OpenStack Study ****

    def test_snapshot_reset_status(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # snapshot in 'error_deleting'

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        snapshot = db.snapshot_create(ctx, {'status': 'error_deleting',

                                            'volume_id': volume['id']})

        req = webob.Request.blank('/v2/fake/snapshots/%s/action' %

                                  snapshot['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # request status of 'error'

        req.body = jsonutils.dumps({'os-reset_status': {'status': 'error'}})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # request is accepted

        self.assertEqual(resp.status_int, 202)

        snapshot = db.snapshot_get(ctx, snapshot['id'])

        # status changed to 'error'

        self.assertEqual(snapshot['status'], 'error')

**** CubicPower OpenStack Study ****

    def test_invalid_status_for_snapshot(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # snapshot in 'available'

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        snapshot = db.snapshot_create(ctx, {'status': 'available',

                                            'volume_id': volume['id']})

        req = webob.Request.blank('/v2/fake/snapshots/%s/action' %

                                  snapshot['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # 'attaching' is not a valid status for snapshots

        req.body = jsonutils.dumps({'os-reset_status': {'status':

                                                        'attaching'}})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # request is accepted

        self.assertEqual(resp.status_int, 400)

        snapshot = db.snapshot_get(ctx, snapshot['id'])

        # status is still 'available'

        self.assertEqual(snapshot['status'], 'available')

**** CubicPower OpenStack Study ****

    def test_force_delete(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is creating

        volume = db.volume_create(ctx, {'size': 1})

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        req.body = jsonutils.dumps({'os-force_delete': {}})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # request is accepted

        self.assertEqual(resp.status_int, 202)

        # volume is deleted

        self.assertRaises(exception.NotFound, db.volume_get, ctx, volume['id'])

**** CubicPower OpenStack Study ****

    def test_force_delete_snapshot(self):

        ctx = context.RequestContext('admin', 'fake', True)

        snapshot = stubs.stub_snapshot(1, host='foo')

        self.stubs.Set(db, 'volume_get', lambda x, y: snapshot)

        self.stubs.Set(db, 'snapshot_get', lambda x, y: snapshot)

        self.stubs.Set(volume_api.API, 'delete_snapshot', lambda *x, **y: True)

        path = '/v2/fake/snapshots/%s/action' % snapshot['id']

        req = webob.Request.blank(path)

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        req.body = jsonutils.dumps({'os-force_delete': {}})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        self.assertEqual(resp.status_int, 202)

**** CubicPower OpenStack Study ****

    def test_force_detach_instance_attached_volume(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        connector = {'initiator': 'iqn.2012-07.org.fake:01'}

        # start service to handle rpc messages for attach requests

        svc = self.start_service('volume', host='test')

        self.volume_api.reserve_volume(ctx, volume)

        mountpoint = '/dev/vbd'

        self.volume_api.attach(ctx, volume, stubs.FAKE_UUID, None,

                               mountpoint, 'rw')

        # volume is attached

        volume = db.volume_get(ctx, volume['id'])

        self.assertEqual(volume['status'], 'in-use')

        self.assertEqual(volume['instance_uuid'], stubs.FAKE_UUID)

        self.assertIsNone(volume['attached_host'])

        self.assertEqual(volume['mountpoint'], mountpoint)

        self.assertEqual(volume['attach_status'], 'attached')

        admin_metadata = volume['volume_admin_metadata']

        self.assertEqual(len(admin_metadata), 2)

        self.assertEqual(admin_metadata[0]['key'], 'readonly')

        self.assertEqual(admin_metadata[0]['value'], 'False')

        self.assertEqual(admin_metadata[1]['key'], 'attached_mode')

        self.assertEqual(admin_metadata[1]['value'], 'rw')

        conn_info = self.volume_api.initialize_connection(ctx,

                                                          volume, connector)

        self.assertEqual(conn_info['data']['access_mode'], 'rw')

        # build request to force detach

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # request status of 'error'

        req.body = jsonutils.dumps({'os-force_detach': None})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        # make request

        resp = req.get_response(app())

        # request is accepted

        self.assertEqual(resp.status_int, 202)

        volume = db.volume_get(ctx, volume['id'])

        # status changed to 'available'

        self.assertEqual(volume['status'], 'available')

        self.assertIsNone(volume['instance_uuid'])

        self.assertIsNone(volume['attached_host'])

        self.assertIsNone(volume['mountpoint'])

        self.assertEqual(volume['attach_status'], 'detached')

        admin_metadata = volume['volume_admin_metadata']

        self.assertEqual(len(admin_metadata), 1)

        self.assertEqual(admin_metadata[0]['key'], 'readonly')

        self.assertEqual(admin_metadata[0]['value'], 'False')

        # cleanup

        svc.stop()

**** CubicPower OpenStack Study ****

    def test_force_detach_host_attached_volume(self):

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        connector = {'initiator': 'iqn.2012-07.org.fake:01'}

        # start service to handle rpc messages for attach requests

        svc = self.start_service('volume', host='test')

        self.volume_api.reserve_volume(ctx, volume)

        mountpoint = '/dev/vbd'

        host_name = 'fake-host'

        self.volume_api.attach(ctx, volume, None, host_name, mountpoint, 'ro')

        # volume is attached

        volume = db.volume_get(ctx, volume['id'])

        self.assertEqual(volume['status'], 'in-use')

        self.assertIsNone(volume['instance_uuid'])

        self.assertEqual(volume['attached_host'], host_name)

        self.assertEqual(volume['mountpoint'], mountpoint)

        self.assertEqual(volume['attach_status'], 'attached')

        admin_metadata = volume['volume_admin_metadata']

        self.assertEqual(len(admin_metadata), 2)

        self.assertEqual(admin_metadata[0]['key'], 'readonly')

        self.assertEqual(admin_metadata[0]['value'], 'False')

        self.assertEqual(admin_metadata[1]['key'], 'attached_mode')

        self.assertEqual(admin_metadata[1]['value'], 'ro')

        conn_info = self.volume_api.initialize_connection(ctx,

                                                          volume, connector)

        self.assertEqual(conn_info['data']['access_mode'], 'ro')

        # build request to force detach

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        # request status of 'error'

        req.body = jsonutils.dumps({'os-force_detach': None})

        # attach admin context to request

        req.environ['cinder.context'] = ctx

        # make request

        resp = req.get_response(app())

        # request is accepted

        self.assertEqual(resp.status_int, 202)

        volume = db.volume_get(ctx, volume['id'])

        # status changed to 'available'

        self.assertEqual(volume['status'], 'available')

        self.assertIsNone(volume['instance_uuid'])

        self.assertIsNone(volume['attached_host'])

        self.assertIsNone(volume['mountpoint'])

        self.assertEqual(volume['attach_status'], 'detached')

        admin_metadata = volume['volume_admin_metadata']

        self.assertEqual(len(admin_metadata), 1)

        self.assertEqual(admin_metadata[0]['key'], 'readonly')

        self.assertEqual(admin_metadata[0]['value'], 'False')

        # cleanup

        svc.stop()

**** CubicPower OpenStack Study ****

    def test_attach_in_used_volume_by_instance(self):

        """Test that attaching to an in-use volume fails."""

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        connector = {'initiator': 'iqn.2012-07.org.fake:01'}

        # start service to handle rpc messages for attach requests

        svc = self.start_service('volume', host='test')

        self.volume_api.reserve_volume(ctx, volume)

        mountpoint = '/dev/vbd'

        self.volume_api.attach(ctx, volume, stubs.FAKE_UUID, None,

                               mountpoint, 'rw')

        conn_info = self.volume_api.initialize_connection(ctx,

                                                          volume, connector)

        self.assertEqual(conn_info['data']['access_mode'], 'rw')

        self.assertRaises(exception.InvalidVolume,

                          self.volume_api.attach,

                          ctx,

                          volume,

                          fakes.get_fake_uuid(),

                          None,

                          mountpoint,

                          'rw')

        self.assertRaises(exception.InvalidVolume,

                          self.volume_api.attach,

                          ctx,

                          volume,

                          fakes.get_fake_uuid(),

                          None,

                          mountpoint,

                          'ro')

        # cleanup

        svc.stop()

**** CubicPower OpenStack Study ****

    def test_attach_in_used_volume_by_host(self):

        """Test that attaching to an in-use volume fails."""

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        connector = {'initiator': 'iqn.2012-07.org.fake:01'}

        # start service to handle rpc messages for attach requests

        svc = self.start_service('volume', host='test')

        self.volume_api.reserve_volume(ctx, volume)

        mountpoint = '/dev/vbd'

        host_name = 'fake_host'

        self.volume_api.attach(ctx, volume, None, host_name, mountpoint, 'rw')

        conn_info = self.volume_api.initialize_connection(ctx,

                                                          volume, connector)

        conn_info['data']['access_mode'] = 'rw'

        self.assertRaises(exception.InvalidVolume,

                          self.volume_api.attach,

                          ctx,

                          volume,

                          None,

                          host_name,

                          mountpoint,

                          'rw')

        self.assertRaises(exception.InvalidVolume,

                          self.volume_api.attach,

                          ctx,

                          volume,

                          None,

                          host_name,

                          mountpoint,

                          'ro')

        # cleanup

        svc.stop()

**** CubicPower OpenStack Study ****

    def test_invalid_iscsi_connector(self):

        """Test connector without the initiator (required by iscsi driver)."""

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        connector = {}

        # start service to handle rpc messages for attach requests

        svc = self.start_service('volume', host='test')

        self.assertRaises(exception.VolumeBackendAPIException,

                          self.volume_api.initialize_connection,

                          ctx, volume, connector)

        # cleanup

        svc.stop()

**** CubicPower OpenStack Study ****

    def test_attach_attaching_volume_with_different_instance(self):

        """Test that attaching volume reserved for another instance fails."""

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        # start service to handle rpc messages for attach requests

        svc = self.start_service('volume', host='test')

        values = {'status': 'attaching',

                  'instance_uuid': fakes.get_fake_uuid()}

        db.volume_update(ctx, volume['id'], values)

        mountpoint = '/dev/vbd'

        self.assertRaises(exception.InvalidVolume,

                          self.volume_api.attach,

                          ctx,

                          volume,

                          stubs.FAKE_UUID,

                          None,

                          mountpoint,

                          'rw')

        # cleanup

        svc.stop()

**** CubicPower OpenStack Study ****

    def test_attach_attaching_volume_with_different_mode(self):

        """Test that attaching volume reserved for another mode fails."""

        # admin context

        ctx = context.RequestContext('admin', 'fake', True)

        # current status is available

        volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',

                                        'provider_location': '', 'size': 1})

        # start service to handle rpc messages for attach requests

        svc = self.start_service('volume', host='test')

        values = {'status': 'attaching',

                  'instance_uuid': fakes.get_fake_uuid()}

        db.volume_update(ctx, volume['id'], values)

        db.volume_admin_metadata_update(ctx, volume['id'],

                                        {"attached_mode": 'rw'}, False)

        mountpoint = '/dev/vbd'

        self.assertRaises(exception.InvalidVolume,

                          self.volume_api.attach,

                          ctx,

                          volume,

                          values['instance_uuid'],

                          None,

                          mountpoint,

                          'ro')

        # cleanup

        svc.stop()

**** CubicPower OpenStack Study ****

    def _migrate_volume_prep(self):

        admin_ctx = context.get_admin_context()

        # create volume's current host and the destination host

        db.service_create(admin_ctx,

                          {'host': 'test',

                           'topic': CONF.volume_topic,

                           'created_at': timeutils.utcnow()})

        db.service_create(admin_ctx,

                          {'host': 'test2',

                           'topic': CONF.volume_topic,

                           'created_at': timeutils.utcnow()})

        # current status is available

        volume = db.volume_create(admin_ctx,

                                  {'status': 'available',

                                   'host': 'test',

                                   'provider_location': '',

                                   'attach_status': ''})

        return volume

**** CubicPower OpenStack Study ****

    def _migrate_volume_exec(self, ctx, volume, host, expected_status,

                             force_host_copy=False):

        admin_ctx = context.get_admin_context()

        # build request to migrate to host

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        body = {'os-migrate_volume': {'host': host,

                                      'force_host_copy': force_host_copy}}

        req.body = jsonutils.dumps(body)

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # verify status

        self.assertEqual(resp.status_int, expected_status)

        volume = db.volume_get(admin_ctx, volume['id'])

        return volume

**** CubicPower OpenStack Study ****

    def test_migrate_volume_success(self):

        expected_status = 202

        host = 'test2'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_prep()

        volume = self._migrate_volume_exec(ctx, volume, host, expected_status)

        self.assertEqual(volume['migration_status'], 'starting')

**** CubicPower OpenStack Study ****

    def test_migrate_volume_as_non_admin(self):

        expected_status = 403

        host = 'test2'

        ctx = context.RequestContext('fake', 'fake')

        volume = self._migrate_volume_prep()

        self._migrate_volume_exec(ctx, volume, host, expected_status)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_without_host_parameter(self):

        expected_status = 400

        host = 'test3'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_prep()

        # build request to migrate without host

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        body = {'os-migrate_volume': {'host': host,

                                      'force_host_copy': False}}

        req.body = jsonutils.dumps(body)

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        # verify status

        self.assertEqual(resp.status_int, expected_status)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_host_no_exist(self):

        expected_status = 400

        host = 'test3'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_prep()

        self._migrate_volume_exec(ctx, volume, host, expected_status)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_same_host(self):

        expected_status = 400

        host = 'test'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_prep()

        self._migrate_volume_exec(ctx, volume, host, expected_status)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_migrating(self):

        expected_status = 400

        host = 'test2'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_prep()

        model_update = {'migration_status': 'migrating'}

        volume = db.volume_update(ctx, volume['id'], model_update)

        self._migrate_volume_exec(ctx, volume, host, expected_status)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_with_snap(self):

        expected_status = 400

        host = 'test2'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_prep()

        db.snapshot_create(ctx, {'volume_id': volume['id']})

        self._migrate_volume_exec(ctx, volume, host, expected_status)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_bad_force_host_copy1(self):

        expected_status = 400

        host = 'test2'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_prep()

        self._migrate_volume_exec(ctx, volume, host, expected_status,

                                  force_host_copy='foo')

**** CubicPower OpenStack Study ****

    def test_migrate_volume_bad_force_host_copy2(self):

        expected_status = 400

        host = 'test2'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_prep()

        self._migrate_volume_exec(ctx, volume, host, expected_status,

                                  force_host_copy=1)

**** CubicPower OpenStack Study ****

    def _migrate_volume_comp_exec(self, ctx, volume, new_volume, error,

                                  expected_status, expected_id, no_body=False):

        req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])

        req.method = 'POST'

        req.headers['content-type'] = 'application/json'

        body = {'new_volume': new_volume['id'], 'error': error}

        if no_body:

            req.body = jsonutils.dumps({'': body})

        else:

            req.body = jsonutils.dumps({'os-migrate_volume_completion': body})

        req.environ['cinder.context'] = ctx

        resp = req.get_response(app())

        resp_dict = ast.literal_eval(resp.body)

        # verify status

        self.assertEqual(resp.status_int, expected_status)

        if expected_id:

            self.assertEqual(resp_dict['save_volume_id'], expected_id)

        else:

            self.assertNotIn('save_volume_id', resp_dict)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_comp_as_non_admin(self):

        admin_ctx = context.get_admin_context()

        volume = db.volume_create(admin_ctx, {'id': 'fake1'})

        new_volume = db.volume_create(admin_ctx, {'id': 'fake2'})

        expected_status = 403

        expected_id = None

        ctx = context.RequestContext('fake', 'fake')

        volume = self._migrate_volume_comp_exec(ctx, volume, new_volume, False,

                                                expected_status, expected_id)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_comp_no_mig_status(self):

        admin_ctx = context.get_admin_context()

        volume1 = db.volume_create(admin_ctx, {'id': 'fake1',

                                               'migration_status': 'foo'})

        volume2 = db.volume_create(admin_ctx, {'id': 'fake2',

                                               'migration_status': None})

        expected_status = 400

        expected_id = None

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_comp_exec(ctx, volume1, volume2, False,

                                                expected_status, expected_id)

        volume = self._migrate_volume_comp_exec(ctx, volume2, volume1, False,

                                                expected_status, expected_id)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_comp_bad_mig_status(self):

        admin_ctx = context.get_admin_context()

        volume1 = db.volume_create(admin_ctx,

                                   {'id': 'fake1',

                                    'migration_status': 'migrating'})

        volume2 = db.volume_create(admin_ctx,

                                   {'id': 'fake2',

                                    'migration_status': 'target:foo'})

        expected_status = 400

        expected_id = None

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_comp_exec(ctx, volume1, volume2, False,

                                                expected_status, expected_id)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_comp_no_action(self):

        admin_ctx = context.get_admin_context()

        volume = db.volume_create(admin_ctx, {'id': 'fake1'})

        new_volume = db.volume_create(admin_ctx, {'id': 'fake2'})

        expected_status = 400

        expected_id = None

        ctx = context.RequestContext('fake', 'fake')

        self._migrate_volume_comp_exec(ctx, volume, new_volume, False,

                                       expected_status, expected_id, True)

**** CubicPower OpenStack Study ****

    def test_migrate_volume_comp_from_nova(self):

        admin_ctx = context.get_admin_context()

        volume = db.volume_create(admin_ctx,

                                  {'id': 'fake1',

                                   'status': 'in-use',

                                   'host': 'test',

                                   'migration_status': None,

                                   'attach_status': 'attached'})

        new_volume = db.volume_create(admin_ctx,

                                      {'id': 'fake2',

                                       'status': 'available',

                                       'host': 'test',

                                       'migration_status': None,

                                       'attach_status': 'detached'})

        expected_status = 200

        expected_id = 'fake2'

        ctx = context.RequestContext('admin', 'fake', True)

        volume = self._migrate_volume_comp_exec(ctx, volume, new_volume, False,

                                                expected_status, expected_id)