# Extracted from the CubicPower "OpenStack Study" page for test_limits.py.

# Copyright 2011 OpenStack Foundation

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""

Tests dealing with HTTP rate-limiting.

"""

import httplib

from lxml import etree

import six

import webob

from xml.dom import minidom

from cinder.api.v2 import limits

from cinder.api import views

from cinder.api import xmlutil

import cinder.context

from cinder.openstack.common import jsonutils

from cinder import test

# Rate limits shared by the limiter test suites below.  Every rule uses the
# same per-minute unit, so only the fields that vary are tabulated.
TEST_LIMITS = list(
    limits.Limit(verb, uri, regex, value, limits.PER_MINUTE)
    for verb, uri, regex, value in (
        ("GET", "/delayed", "^/delayed", 1),
        ("POST", "*", ".*", 7),
        ("POST", "/volumes", "^/volumes", 3),
        ("PUT", "*", "", 10),
        ("PUT", "/volumes", "^/volumes", 5),
    )
)

# Namespace prefixes passed to XPath queries on serialized limits.
NS = dict(
    atom='http://www.w3.org/2005/Atom',
    ns='http://docs.openstack.org/common/api/v1.0',
)

# Common base class for the rate-limiting test suites below.

class BaseLimitTestSuite(test.TestCase):
    """Base test suite which provides relevant stubs and time abstraction."""

    def setUp(self):
        """Install a fake clock on `limits.Limit` and stub quota lookups."""
        super(BaseLimitTestSuite, self).setUp()

        # Tests advance this value by hand instead of sleeping.
        self.time = 0.0
        self.stubs.Set(limits.Limit, "_get_time", self._get_time)

        # Quota name -> limit value; individual tests overwrite this.
        self.absolute_limits = {}

        def stub_get_project_quotas(context, project_id, usages=True):
            # Reads self.absolute_limits at call time, so a test may assign
            # it after setUp() has already run.
            return dict((k, dict(limit=v))
                        for k, v in self.absolute_limits.items())

        self.stubs.Set(cinder.quota.QUOTAS, "get_project_quotas",
                       stub_get_project_quotas)

    def _get_time(self):
        """Return the "time" according to this test suite."""
        return self.time

class LimitsControllerTest(BaseLimitTestSuite):
    """Tests for `limits.LimitsController` class."""

    def setUp(self):
        """Run before each test."""
        super(LimitsControllerTest, self).setUp()
        self.controller = limits.create_resource()

    def _get_index_request(self, accept_header="application/json"):
        """Build a blank request routed to the controller's index action.

        :param accept_header: value assigned to the request's Accept header
        :returns: a `webob.Request` carrying routing args and a cinder
                  request context
        """
        request = webob.Request.blank("/")
        request.accept = accept_header
        request.environ["wsgiorg.routing_args"] = (None, {
            "action": "index",
            "controller": "",
        })
        context = cinder.context.RequestContext('testuser', 'testproject')
        request.environ["cinder.context"] = context
        return request

    def _populate_limits(self, request):
        """Put limit info into a request."""
        _limits = [
            limits.Limit("GET", "*", ".*", 10, 60).display(),
            limits.Limit("POST", "*", ".*", 5, 60 * 60).display(),
            limits.Limit("GET", "changes-since*", "changes-since",
                         5, 60).display(),
        ]
        request.environ["cinder.limits"] = _limits
        return request

    def test_empty_index_json(self):
        """Test getting empty limit details in JSON."""
        request = self._get_index_request()
        response = request.get_response(self.controller)
        expected = {
            "limits": {
                "rate": [],
                "absolute": {},
            },
        }
        body = jsonutils.loads(response.body)
        self.assertEqual(expected, body)

    def test_index_json(self):
        """Test getting limit details in JSON."""
        request = self._get_index_request()
        request = self._populate_limits(request)
        self.absolute_limits = {
            'gigabytes': 512,
            'volumes': 5,
        }
        response = request.get_response(self.controller)
        expected = {
            "limits": {
                "rate": [
                    {
                        "regex": ".*",
                        "uri": "*",
                        "limit": [
                            {
                                "verb": "GET",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "MINUTE",
                                "value": 10,
                                "remaining": 10,
                            },
                            {
                                "verb": "POST",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "HOUR",
                                "value": 5,
                                "remaining": 5,
                            },
                        ],
                    },
                    {
                        "regex": "changes-since",
                        "uri": "changes-since*",
                        "limit": [
                            {
                                "verb": "GET",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "MINUTE",
                                "value": 5,
                                "remaining": 5,
                            },
                        ],
                    },
                ],
                "absolute": {"maxTotalVolumeGigabytes": 512,
                             "maxTotalVolumes": 5, },
            },
        }
        body = jsonutils.loads(response.body)
        self.assertEqual(expected, body)

    def _populate_limits_diff_regex(self, request):
        """Put two limits with the same URI but different regexes in a
        request.
        """
        _limits = [
            limits.Limit("GET", "*", ".*", 10, 60).display(),
            limits.Limit("GET", "*", "*.*", 10, 60).display(),
        ]
        request.environ["cinder.limits"] = _limits
        return request

    def test_index_diff_regex(self):
        """Test that limits sharing a URI but not a regex are listed
        as separate rate entries.
        """
        request = self._get_index_request()
        request = self._populate_limits_diff_regex(request)
        response = request.get_response(self.controller)
        expected = {
            "limits": {
                "rate": [
                    {
                        "regex": ".*",
                        "uri": "*",
                        "limit": [
                            {
                                "verb": "GET",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "MINUTE",
                                "value": 10,
                                "remaining": 10,
                            },
                        ],
                    },
                    {
                        "regex": "*.*",
                        "uri": "*",
                        "limit": [
                            {
                                "verb": "GET",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "MINUTE",
                                "value": 10,
                                "remaining": 10,
                            },
                        ],
                    },
                ],
                "absolute": {},
            },
        }
        body = jsonutils.loads(response.body)
        self.assertEqual(expected, body)

    def _test_index_absolute_limits_json(self, expected):
        """Assert the "absolute" section of the index response.

        :param expected: dict the response's limits/absolute must equal
        """
        request = self._get_index_request()
        response = request.get_response(self.controller)
        body = jsonutils.loads(response.body)
        self.assertEqual(expected, body['limits']['absolute'])

    def test_index_ignores_extra_absolute_limits_json(self):
        """Test that an unrecognised quota name is left out of the output."""
        self.absolute_limits = {'unknown_limit': 9001}
        self._test_index_absolute_limits_json({})

class TestLimiter(limits.Limiter):
    """Trivial `limits.Limiter` subclass.

    LimitMiddlewareTest passes this class's dotted path to the middleware
    and checks that the resulting limiter is an instance of it.
    """
    pass

class LimitMiddlewareTest(BaseLimitTestSuite):
    """Tests for the `limits.RateLimitingMiddleware` class."""

    @webob.dec.wsgify
    def _empty_app(self, request):
        """Do-nothing WSGI app."""
        pass

    def setUp(self):
        """Prepare middleware for use through fake WSGI app."""
        super(LimitMiddlewareTest, self).setUp()
        _limits = '(GET, *, .*, 1, MINUTE)'
        # The third argument is the dotted path of the limiter class to use.
        self.app = limits.RateLimitingMiddleware(self._empty_app, _limits,
                                                 "%s.TestLimiter" %
                                                 self.__class__.__module__)

    def test_limit_class(self):
        """Test that middleware selected correct limiter class."""
        # A bare ``assert`` is stripped under ``python -O``; use the
        # TestCase helper so the check always runs.
        self.assertIsInstance(self.app._limiter, TestLimiter)

    def test_good_request(self):
        """Test successful GET request through middleware."""
        request = webob.Request.blank("/")
        response = request.get_response(self.app)
        self.assertEqual(200, response.status_int)

    def test_limited_request_json(self):
        """Test a rate-limited (413) GET request through middleware."""
        # The first request uses up the single allowed GET.
        request = webob.Request.blank("/")
        response = request.get_response(self.app)
        self.assertEqual(200, response.status_int)

        request = webob.Request.blank("/")
        response = request.get_response(self.app)
        self.assertEqual(response.status_int, 413)

        self.assertIn('Retry-After', response.headers)
        retry_after = int(response.headers['Retry-After'])
        self.assertAlmostEqual(retry_after, 60, 1)

        body = jsonutils.loads(response.body)
        expected = "Only 1 GET request(s) can be made to * every minute."
        value = body["overLimitFault"]["details"].strip()
        self.assertEqual(value, expected)

    def test_limited_request_xml(self):
        """Test a rate-limited (413) response as XML."""
        # The first request uses up the single allowed GET.
        request = webob.Request.blank("/")
        response = request.get_response(self.app)
        self.assertEqual(200, response.status_int)

        request = webob.Request.blank("/")
        request.accept = "application/xml"
        response = request.get_response(self.app)
        self.assertEqual(response.status_int, 413)

        root = minidom.parseString(response.body).childNodes[0]
        expected = "Only 1 GET request(s) can be made to * every minute."

        details = root.getElementsByTagName("details")
        self.assertEqual(details.length, 1)

        value = details.item(0).firstChild.data.strip()
        self.assertEqual(value, expected)

class LimitTest(BaseLimitTestSuite):
    """Tests for the `limits.Limit` class."""

    def test_GET_no_delay(self):
        """Test a limit handles 1 GET per second."""
        limit = limits.Limit("GET", "*", ".*", 1, 1)
        delay = limit("GET", "/anything")
        self.assertIsNone(delay)
        self.assertEqual(0, limit.next_request)
        self.assertEqual(0, limit.last_request)

    def test_GET_delay(self):
        """Test two calls to 1 GET per second limit."""
        limit = limits.Limit("GET", "*", ".*", 1, 1)
        delay = limit("GET", "/anything")
        self.assertIsNone(delay)

        # The second call at the same fake time must be delayed.
        delay = limit("GET", "/anything")
        self.assertEqual(1, delay)
        self.assertEqual(1, limit.next_request)
        self.assertEqual(0, limit.last_request)

        # Advance the fake clock; the next call is allowed again.
        self.time += 4

        delay = limit("GET", "/anything")
        self.assertIsNone(delay)
        self.assertEqual(4, limit.next_request)
        self.assertEqual(4, limit.last_request)

class ParseLimitsTest(BaseLimitTestSuite):
    """Tests for `limits.Limiter.parse_limits`."""

    def test_invalid(self):
        """Test that parse_limits() handles invalid input correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          ';;;;;')

    def test_bad_rule(self):
        """Test that parse_limits() handles bad rules correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          'GET, *, .*, 20, minute')

    def test_missing_arg(self):
        """Test that parse_limits() handles missing args correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          '(GET, *, .*, 20)')

    def test_bad_value(self):
        """Test that parse_limits() handles bad values correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          '(GET, *, .*, foo, minute)')

    def test_bad_unit(self):
        """Test that parse_limits() handles bad units correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          '(GET, *, .*, 20, lightyears)')

    def test_multiple_rules(self):
        """Test that parse_limits() handles multiple rules correctly."""
        try:
            parsed = limits.Limiter.parse_limits(
                '(get, *, .*, 20, minute);'
                '(PUT, /foo*, /foo.*, 10, hour);'
                '(POST, /bar*, /bar.*, 5, second);'
                '(Say, /derp*, /derp.*, 1, day)')
        except ValueError as e:
            # self.fail() still reports a failure when asserts are
            # disabled (``python -O``), unlike ``assert False``.
            self.fail(str(e))

        # Make sure the number of returned limits are correct
        self.assertEqual(len(parsed), 4)

        # Check all the verbs...
        expected = ['GET', 'PUT', 'POST', 'SAY']
        self.assertEqual([t.verb for t in parsed], expected)

        # ...the URIs...
        expected = ['*', '/foo*', '/bar*', '/derp*']
        self.assertEqual([t.uri for t in parsed], expected)

        # ...the regexes...
        expected = ['.*', '/foo.*', '/bar.*', '/derp.*']
        self.assertEqual([t.regex for t in parsed], expected)

        # ...the values...
        expected = [20, 10, 5, 1]
        self.assertEqual([t.value for t in parsed], expected)

        # ...and the units...
        expected = [limits.PER_MINUTE, limits.PER_HOUR,
                    limits.PER_SECOND, limits.PER_DAY]
        self.assertEqual([t.unit for t in parsed], expected)

