¡@

Home 

OpenStack Study: test_router.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2011 Denali Systems, Inc.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

from cinder.api.openstack import wsgi

from cinder.api.v1 import router

from cinder.api.v1 import snapshots

from cinder.api.v1 import volumes

from cinder.api import versions

from cinder.openstack.common import log as logging

from cinder import test

from cinder.tests.api import fakes

LOG = logging.getLogger(__name__)

**** CubicPower OpenStack Study ****

class FakeController(object):
    """Minimal stand-in for an API controller.

    Both ``index`` and ``detail`` answer with an empty dict, which is enough
    for tests that only care about whether a request gets routed.
    """

    def __init__(self, ext_mgr=None):
        """Store the extension manager without inspecting it.

        :param ext_mgr: extension manager object, kept as-is; may be None.
        """
        self.ext_mgr = ext_mgr

    def index(self, req):
        """Return an empty dict for a list request; ``req`` is ignored."""
        return {}

    def detail(self, req):
        """Return an empty dict for a detail request; ``req`` is ignored."""
        return {}

def create_resource(ext_mgr):
    """Build a WSGI resource backed by a ``FakeController``.

    :param ext_mgr: extension manager forwarded to the fake controller.
    :returns: ``wsgi.Resource`` wrapping a new ``FakeController``.
    """
    return wsgi.Resource(FakeController(ext_mgr))

**** CubicPower OpenStack Study ****

