¡@

Home 

OpenStack Study: test_volumes.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2011 Justin Santa Barbara

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import testtools

import time

from cinder.openstack.common import log as logging

from cinder import service

from cinder.tests import fake_driver

from cinder.tests.integrated.api import client

from cinder.tests.integrated import integrated_helpers

LOG = logging.getLogger(__name__)

**** CubicPower OpenStack Study ****

class VolumesTest(integrated_helpers._IntegratedTestBase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(VolumesTest, self).setUp()

        fake_driver.LoggingVolumeDriver.clear_logs()

**** CubicPower OpenStack Study ****

    def _start_api_service(self):

        self.osapi = service.WSGIService("osapi_volume")

        self.osapi.start()

        self.auth_url = 'http://%s:%s/v1' % (self.osapi.host, self.osapi.port)

        LOG.warn(self.auth_url)

**** CubicPower OpenStack Study ****

    def _get_flags(self):

        f = super(VolumesTest, self)._get_flags()

        f['volume_driver'] = 'cinder.tests.fake_driver.LoggingVolumeDriver'

        return f

**** CubicPower OpenStack Study ****

    def test_get_volumes_summary(self):

        """Simple check that listing volumes works."""

        volumes = self.api.get_volumes(False)

        for volume in volumes:

            LOG.debug("volume: %s" % volume)

**** CubicPower OpenStack Study ****

    def test_get_volumes(self):

        """Simple check that listing volumes works."""

        volumes = self.api.get_volumes()

        for volume in volumes:

            LOG.debug("volume: %s" % volume)

**** CubicPower OpenStack Study ****

    def _poll_while(self, volume_id, continue_states, max_retries=5):

        """Poll (briefly) while the state is in continue_states."""

        retries = 0

        while True:

            try:

                found_volume = self.api.get_volume(volume_id)

            except client.OpenStackApiNotFoundException:

                found_volume = None

                LOG.debug("Got 404, proceeding")

                break

            LOG.debug("Found %s" % found_volume)

            self.assertEqual(volume_id, found_volume['id'])

            if found_volume['status'] not in continue_states:

                break

            time.sleep(1)

            retries = retries + 1

            if retries > max_retries:

                break

        return found_volume

    @testtools.skip('This test is failing: bug 1173266')

**** CubicPower OpenStack Study ****

    def test_create_and_delete_volume(self):

        """Creates and deletes a volume."""

        # Create volume

        created_volume = self.api.post_volume({'volume': {'size': 1}})

        LOG.debug("created_volume: %s" % created_volume)

        self.assertTrue(created_volume['id'])

        created_volume_id = created_volume['id']

        # Check it's there

        found_volume = self.api.get_volume(created_volume_id)

        self.assertEqual(created_volume_id, found_volume['id'])

        # It should also be in the all-volume list

        volumes = self.api.get_volumes()

        volume_names = [volume['id'] for volume in volumes]

        self.assertIn(created_volume_id, volume_names)

        # Wait (briefly) for creation. Delay is due to the 'message queue'

        found_volume = self._poll_while(created_volume_id, ['creating'])

        # It should be available...

        self.assertEqual('available', found_volume['status'])

        # Delete the volume

        self.api.delete_volume(created_volume_id)

        # Wait (briefly) for deletion. Delay is due to the 'message queue'

        found_volume = self._poll_while(created_volume_id, ['deleting'])

        # Should be gone

        self.assertFalse(found_volume)

        LOG.debug("Logs: %s" % fake_driver.LoggingVolumeDriver.all_logs())

        create_actions = fake_driver.LoggingVolumeDriver.logs_like(

            'create_volume',

            id=created_volume_id)

        LOG.debug("Create_Actions: %s" % create_actions)

        self.assertEqual(1, len(create_actions))

        create_action = create_actions[0]

        self.assertEqual(create_action['id'], created_volume_id)

        self.assertEqual(create_action['availability_zone'], 'nova')

        self.assertEqual(create_action['size'], 1)

        export_actions = fake_driver.LoggingVolumeDriver.logs_like(

            'create_export',

            id=created_volume_id)

        self.assertEqual(1, len(export_actions))

        export_action = export_actions[0]

        self.assertEqual(export_action['id'], created_volume_id)

        self.assertEqual(export_action['availability_zone'], 'nova')

        delete_actions = fake_driver.LoggingVolumeDriver.logs_like(

            'delete_volume',

            id=created_volume_id)

        self.assertEqual(1, len(delete_actions))

        delete_action = export_actions[0]

        self.assertEqual(delete_action['id'], created_volume_id)

**** CubicPower OpenStack Study ****

    def test_create_volume_with_metadata(self):

        """Creates a volume with metadata."""

        # Create volume

        metadata = {'key1': 'value1',

                    'key2': 'value2'}

        created_volume = self.api.post_volume(

            {'volume': {'size': 1,

                        'metadata': metadata}})

        LOG.debug("created_volume: %s" % created_volume)

        self.assertTrue(created_volume['id'])

        created_volume_id = created_volume['id']

        # Check it's there and metadata present

        found_volume = self.api.get_volume(created_volume_id)

        self.assertEqual(created_volume_id, found_volume['id'])

        self.assertEqual(metadata, found_volume['metadata'])

**** CubicPower OpenStack Study ****

    def test_create_volume_in_availability_zone(self):

        """Creates a volume in availability_zone."""

        # Create volume

        availability_zone = 'nova'

        created_volume = self.api.post_volume(

            {'volume': {'size': 1,

                        'availability_zone': availability_zone}})

        LOG.debug("created_volume: %s" % created_volume)

        self.assertTrue(created_volume['id'])

        created_volume_id = created_volume['id']

        # Check it's there and availability zone present

        found_volume = self.api.get_volume(created_volume_id)

        self.assertEqual(created_volume_id, found_volume['id'])

        self.assertEqual(availability_zone, found_volume['availability_zone'])

**** CubicPower OpenStack Study ****

    def test_create_and_update_volume(self):

        # Create vol1

        created_volume = self.api.post_volume({'volume': {

            'size': 1, 'display_name': 'vol1'}})

        self.assertEqual(created_volume['display_name'], 'vol1')

        created_volume_id = created_volume['id']

        # update volume

        body = {'volume': {'display_name': 'vol-one'}}

        updated_volume = self.api.put_volume(created_volume_id, body)

        self.assertEqual(updated_volume['display_name'], 'vol-one')

        # check for update

        found_volume = self.api.get_volume(created_volume_id)

        self.assertEqual(created_volume_id, found_volume['id'])

        self.assertEqual(found_volume['display_name'], 'vol-one')