class LimiterTest(BaseLimitTestSuite):
    """Tests for the in-memory `limits.Limiter` class."""

    def setUp(self):
        """Run before each test."""
        super(LimiterTest, self).setUp()
        # user3 gets an empty rule set, user0 gets tighter GET/PUT rules;
        # every other user falls back to TEST_LIMITS.
        userlimits = {'limits.user3': '',
                      'limits.user0': '(get, *, .*, 4, minute);'
                                      '(put, *, .*, 2, minute)'}
        self.limiter = limits.Limiter(TEST_LIMITS, **userlimits)

    def _check(self, num, verb, url, username=None):
        """Check and yield results from checks.

        Yields the delay (first element of check_for_delay()'s result)
        for each of `num` identical requests.
        """
        # range() rather than xrange() so the helper also runs on Python 3.
        for _ in range(num):
            yield self.limiter.check_for_delay(verb, url, username)[0]

    def _check_sum(self, num, verb, url, username=None):
        """Check and sum results from checks."""
        results = self._check(num, verb, url, username)
        # Falsy results (no delay) contribute nothing to the total.
        return sum(item for item in results if item)

    def test_no_delay_GET(self):
        """Ensure no delay on a single call for a limit verb we didn't set."""
        delay = self.limiter.check_for_delay("GET", "/anything")
        self.assertEqual(delay, (None, None))

    def test_no_delay_PUT(self):
        """Ensure no delay on a single call for a known limit."""
        delay = self.limiter.check_for_delay("PUT", "/anything")
        self.assertEqual(delay, (None, None))

    def test_delay_PUT(self):
        """Test delay on 11th PUT request.

        Ensure the 11th PUT will result in a delay of 6.0 seconds until
        the next request will be granted.
        """
        expected = [None] * 10 + [6.0]
        results = list(self._check(11, "PUT", "/anything"))

        self.assertEqual(expected, results)

    def test_delay_POST(self):
        """Test delay on 8th POST request.

        Ensure the 8th POST will result in a delay of 60/7 seconds until
        the next request will be granted.
        """
        expected = [None] * 7
        results = list(self._check(7, "POST", "/anything"))
        self.assertEqual(expected, results)

        expected = 60.0 / 7.0
        results = self._check_sum(1, "POST", "/anything")
        self.assertAlmostEqual(expected, results, 8)

    def test_delay_GET(self):
        """Ensure the 11th GET will result in NO delay."""
        expected = [None] * 11
        results = list(self._check(11, "GET", "/anything"))
        self.assertEqual(expected, results)

        # user0 has its own 4-per-minute GET rule, so the 5th is delayed.
        expected = [None] * 4 + [15.0]
        results = list(self._check(5, "GET", "/foo", "user0"))
        self.assertEqual(expected, results)

    def test_delay_PUT_volumes(self):
        """Test delay on /volumes.

        Ensure PUT on /volumes limits at 5 requests, and PUT elsewhere
        is still OK after 5 requests...but then after 11 total requests,
        PUT limiting kicks in.
        """
        # First 6 requests on PUT /volumes
        expected = [None] * 5 + [12.0]
        results = list(self._check(6, "PUT", "/volumes"))
        self.assertEqual(expected, results)

        # Next 5 request on PUT /anything
        expected = [None] * 4 + [6.0]
        results = list(self._check(5, "PUT", "/anything"))
        self.assertEqual(expected, results)

    def test_delay_PUT_wait(self):
        """Test limit is lifted again.

        Ensure after hitting the limit and then waiting for
        the correct amount of time, the limit will be lifted.
        """
        expected = [None] * 10 + [6.0]
        results = list(self._check(11, "PUT", "/anything"))
        self.assertEqual(expected, results)

        # Advance time
        self.time += 6.0

        expected = [None, 6.0]
        results = list(self._check(2, "PUT", "/anything"))
        self.assertEqual(expected, results)

    def test_multiple_delays(self):
        """Ensure multiple requests still get a delay."""
        expected = [None] * 10 + [6.0] * 10
        results = list(self._check(20, "PUT", "/anything"))
        self.assertEqual(expected, results)

        self.time += 1.0

        expected = [5.0] * 10
        results = list(self._check(10, "PUT", "/anything"))
        self.assertEqual(expected, results)

        expected = [None] * 2 + [30.0] * 8
        results = list(self._check(10, "PUT", "/anything", "user0"))
        self.assertEqual(expected, results)

    def test_user_limit(self):
        """Test user-specific limits."""
        self.assertEqual(self.limiter.levels['user3'], [])
        self.assertEqual(len(self.limiter.levels['user0']), 2)

    def test_multiple_users(self):
        """Tests involving multiple users."""
        # User0
        expected = [None] * 2 + [30.0] * 8
        results = list(self._check(10, "PUT", "/anything", "user0"))
        self.assertEqual(expected, results)

        # User1
        expected = [None] * 10 + [6.0] * 10
        results = list(self._check(20, "PUT", "/anything", "user1"))
        self.assertEqual(expected, results)

        # User2
        expected = [None] * 10 + [6.0] * 5
        results = list(self._check(15, "PUT", "/anything", "user2"))
        self.assertEqual(expected, results)

        # User3
        expected = [None] * 20
        results = list(self._check(20, "PUT", "/anything", "user3"))
        self.assertEqual(expected, results)

        self.time += 1.0

        # User1 again
        expected = [5.0] * 10
        results = list(self._check(10, "PUT", "/anything", "user1"))
        self.assertEqual(expected, results)

        self.time += 1.0

        # User2 again
        expected = [4.0] * 5
        results = list(self._check(5, "PUT", "/anything", "user2"))
        self.assertEqual(expected, results)

        # User0 again
        expected = [28.0]
        results = list(self._check(1, "PUT", "/anything", "user0"))
        self.assertEqual(expected, results)

        self.time += 28.0

        expected = [None, 30.0]
        results = list(self._check(2, "PUT", "/anything", "user0"))
        self.assertEqual(expected, results)

