**** CubicPower OpenStack Study ****
# Copyright 2013 Canonical Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Tests for create_volume TaskFlow """
import time
from cinder import context
from cinder import test
from cinder.volume.flows.api import create_volume
**** CubicPower OpenStack Study ****
class fake_scheduler_rpc_api(object):
**** CubicPower OpenStack Study ****
def __init__(self, expected_spec, test_inst):
self.expected_spec = expected_spec
self.test_inst = test_inst
**** CubicPower OpenStack Study ****
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
image_id=None, request_spec=None,
filter_properties=None):
self.test_inst.assertEqual(self.expected_spec, request_spec)
**** CubicPower OpenStack Study ****
class fake_volume_api(object):
**** CubicPower OpenStack Study ****
def __init__(self, expected_spec, test_inst):
self.expected_spec = expected_spec
self.test_inst = test_inst
**** CubicPower OpenStack Study ****
def create_volume(self, ctxt, volume, host,
request_spec, filter_properties,
allow_reschedule=True,
snapshot_id=None, image_id=None,
source_volid=None):
self.test_inst.assertEqual(self.expected_spec, request_spec)
self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
self.test_inst.assertEqual(request_spec['image_id'], image_id)
**** CubicPower OpenStack Study ****
class fake_db(object):
**** CubicPower OpenStack Study ****
def volume_get(self, *args, **kwargs):
return {'host': 'barf'}
**** CubicPower OpenStack Study ****
def volume_update(self, *args, **kwargs):
return {'host': 'farb'}
**** CubicPower OpenStack Study ****
def snapshot_get(self, *args, **kwargs):
return {'volume_id': 1}
**** CubicPower OpenStack Study ****
class CreateVolumeFlowTestCase(test.TestCase):
**** CubicPower OpenStack Study ****
def time_inc(self):
self.counter += 1
return self.counter
**** CubicPower OpenStack Study ****
def setUp(self):
super(CreateVolumeFlowTestCase, self).setUp()
self.ctxt = context.get_admin_context()
self.counter = float(0)
# Ensure that time.time() always returns more than the last time it was
# called to avoid div by zero errors.
self.counter = float(0)
self.stubs.Set(time, 'time', self.time_inc)
**** CubicPower OpenStack Study ****
def test_cast_create_volume(self):
props = {}
spec = {'volume_id': None,
'source_volid': None,
'snapshot_id': None,
'image_id': None}
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),
fake_volume_api(spec, self),
fake_db())
task._cast_create_volume(self.ctxt, spec, props)
spec = {'volume_id': 1,
'source_volid': 2,
'snapshot_id': 3,
'image_id': 4}
task = create_volume.VolumeCastTask(
fake_scheduler_rpc_api(spec, self),
fake_volume_api(spec, self),
fake_db())
task._cast_create_volume(self.ctxt, spec, props)
**** CubicPower OpenStack Study ****
def tearDown(self):
self.stubs.UnsetAll()
super(CreateVolumeFlowTestCase, self).tearDown()