class VolumeRouterTestCase(test.TestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(VolumeRouterTestCase, self).setUp()

        # NOTE(vish): versions is just returning text so, no need to stub.

        self.stubs.Set(snapshots, 'create_resource', create_resource)

        self.stubs.Set(volumes, 'create_resource', create_resource)

        self.app = router.APIRouter()

**** CubicPower OpenStack Study ****

    def test_versions(self):

        req = fakes.HTTPRequest.blank('')

        req.method = 'GET'

        req.content_type = 'application/json'

        response = req.get_response(self.app)

        self.assertEqual(302, response.status_int)

        req = fakes.HTTPRequest.blank('/')

        req.method = 'GET'

        req.content_type = 'application/json'

        response = req.get_response(self.app)

        self.assertEqual(200, response.status_int)

**** CubicPower OpenStack Study ****

    def test_versions_action_args_index(self):

        request_environment = {'PATH_INFO': '/'}

        resource = versions.Versions()

        result = resource.get_action_args(request_environment)

        self.assertEqual(result['action'], 'index')

**** CubicPower OpenStack Study ****

    def test_versions_action_args_multi(self):

        request_environment = {'PATH_INFO': '/fake/path'}

        resource = versions.Versions()

        result = resource.get_action_args(request_environment)

        self.assertEqual(result['action'], 'multi')

**** CubicPower OpenStack Study ****

    def test_versions_get_most_recent_update(self):

        res = versions.AtomSerializer()

        fake_date_updated = [

            {"updated": '2012-01-04T11:33:21Z'},

            {"updated": '2012-11-21T11:33:21Z'}

        ]

        result = res._get_most_recent_update(fake_date_updated)

        self.assertEqual('2012-11-21T11:33:21Z', result)

**** CubicPower OpenStack Study ****

    def test_versions_create_version_entry(self):

        res = versions.AtomSerializer()

        vers = {

            "id": "v2.0",

            "status": "CURRENT",

            "updated": "2012-11-21T11:33:21Z",

            "links": [

                {

                    "rel": "describedby",

                    "type": "application/pdf",

                    "href": "http://jorgew.github.com/block-storage-api/"

                            "content/os-block-storage-1.0.pdf",

                },

            ],

        }

        fake_result = {

            'id': 'http://jorgew.github.com/block-storage-api/'

                  'content/os-block-storage-1.0.pdf',

            'title': 'Version v2.0',

            'updated': '2012-11-21T11:33:21Z',

            'link': {

                'href': 'http://jorgew.github.com/block-storage-api/'

                        'content/os-block-storage-1.0.pdf',

                'type': 'application/pdf',

                'rel': 'describedby'

            },

            'content': 'Version v2.0 CURRENT (2012-11-21T11:33:21Z)'

        }

        result_function = res._create_version_entry(vers)

        result = {}

        for subElement in result_function:

            if subElement.text:

                result[subElement.tag] = subElement.text

            else:

                result[subElement.tag] = subElement.attrib

        self.assertEqual(result, fake_result)

**** CubicPower OpenStack Study ****

    def test_versions_create_feed(self):

        res = versions.AtomSerializer()

        vers = [

            {

                "id": "v2.0",

                "status": "CURRENT",

                "updated": "2012-11-21T11:33:21Z",

                "links": [

                    {

                        "rel": "describedby",

                        "type": "application/pdf",

                        "href": "http://jorgew.github.com/block-storage-api/"

                                "content/os-block-storage-1.0.pdf",

                    },

                ],

            },

            {

                "id": "v1.0",

                "status": "CURRENT",

                "updated": "2012-01-04T11:33:21Z",

                "links": [

                    {

                        "rel": "describedby",

                        "type": "application/vnd.sun.wadl+xml",

                        "href": "http://docs.rackspacecloud.com/"

                                "servers/api/v1.1/application.wadl",

                    },

                ],

            }

        ]

        result = res._create_feed(vers, "fake_feed_title",

                                  "http://jorgew.github.com/block-storage-api/"

                                  "content/os-block-storage-1.0.pdf")

        fake_data = {

            'id': 'http://jorgew.github.com/block-storage-api/'

                  'content/os-block-storage-1.0.pdf',

            'title': 'fake_feed_title',

            'updated': '2012-11-21T11:33:21Z',

        }

        data = {}

        for subElement in result:

            if subElement.text:

                data[subElement.tag] = subElement.text

        self.assertEqual(data, fake_data)

**** CubicPower OpenStack Study ****

    def test_versions_multi(self):

        req = fakes.HTTPRequest.blank('/')

        req.method = 'GET'

        req.content_type = 'application/json'

        resource = versions.Versions()

        result = resource.dispatch(resource.multi, req, {})

        ids = [v['id'] for v in result['choices']]

        self.assertEqual(set(ids), set(['v1.0', 'v2.0']))

**** CubicPower OpenStack Study ****

    def test_versions_multi_disable_v1(self):

        self.flags(enable_v1_api=False)

        req = fakes.HTTPRequest.blank('/')

        req.method = 'GET'

        req.content_type = 'application/json'

        resource = versions.Versions()

        result = resource.dispatch(resource.multi, req, {})

        ids = [v['id'] for v in result['choices']]

        self.assertEqual(set(ids), set(['v2.0']))

**** CubicPower OpenStack Study ****

    def test_versions_multi_disable_v2(self):

        self.flags(enable_v2_api=False)

        req = fakes.HTTPRequest.blank('/')

        req.method = 'GET'

        req.content_type = 'application/json'

        resource = versions.Versions()

        result = resource.dispatch(resource.multi, req, {})

        ids = [v['id'] for v in result['choices']]

        self.assertEqual(set(ids), set(['v1.0']))

**** CubicPower OpenStack Study ****

    def test_versions_index(self):

        req = fakes.HTTPRequest.blank('/')

        req.method = 'GET'

        req.content_type = 'application/json'

        resource = versions.Versions()

        result = resource.dispatch(resource.index, req, {})

        ids = [v['id'] for v in result['versions']]

        self.assertEqual(set(ids), set(['v1.0', 'v2.0']))

**** CubicPower OpenStack Study ****

    def test_versions_index_disable_v1(self):

        self.flags(enable_v1_api=False)

        req = fakes.HTTPRequest.blank('/')

        req.method = 'GET'

        req.content_type = 'application/json'

        resource = versions.Versions()

        result = resource.dispatch(resource.index, req, {})

        ids = [v['id'] for v in result['versions']]

        self.assertEqual(set(ids), set(['v2.0']))

**** CubicPower OpenStack Study ****

    def test_versions_index_disable_v2(self):

        self.flags(enable_v2_api=False)

        req = fakes.HTTPRequest.blank('/')

        req.method = 'GET'

        req.content_type = 'application/json'

        resource = versions.Versions()

        result = resource.dispatch(resource.index, req, {})

        ids = [v['id'] for v in result['versions']]

        self.assertEqual(set(ids), set(['v1.0']))

**** CubicPower OpenStack Study ****

    def test_volumes(self):

        req = fakes.HTTPRequest.blank('/fake/volumes')

        req.method = 'GET'

        req.content_type = 'application/json'

        response = req.get_response(self.app)

        self.assertEqual(200, response.status_int)

**** CubicPower OpenStack Study ****

    def test_volumes_detail(self):

        req = fakes.HTTPRequest.blank('/fake/volumes/detail')

        req.method = 'GET'

        req.content_type = 'application/json'

        response = req.get_response(self.app)

        self.assertEqual(200, response.status_int)

**** CubicPower OpenStack Study ****

    def test_types(self):

        req = fakes.HTTPRequest.blank('/fake/types')

        req.method = 'GET'

        req.content_type = 'application/json'

        response = req.get_response(self.app)

        self.assertEqual(200, response.status_int)

**** CubicPower OpenStack Study ****

    def test_snapshots(self):

        req = fakes.HTTPRequest.blank('/fake/snapshots')

        req.method = 'GET'

        req.content_type = 'application/json'

        response = req.get_response(self.app)

        self.assertEqual(200, response.status_int)

**** CubicPower OpenStack Study ****

    def test_snapshots_detail(self):

        req = fakes.HTTPRequest.blank('/fake/snapshots/detail')

        req.method = 'GET'

        req.content_type = 'application/json'

        response = req.get_response(self.app)

        self.assertEqual(200, response.status_int)