class WsgiLimiterTest(BaseLimitTestSuite):
    """Tests for `limits.WsgiLimiter` class."""

    def setUp(self):
        """Run before each test."""
        super(WsgiLimiterTest, self).setUp()
        self.app = limits.WsgiLimiter(TEST_LIMITS)

    def _request_data(self, verb, path):
        """Get data describing a limit request verb/path."""
        return jsonutils.dumps({"verb": verb, "path": path})

    def _request(self, verb, url, username=None):
        """Make sure that POSTing to the given url causes the given username
        to perform the given action.  Make the internal rate limiter return
        delay and make sure that the WSGI app returns the correct response.

        :returns: the X-Wait-Seconds header value when the response has
                  one, otherwise None
        """
        if username:
            request = webob.Request.blank("/%s" % username)
        else:
            request = webob.Request.blank("/")

        request.method = "POST"
        request.body = self._request_data(verb, url)
        response = request.get_response(self.app)

        if "X-Wait-Seconds" in response.headers:
            self.assertEqual(response.status_int, 403)
            return response.headers["X-Wait-Seconds"]

        self.assertEqual(response.status_int, 204)

    def test_invalid_methods(self):
        """Only POSTs should work."""
        for method in ["GET", "PUT", "DELETE", "HEAD", "OPTIONS"]:
            request = webob.Request.blank("/", method=method)
            response = request.get_response(self.app)
            self.assertEqual(response.status_int, 405)

    def test_good_url(self):
        """A path with no matching limit is not delayed."""
        delay = self._request("GET", "/something")
        self.assertIsNone(delay)

    def test_escaping(self):
        """A path containing a percent-escape is not delayed."""
        delay = self._request("GET", "/something/jump%20up")
        self.assertIsNone(delay)

    def test_response_to_delays(self):
        """The second GET on /delayed reports a 60 second wait."""
        delay = self._request("GET", "/delayed")
        self.assertIsNone(delay)

        delay = self._request("GET", "/delayed")
        self.assertEqual(delay, '60.00')

    def test_response_to_delays_usernames(self):
        """Delays are tracked separately for each username."""
        delay = self._request("GET", "/delayed", "user1")
        self.assertIsNone(delay)

        delay = self._request("GET", "/delayed", "user2")
        self.assertIsNone(delay)

        delay = self._request("GET", "/delayed", "user1")
        self.assertEqual(delay, '60.00')

        delay = self._request("GET", "/delayed", "user2")
        self.assertEqual(delay, '60.00')

class FakeHttplibSocket(object):
    """Fake socket whose makefile() hands back a canned response buffer."""

    def __init__(self, response_string):
        """Initialize new `FakeHttplibSocket`.

        :param response_string: raw response text to wrap in a StringIO
        """
        self._buffer = six.StringIO(response_string)

    def makefile(self, _mode, _other):
        """Returns the socket's internal buffer."""
        return self._buffer

class FakeHttplibConnection(object):
    """Fake `httplib.HTTPConnection`."""

    def __init__(self, app, host):
        """Initialize `FakeHttplibConnection`.

        :param app: WSGI application that will receive the requests
        :param host: host value set on each generated request
        """
        self.app = app
        self.host = host

    def request(self, method, path, body="", headers=None):
        """Fake request handler.

        Requests made via this connection actually get translated and
        routed into our WSGI app, we then wait for the response and turn
        it back into an `httplib.HTTPResponse`.
        """
        if not headers:
            headers = {}

        req = webob.Request.blank(path)
        req.method = method
        req.headers = headers
        req.host = self.host
        req.body = body

        # Prefix the stringified response with a protocol version so that
        # HTTPResponse.begin() is handed a complete status line.
        resp = str(req.get_response(self.app))
        resp = "HTTP/1.0 %s" % resp
        sock = FakeHttplibSocket(resp)
        self.http_response = httplib.HTTPResponse(sock)
        self.http_response.begin()

    def getresponse(self):
        """Return our generated response from the request."""
        return self.http_response

def wire_HTTPConnection_to_WSGI(host, app):
    """Monkeypatches HTTPConnection so that if you try to connect to host, you
    are instead routed straight to the given WSGI app.

    After calling this method, when any code calls

    httplib.HTTPConnection(host)

    the connection object will be a fake.  Its requests will be sent directly
    to the given WSGI app rather than through a socket.

    Code connecting to hosts other than host will not be affected.

    This method may be called multiple times to map different hosts to
    different apps.

    This method returns the original HTTPConnection object, so that the caller
    can restore the default HTTPConnection interface (for all hosts).
    """
    class HTTPConnectionDecorator(object):
        """Wraps the real HTTPConnection class so that when you instantiate
        the class you might instead get a fake instance.
        """

        def __init__(self, wrapped):
            self.wrapped = wrapped

        def __call__(self, connection_host, *args, **kwargs):
            if connection_host == host:
                return FakeHttplibConnection(app, host)
            else:
                # Any other host goes to whatever was installed before,
                # which may itself be an earlier decorator.
                return self.wrapped(connection_host, *args, **kwargs)

    oldHTTPConnection = httplib.HTTPConnection
    httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
    return oldHTTPConnection

class WsgiLimiterProxyTest(BaseLimitTestSuite):
    """Tests for the `limits.WsgiLimiterProxy` class."""

    def setUp(self):
        """setUp() for WsgiLimiterProxyTest.

        Do some nifty HTTP/WSGI magic which allows for WSGI to be called
        directly by something like the `httplib` library.
        """
        super(WsgiLimiterProxyTest, self).setUp()
        self.app = limits.WsgiLimiter(TEST_LIMITS)
        oldHTTPConnection = (
            wire_HTTPConnection_to_WSGI("169.254.0.1:80", self.app))
        self.proxy = limits.WsgiLimiterProxy("169.254.0.1:80")
        self.addCleanup(self._restore, oldHTTPConnection)

    def _restore(self, oldHTTPConnection):
        """Undo the monkeypatch installed by setUp()."""
        # restore original HTTPConnection object
        httplib.HTTPConnection = oldHTTPConnection

    def test_200(self):
        """Successful request test."""
        delay = self.proxy.check_for_delay("GET", "/anything")
        self.assertEqual(delay, (None, None))

    def test_403(self):
        """Forbidden request test."""
        delay = self.proxy.check_for_delay("GET", "/delayed")
        self.assertEqual(delay, (None, None))

        delay, error = self.proxy.check_for_delay("GET", "/delayed")
        error = error.strip()

        expected = ("60.00", "403 Forbidden\n\nOnly 1 GET request(s) can be "
                    "made to /delayed every minute.")

        self.assertEqual((delay, error), expected)

class LimitsViewBuilderTest(test.TestCase):
    """Tests for `views.limits.ViewBuilder`."""

    def setUp(self):
        """Create a view builder plus sample rate and absolute limits."""
        super(LimitsViewBuilderTest, self).setUp()
        self.view_builder = views.limits.ViewBuilder()
        self.rate_limits = [{"URI": "*",
                             "regex": ".*",
                             "value": 10,
                             "verb": "POST",
                             "remaining": 2,
                             "unit": "MINUTE",
                             "resetTime": 1311272226},
                            {"URI": "*/volumes",
                             "regex": "^/volumes",
                             "value": 50,
                             "verb": "POST",
                             "remaining": 10,
                             "unit": "DAY",
                             "resetTime": 1311272226}]
        self.absolute_limits = {"metadata_items": 1,
                                "injected_files": 5,
                                "injected_file_content_bytes": 5}

    def test_build_limits(self):
        """Test the view built from populated rate and absolute limits."""
        # Expected "next-available" rendering of resetTime 1311272226.
        tdate = "2011-07-21T18:17:06Z"
        expected_limits = {
            "limits": {"rate": [{"uri": "*",
                                 "regex": ".*",
                                 "limit": [{"value": 10,
                                            "verb": "POST",
                                            "remaining": 2,
                                            "unit": "MINUTE",
                                            "next-available": tdate}]},
                                {"uri": "*/volumes",
                                 "regex": "^/volumes",
                                 "limit": [{"value": 50,
                                            "verb": "POST",
                                            "remaining": 10,
                                            "unit": "DAY",
                                            "next-available": tdate}]}],
                       "absolute": {"maxServerMeta": 1,
                                    "maxImageMeta": 1,
                                    "maxPersonality": 5,
                                    "maxPersonalitySize": 5}}}

        output = self.view_builder.build(self.rate_limits,
                                         self.absolute_limits)
        self.assertDictMatch(output, expected_limits)

    def test_build_limits_empty_limits(self):
        """Test the view built from empty rate and absolute limits."""
        expected_limits = {"limits": {"rate": [],
                           "absolute": {}}}

        abs_limits = {}
        rate_limits = []

        output = self.view_builder.build(rate_limits, abs_limits)
        self.assertDictMatch(output, expected_limits)

class LimitsXMLSerializationTest(test.TestCase):

**** CubicPower OpenStack Study ****

    def test_xml_declaration(self):

        serializer = limits.LimitsTemplate()

        fixture = {"limits": {

                   "rate": [],

                   "absolute": {}}}

        output = serializer.serialize(fixture)

        has_dec = output.startswith("")

        self.assertTrue(has_dec)

**** CubicPower OpenStack Study ****

    def test_index(self):

        tdate = "2011-12-15T22:42:45Z"

        serializer = limits.LimitsTemplate()

        fixture = {"limits": {"rate": [{"uri": "*",

                                        "regex": ".*",

                                        "limit": [{"value": 10,

                                                   "verb": "POST",

                                                   "remaining": 2,

                                                   "unit": "MINUTE",

                                                   "next-available": tdate}]},

                                       {"uri": "*/servers",

                                        "regex": "^/servers",

                                        "limit": [{"value": 50,

                                                   "verb": "POST",

                                                   "remaining": 10,

                                                   "unit": "DAY",

                                                   "next-available": tdate}]}],

                              "absolute": {"maxServerMeta": 1,

                                           "maxImageMeta": 1,

                                           "maxPersonality": 5,

                                           "maxPersonalitySize": 10240}}}

        output = serializer.serialize(fixture)

        root = etree.XML(output)

        xmlutil.validate_schema(root, 'limits')

        #verify absolute limits

        absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)

        self.assertEqual(len(absolutes), 4)

        for limit in absolutes:

            name = limit.get('name')

            value = limit.get('value')

            self.assertEqual(value, str(fixture['limits']['absolute'][name]))

        #verify rate limits

        rates = root.xpath('ns:rates/ns:rate', namespaces=NS)

        self.assertEqual(len(rates), 2)

        for i, rate in enumerate(rates):

            for key in ['uri', 'regex']:

                self.assertEqual(rate.get(key),

                                 str(fixture['limits']['rate'][i][key]))

            rate_limits = rate.xpath('ns:limit', namespaces=NS)

            self.assertEqual(len(rate_limits), 1)

            for j, limit in enumerate(rate_limits):

                for key in ['verb', 'value', 'remaining', 'unit',

                            'next-available']:

                    self.assertEqual(

                        limit.get(key),

                        str(fixture['limits']['rate'][i]['limit'][j][key]))

**** CubicPower OpenStack Study ****

    def test_index_no_limits(self):

        serializer = limits.LimitsTemplate()

        fixture = {"limits": {

                   "rate": [],

                   "absolute": {}}}

        output = serializer.serialize(fixture)

        root = etree.XML(output)

        xmlutil.validate_schema(root, 'limits')

        #verify absolute limits

        absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)

        self.assertEqual(len(absolutes), 0)

        #verify rate limits

        rates = root.xpath('ns:rates/ns:rate', namespaces=NS)

        self.assertEqual(len(rates), 0)