# **** CubicPower OpenStack Study ****
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
"""
Tests dealing with HTTP rate-limiting.
"""
import httplib
import six
from xml.dom import minidom
from lxml import etree
import webob
from cinder.api.v1 import limits
from cinder.api import views
from cinder.api import xmlutil
import cinder.context
from cinder.openstack.common import jsonutils
from cinder import test
# Rate limits handed to the Limiter / WsgiLimiter instances built in the
# tests below.  Positional arguments are (verb, uri, regex, value, unit).
TEST_LIMITS = [
    limits.Limit("GET", "/delayed", "^/delayed", 1, limits.PER_MINUTE),
    limits.Limit("POST", "*", ".*", 7, limits.PER_MINUTE),
    limits.Limit("POST", "/volumes", "^/volumes", 3, limits.PER_MINUTE),
    limits.Limit("PUT", "*", "", 10, limits.PER_MINUTE),
    limits.Limit("PUT", "/volumes", "^/volumes", 5, limits.PER_MINUTE),
]
# Namespace prefix map passed as ``namespaces=`` to the XPath queries in the
# XML serialization tests.
NS = {
    'atom': 'http://www.w3.org/2005/Atom',
    'ns': 'http://docs.openstack.org/common/api/v1.0'
}
# **** CubicPower OpenStack Study ****
class BaseLimitTestSuite(test.TestCase):
    """Base test suite which provides relevant stubs and time abstraction."""

    def setUp(self):
        """Install the fake clock and the fake project-quota lookup."""
        super(BaseLimitTestSuite, self).setUp()

        # Fake clock: tests move ``self.time`` forward by hand instead of
        # sleeping; ``limits.Limit._get_time`` is stubbed to read it.
        self.time = 0.0
        self.stubs.Set(limits.Limit, "_get_time", self._get_time)

        # Tests assign a {quota name: limit value} dict here.  The stub below
        # reads the attribute when it is called, so a test may set it after
        # setUp() has already run.
        self.absolute_limits = {}

        def stub_get_project_quotas(context, project_id, usages=True):
            return dict((k, dict(limit=v))
                        for k, v in self.absolute_limits.items())

        self.stubs.Set(cinder.quota.QUOTAS, "get_project_quotas",
                       stub_get_project_quotas)

    def _get_time(self):
        """Return the "time" according to this test suite."""
        return self.time
# **** CubicPower OpenStack Study ****
class LimitsControllerTest(BaseLimitTestSuite):
    """Tests for `limits.LimitsController` class."""

    def setUp(self):
        """Run before each test."""
        super(LimitsControllerTest, self).setUp()
        self.controller = limits.create_resource()

    def _get_index_request(self, accept_header="application/json"):
        """Build a request routed to the controller's ``index`` action.

        :param accept_header: value assigned to the request's Accept header
        :returns: a `webob.Request` carrying routing args and a cinder
                  request context for 'testuser' / 'testproject'
        """
        request = webob.Request.blank("/")
        request.accept = accept_header
        request.environ["wsgiorg.routing_args"] = (None, {
            "action": "index",
            "controller": "",
        })
        context = cinder.context.RequestContext('testuser', 'testproject')
        request.environ["cinder.context"] = context
        return request

    def _populate_limits(self, request):
        """Put limit info into a request."""
        _limits = [
            limits.Limit("GET", "*", ".*", 10, 60).display(),
            limits.Limit("POST", "*", ".*", 5, 60 * 60).display(),
            limits.Limit("GET", "changes-since*", "changes-since",
                         5, 60).display(),
        ]
        request.environ["cinder.limits"] = _limits
        return request

    def test_empty_index_json(self):
        """Test getting empty limit details in JSON."""
        request = self._get_index_request()
        response = request.get_response(self.controller)
        expected = {
            "limits": {
                "rate": [],
                "absolute": {},
            },
        }
        body = jsonutils.loads(response.body)
        self.assertEqual(expected, body)

    def test_index_json(self):
        """Test getting limit details in JSON."""
        request = self._get_index_request()
        request = self._populate_limits(request)
        self.absolute_limits = {
            'gigabytes': 512,
            'volumes': 5,
        }
        response = request.get_response(self.controller)
        expected = {
            "limits": {
                "rate": [
                    {
                        "regex": ".*",
                        "uri": "*",
                        "limit": [
                            {
                                "verb": "GET",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "MINUTE",
                                "value": 10,
                                "remaining": 10,
                            },
                            {
                                "verb": "POST",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "HOUR",
                                "value": 5,
                                "remaining": 5,
                            },
                        ],
                    },
                    {
                        "regex": "changes-since",
                        "uri": "changes-since*",
                        "limit": [
                            {
                                "verb": "GET",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "MINUTE",
                                "value": 5,
                                "remaining": 5,
                            },
                        ],
                    },
                ],
                "absolute": {"maxTotalVolumeGigabytes": 512,
                             "maxTotalVolumes": 5, },
            },
        }
        body = jsonutils.loads(response.body)
        self.assertEqual(expected, body)

    def _populate_limits_diff_regex(self, request):
        """Put two limits with the same URI but different regexes on request."""
        _limits = [
            limits.Limit("GET", "*", ".*", 10, 60).display(),
            limits.Limit("GET", "*", "*.*", 10, 60).display(),
        ]
        request.environ["cinder.limits"] = _limits
        return request

    def test_index_diff_regex(self):
        """Test limits sharing a URI but not a regex are listed separately."""
        request = self._get_index_request()
        request = self._populate_limits_diff_regex(request)
        response = request.get_response(self.controller)
        expected = {
            "limits": {
                "rate": [
                    {
                        "regex": ".*",
                        "uri": "*",
                        "limit": [
                            {
                                "verb": "GET",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "MINUTE",
                                "value": 10,
                                "remaining": 10,
                            },
                        ],
                    },
                    {
                        "regex": "*.*",
                        "uri": "*",
                        "limit": [
                            {
                                "verb": "GET",
                                "next-available": "1970-01-01T00:00:00Z",
                                "unit": "MINUTE",
                                "value": 10,
                                "remaining": 10,
                            },
                        ],
                    },
                ],
                "absolute": {},
            },
        }
        body = jsonutils.loads(response.body)
        self.assertEqual(expected, body)

    def _test_index_absolute_limits_json(self, expected):
        """Assert the 'absolute' section of the JSON index equals expected."""
        request = self._get_index_request()
        response = request.get_response(self.controller)
        body = jsonutils.loads(response.body)
        self.assertEqual(expected, body['limits']['absolute'])

    def test_index_ignores_extra_absolute_limits_json(self):
        """Test that an unrecognised quota name is left out of the output."""
        self.absolute_limits = {'unknown_limit': 9001}
        self._test_index_absolute_limits_json({})
# **** CubicPower OpenStack Study ****
class TestLimiter(limits.Limiter):
    """Marker subclass of `limits.Limiter` with no added behaviour.

    LimitMiddlewareTest passes this class's dotted name to the middleware
    and then checks that the middleware's limiter is an instance of it.
    """
# **** CubicPower OpenStack Study ****
class LimitMiddlewareTest(BaseLimitTestSuite):
    """Tests for the `limits.RateLimitingMiddleware` class."""

    @webob.dec.wsgify
    def _empty_app(self, request):
        """Do-nothing WSGI app."""
        pass

    def setUp(self):
        """Prepare middleware for use through fake WSGI app."""
        super(LimitMiddlewareTest, self).setUp()
        # A single rule: one GET per minute on any URI.
        _limits = '(GET, *, .*, 1, MINUTE)'
        # The third argument is the dotted path of the limiter class the
        # middleware should use (TestLimiter from this module).
        self.app = limits.RateLimitingMiddleware(self._empty_app, _limits,
                                                 "%s.TestLimiter" %
                                                 self.__class__.__module__)

    def test_limit_class(self):
        """Test that middleware selected correct limiter class."""
        # assertIsInstance is not stripped under ``python -O`` and reports
        # the offending object on failure, unlike a bare assert.
        self.assertIsInstance(self.app._limiter, TestLimiter)

    def test_good_request(self):
        """Test successful GET request through middleware."""
        request = webob.Request.blank("/")
        response = request.get_response(self.app)
        self.assertEqual(200, response.status_int)

    def test_limited_request_json(self):
        """Test a rate-limited (413) GET request through middleware."""
        # First request is within the limit...
        request = webob.Request.blank("/")
        response = request.get_response(self.app)
        self.assertEqual(200, response.status_int)

        # ...the second one, with the fake clock unchanged, is rejected.
        request = webob.Request.blank("/")
        response = request.get_response(self.app)
        self.assertEqual(response.status_int, 413)

        self.assertIn('Retry-After', response.headers)
        retry_after = int(response.headers['Retry-After'])
        self.assertAlmostEqual(retry_after, 60, 1)

        body = jsonutils.loads(response.body)
        expected = "Only 1 GET request(s) can be made to * every minute."
        value = body["overLimitFault"]["details"].strip()
        self.assertEqual(value, expected)

    def test_limited_request_xml(self):
        """Test a rate-limited (413) response as XML."""
        request = webob.Request.blank("/")
        response = request.get_response(self.app)
        self.assertEqual(200, response.status_int)

        request = webob.Request.blank("/")
        request.accept = "application/xml"
        response = request.get_response(self.app)
        self.assertEqual(response.status_int, 413)

        root = minidom.parseString(response.body).childNodes[0]
        expected = "Only 1 GET request(s) can be made to * every minute."

        details = root.getElementsByTagName("details")
        self.assertEqual(details.length, 1)

        value = details.item(0).firstChild.data.strip()
        self.assertEqual(value, expected)
# **** CubicPower OpenStack Study ****
class LimitTest(BaseLimitTestSuite):
    """Tests for the `limits.Limit` class."""

    def test_GET_no_delay(self):
        """Test a limit handles 1 GET per second."""
        limit = limits.Limit("GET", "*", ".*", 1, 1)
        delay = limit("GET", "/anything")
        self.assertIsNone(delay)
        self.assertEqual(0, limit.next_request)
        self.assertEqual(0, limit.last_request)

    def test_GET_delay(self):
        """Test two calls to 1 GET per second limit."""
        limit = limits.Limit("GET", "*", ".*", 1, 1)
        delay = limit("GET", "/anything")
        self.assertIsNone(delay)

        # Second call at the same fake time is told to wait one second.
        delay = limit("GET", "/anything")
        self.assertEqual(1, delay)
        self.assertEqual(1, limit.next_request)
        self.assertEqual(0, limit.last_request)

        # Advance the fake clock past the wait; the next call goes through.
        self.time += 4

        delay = limit("GET", "/anything")
        self.assertIsNone(delay)
        self.assertEqual(4, limit.next_request)
        self.assertEqual(4, limit.last_request)
# **** CubicPower OpenStack Study ****
class ParseLimitsTest(BaseLimitTestSuite):
    """Tests for the limits-string parser, `limits.Limiter.parse_limits`."""

    def test_invalid(self):
        """Test that parse_limits() handles invalid input correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          ';;;;;')

    def test_bad_rule(self):
        """Test that parse_limits() handles bad rules correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          'GET, *, .*, 20, minute')

    def test_missing_arg(self):
        """Test that parse_limits() handles missing args correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          '(GET, *, .*, 20)')

    def test_bad_value(self):
        """Test that parse_limits() handles bad values correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          '(GET, *, .*, foo, minute)')

    def test_bad_unit(self):
        """Test that parse_limits() handles bad units correctly."""
        self.assertRaises(ValueError, limits.Limiter.parse_limits,
                          '(GET, *, .*, 20, lightyears)')

    def test_multiple_rules(self):
        """Test that parse_limits() handles multiple rules correctly."""
        # An unexpected ValueError is left to propagate so the test runner
        # reports it with its full traceback.
        parsed = limits.Limiter.parse_limits(
            '(get, *, .*, 20, minute);'
            '(PUT, /foo*, /foo.*, 10, hour);'
            '(POST, /bar*, /bar.*, 5, second);'
            '(Say, /derp*, /derp.*, 1, day)')

        # Make sure the number of returned limits are correct
        self.assertEqual(len(parsed), 4)

        # Check all the verbs...
        expected = ['GET', 'PUT', 'POST', 'SAY']
        self.assertEqual([t.verb for t in parsed], expected)

        # ...the URIs...
        expected = ['*', '/foo*', '/bar*', '/derp*']
        self.assertEqual([t.uri for t in parsed], expected)

        # ...the regexes...
        expected = ['.*', '/foo.*', '/bar.*', '/derp.*']
        self.assertEqual([t.regex for t in parsed], expected)

        # ...the values...
        expected = [20, 10, 5, 1]
        self.assertEqual([t.value for t in parsed], expected)

        # ...and the units...
        expected = [limits.PER_MINUTE, limits.PER_HOUR,
                    limits.PER_SECOND, limits.PER_DAY]
        self.assertEqual([t.unit for t in parsed], expected)
# **** CubicPower OpenStack Study ****
class LimiterTest(BaseLimitTestSuite):
    """Tests for the in-memory `limits.Limiter` class."""

    def setUp(self):
        """Run before each test."""
        super(LimiterTest, self).setUp()
        # Per-user overrides, keyed 'limits.<username>': user3 gets an empty
        # rule string, user0 gets 4 GETs and 2 PUTs per minute.
        userlimits = {'limits.user3': '',
                      'limits.user0': '(get, *, .*, 4, minute);'
                                      '(put, *, .*, 2, minute)'}
        self.limiter = limits.Limiter(TEST_LIMITS, **userlimits)

    def _check(self, num, verb, url, username=None):
        """Check and yield results from checks.

        Calls ``check_for_delay`` ``num`` times and yields the first item
        of each result (the delay, or None when the request is allowed).
        """
        # ``range`` rather than the Python 2-only ``xrange``; ``num`` is
        # small, so the list built on Python 2 is harmless.
        for _ in range(num):
            yield self.limiter.check_for_delay(verb, url, username)[0]

    def _check_sum(self, num, verb, url, username=None):
        """Check and sum results from checks."""
        results = self._check(num, verb, url, username)
        return sum(item for item in results if item)

    def test_no_delay_GET(self):
        """No delay on a single call for a limit verb we didn't set."""
        delay = self.limiter.check_for_delay("GET", "/anything")
        self.assertEqual(delay, (None, None))

    def test_no_delay_PUT(self):
        """No delay on a single call for a known limit."""
        delay = self.limiter.check_for_delay("PUT", "/anything")
        self.assertEqual(delay, (None, None))

    def test_delay_PUT(self):
        """Test delay on 11th PUT request.

        The 11th PUT will result in a delay of 6.0 seconds until
        the next request will be granted.
        """
        expected = [None] * 10 + [6.0]
        results = list(self._check(11, "PUT", "/anything"))

        self.assertEqual(expected, results)

    def test_delay_POST(self):
        """Test delay of 8th POST request.

        Ensure that the 8th POST will result in a delay of 60/7 seconds
        (about 8.57) until the next request will be granted.
        """
        expected = [None] * 7
        results = list(self._check(7, "POST", "/anything"))
        self.assertEqual(expected, results)

        expected = 60.0 / 7.0
        results = self._check_sum(1, "POST", "/anything")
        self.assertAlmostEqual(expected, results, 8)

    def test_delay_GET(self):
        """Ensure the 11th GET will result in NO delay.

        With no username, 11 GETs on /anything are never delayed; for
        user0 the 5th GET is delayed by 15.0 seconds.
        """
        expected = [None] * 11
        results = list(self._check(11, "GET", "/anything"))
        self.assertEqual(expected, results)

        expected = [None] * 4 + [15.0]
        results = list(self._check(5, "GET", "/foo", "user0"))
        self.assertEqual(expected, results)

    def test_delay_PUT_volumes(self):
        """Test limit of PUT on /volumes.

        Ensure PUT on /volumes limits at 5 requests, and PUT elsewhere is
        still OK after 5 requests...
        but then after 11 total requests, PUT limiting kicks in.
        """
        # First 6 requests on PUT /volumes
        expected = [None] * 5 + [12.0]
        results = list(self._check(6, "PUT", "/volumes"))
        self.assertEqual(expected, results)

        # Next 5 request on PUT /anything
        expected = [None] * 4 + [6.0]
        results = list(self._check(5, "PUT", "/anything"))
        self.assertEqual(expected, results)

    def test_delay_PUT_wait(self):
        """Test limit on PUT is lifted.

        Ensure after hitting the limit and then waiting for the correct
        amount of time, the limit will be lifted.
        """
        expected = [None] * 10 + [6.0]
        results = list(self._check(11, "PUT", "/anything"))
        self.assertEqual(expected, results)

        # Advance time
        self.time += 6.0

        expected = [None, 6.0]
        results = list(self._check(2, "PUT", "/anything"))
        self.assertEqual(expected, results)

    def test_multiple_delays(self):
        """Ensure multiple requests still get a delay."""
        expected = [None] * 10 + [6.0] * 10
        results = list(self._check(20, "PUT", "/anything"))
        self.assertEqual(expected, results)

        self.time += 1.0

        expected = [5.0] * 10
        results = list(self._check(10, "PUT", "/anything"))
        self.assertEqual(expected, results)

        expected = [None] * 2 + [30.0] * 8
        results = list(self._check(10, "PUT", "/anything", "user0"))
        self.assertEqual(expected, results)

    def test_user_limit(self):
        """Test user-specific limits."""
        self.assertEqual(self.limiter.levels['user3'], [])
        self.assertEqual(len(self.limiter.levels['user0']), 2)

    def test_multiple_users(self):
        """Tests involving multiple users."""
        # User0
        expected = [None] * 2 + [30.0] * 8
        results = list(self._check(10, "PUT", "/anything", "user0"))
        self.assertEqual(expected, results)

        # User1
        expected = [None] * 10 + [6.0] * 10
        results = list(self._check(20, "PUT", "/anything", "user1"))
        self.assertEqual(expected, results)

        # User2
        expected = [None] * 10 + [6.0] * 5
        results = list(self._check(15, "PUT", "/anything", "user2"))
        self.assertEqual(expected, results)

        # User3
        expected = [None] * 20
        results = list(self._check(20, "PUT", "/anything", "user3"))
        self.assertEqual(expected, results)

        self.time += 1.0

        # User1 again
        expected = [5.0] * 10
        results = list(self._check(10, "PUT", "/anything", "user1"))
        self.assertEqual(expected, results)

        self.time += 1.0

        # User2 again
        expected = [4.0] * 5
        results = list(self._check(5, "PUT", "/anything", "user2"))
        self.assertEqual(expected, results)

        # User0 again
        expected = [28.0]
        results = list(self._check(1, "PUT", "/anything", "user0"))
        self.assertEqual(expected, results)

        self.time += 28.0

        expected = [None, 30.0]
        results = list(self._check(2, "PUT", "/anything", "user0"))
        self.assertEqual(expected, results)
# **** CubicPower OpenStack Study ****
class WsgiLimiterTest(BaseLimitTestSuite):
    """Tests for `limits.WsgiLimiter` class."""

    def setUp(self):
        """Run before each test."""
        super(WsgiLimiterTest, self).setUp()
        self.app = limits.WsgiLimiter(TEST_LIMITS)

    def _request_data(self, verb, path):
        """Get data describing a limit request verb/path."""
        return jsonutils.dumps({"verb": verb, "path": path})

    def _request(self, verb, url, username=None):
        """Assert that POSTing to given url triggers given action.

        Ensure POSTing to the given url causes the given username
        to perform the given action.

        Make the internal rate limiter return delay and make sure that the
        WSGI app returns the correct response.

        :returns: the ``X-Wait-Seconds`` header value when the response
                  carries one (status asserted to be 403), otherwise None
                  (status asserted to be 204)
        """
        if username:
            request = webob.Request.blank("/%s" % username)
        else:
            request = webob.Request.blank("/")

        request.method = "POST"
        request.body = self._request_data(verb, url)
        response = request.get_response(self.app)

        if "X-Wait-Seconds" in response.headers:
            self.assertEqual(response.status_int, 403)
            return response.headers["X-Wait-Seconds"]

        self.assertEqual(response.status_int, 204)

    def test_invalid_methods(self):
        """Only POSTs should work."""
        for method in ["GET", "PUT", "DELETE", "HEAD", "OPTIONS"]:
            request = webob.Request.blank("/", method=method)
            response = request.get_response(self.app)
            self.assertEqual(response.status_int, 405)

    def test_good_url(self):
        """Test that a first GET on an unlimited path is not delayed."""
        delay = self._request("GET", "/something")
        self.assertIsNone(delay)

    def test_escaping(self):
        """Test that a path containing a percent-escape is not delayed."""
        delay = self._request("GET", "/something/jump%20up")
        self.assertIsNone(delay)

    def test_response_to_delays(self):
        """Test that the second GET on /delayed is told to wait 60.00."""
        delay = self._request("GET", "/delayed")
        self.assertIsNone(delay)

        delay = self._request("GET", "/delayed")
        self.assertEqual(delay, '60.00')

    def test_response_to_delays_usernames(self):
        """Test that delays on /delayed are tracked per username."""
        delay = self._request("GET", "/delayed", "user1")
        self.assertIsNone(delay)

        delay = self._request("GET", "/delayed", "user2")
        self.assertIsNone(delay)

        delay = self._request("GET", "/delayed", "user1")
        self.assertEqual(delay, '60.00')

        delay = self._request("GET", "/delayed", "user2")
        self.assertEqual(delay, '60.00')
# **** CubicPower OpenStack Study ****
class FakeHttplibSocket(object):
    """Fake socket that serves a canned response string.

    FakeHttplibConnection hands an instance to `httplib.HTTPResponse` in
    place of a real socket.
    """

    def __init__(self, response_string):
        """Initialize new `FakeHttplibSocket`.

        :param response_string: raw HTTP response text to serve
        """
        self._buffer = six.StringIO(response_string)

    def makefile(self, _mode, _other):
        """Returns the socket's internal buffer."""
        # Both arguments are accepted for signature compatibility and ignored.
        return self._buffer
# **** CubicPower OpenStack Study ****
class FakeHttplibConnection(object):
    """Fake `httplib.HTTPConnection`."""

    def __init__(self, app, host):
        """Initialize `FakeHttplibConnection`.

        :param app: WSGI application that will receive the requests
        :param host: host value set on every request sent to ``app``
        """
        self.app = app
        self.host = host

    def request(self, method, path, body="", headers=None):
        """Fake method for request.

        Requests made via this connection actually get translated and
        routed into our WSGI app, we then wait for the response and turn
        it back into an `httplib.HTTPResponse`.

        The result is stored on ``self.http_response`` and handed out by
        getresponse().
        """
        if not headers:
            headers = {}

        req = webob.Request.blank(path)
        req.method = method
        req.headers = headers
        req.host = self.host
        req.body = body

        # Prefix the stringified response with an HTTP version so that
        # HTTPResponse.begin() can parse it like a status line from the wire.
        resp = str(req.get_response(self.app))
        resp = "HTTP/1.0 %s" % resp
        sock = FakeHttplibSocket(resp)
        self.http_response = httplib.HTTPResponse(sock)
        self.http_response.begin()

    def getresponse(self):
        """Return our generated response from the request."""
        return self.http_response
def wire_HTTPConnection_to_WSGI(host, app):
    """Monkeypatches HTTPConnection so that if you try to connect to host, you
    are instead routed straight to the given WSGI app.

    After calling this method, when any code calls

    httplib.HTTPConnection(host)

    the connection object will be a fake.  Its requests will be sent directly
    to the given WSGI app rather than through a socket.

    Code connecting to hosts other than host will not be affected.

    This method may be called multiple times to map different hosts to
    different apps.

    This method returns the original HTTPConnection object, so that the caller
    can restore the default HTTPConnection interface (for all hosts).
    """
    class HTTPConnectionDecorator(object):
        """Wraps the real HTTPConnection class so that when you instantiate
        the class you might instead get a fake instance.
        """

        def __init__(self, wrapped):
            self.wrapped = wrapped

        def __call__(self, connection_host, *args, **kwargs):
            # Only the wired host is diverted; everything else falls through
            # to whatever HTTPConnection was installed before this call.
            if connection_host == host:
                return FakeHttplibConnection(app, host)
            else:
                return self.wrapped(connection_host, *args, **kwargs)

    oldHTTPConnection = httplib.HTTPConnection
    httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
    return oldHTTPConnection
# **** CubicPower OpenStack Study ****
class WsgiLimiterProxyTest(BaseLimitTestSuite):
    """Tests for the `limits.WsgiLimiterProxy` class."""

    def setUp(self):
        """setUp for test suite.

        Do some nifty HTTP/WSGI magic which allows for WSGI to be called
        directly by something like the `httplib` library.
        """
        super(WsgiLimiterProxyTest, self).setUp()
        self.app = limits.WsgiLimiter(TEST_LIMITS)
        # Keep the previous HTTPConnection so tearDown() can put it back.
        self.oldHTTPConnection = (
            wire_HTTPConnection_to_WSGI("169.254.0.1:80", self.app))
        self.proxy = limits.WsgiLimiterProxy("169.254.0.1:80")

    def test_200(self):
        """Successful request test."""
        delay = self.proxy.check_for_delay("GET", "/anything")
        self.assertEqual(delay, (None, None))

    def test_403(self):
        """Forbidden request test."""
        delay = self.proxy.check_for_delay("GET", "/delayed")
        self.assertEqual(delay, (None, None))

        delay, error = self.proxy.check_for_delay("GET", "/delayed")
        error = error.strip()

        expected = ("60.00", "403 Forbidden\n\nOnly 1 GET request(s) can be "
                    "made to /delayed every minute.")

        self.assertEqual((delay, error), expected)

    def tearDown(self):
        """Undo the HTTPConnection monkeypatch installed by setUp()."""
        # restore original HTTPConnection object
        httplib.HTTPConnection = self.oldHTTPConnection
        super(WsgiLimiterProxyTest, self).tearDown()
# **** CubicPower OpenStack Study ****
class LimitsViewBuilderTest(test.TestCase):
    """Tests for `views.limits.ViewBuilder.build`."""

    def setUp(self):
        """Create the view builder and the rate/absolute limit fixtures."""
        super(LimitsViewBuilderTest, self).setUp()
        self.view_builder = views.limits.ViewBuilder()
        self.rate_limits = [{"URI": "*",
                             "regex": ".*",
                             "value": 10,
                             "verb": "POST",
                             "remaining": 2,
                             "unit": "MINUTE",
                             "resetTime": 1311272226},
                            {"URI": "*/volumes",
                             "regex": "^/volumes",
                             "value": 50,
                             "verb": "POST",
                             "remaining": 10,
                             "unit": "DAY",
                             "resetTime": 1311272226}]
        self.absolute_limits = {"metadata_items": 1,
                                "injected_files": 5,
                                "injected_file_content_bytes": 5}

    def test_build_limits(self):
        """Test the view built from populated rate and absolute limits."""
        # Expected rendering of the fixtures' resetTime (1311272226).
        tdate = "2011-07-21T18:17:06Z"
        expected_limits = {
            "limits": {"rate": [{"uri": "*",
                                 "regex": ".*",
                                 "limit": [{"value": 10,
                                            "verb": "POST",
                                            "remaining": 2,
                                            "unit": "MINUTE",
                                            "next-available": tdate}]},
                                {"uri": "*/volumes",
                                 "regex": "^/volumes",
                                 "limit": [{"value": 50,
                                            "verb": "POST",
                                            "remaining": 10,
                                            "unit": "DAY",
                                            "next-available": tdate}]}],
                       "absolute": {"maxServerMeta": 1,
                                    "maxImageMeta": 1,
                                    "maxPersonality": 5,
                                    "maxPersonalitySize": 5}}}

        output = self.view_builder.build(self.rate_limits,
                                         self.absolute_limits)
        self.assertDictMatch(output, expected_limits)

    def test_build_limits_empty_limits(self):
        """Test the view built from empty rate and absolute limits."""
        expected_limits = {"limits": {"rate": [],
                           "absolute": {}}}

        abs_limits = {}
        rate_limits = []
        output = self.view_builder.build(rate_limits, abs_limits)
        self.assertDictMatch(output, expected_limits)
# **** CubicPower OpenStack Study ****
class LimitsXMLSerializationTest(test.TestCase):
    """Tests for XML serialization through `limits.LimitsTemplate`."""

    def test_xml_declaration(self):
        """Test that the serialized output starts with an XML declaration."""
        serializer = limits.LimitsTemplate()

        fixture = {"limits": {
                   "rate": [],
                   "absolute": {}}}

        output = serializer.serialize(fixture)
        # NOTE(review): the prefix had been reduced to "" (always true).
        # This literal is the declaration lxml writes for UTF-8 output;
        # confirm it against the serializer.
        has_dec = output.startswith("<?xml version='1.0' encoding='UTF-8'?>")
        self.assertTrue(has_dec)

    def test_index(self):
        """Test that populated limits round-trip through the XML output."""
        serializer = limits.LimitsTemplate()
        fixture = {
            "limits": {
                "rate": [{
                    "uri": "*",
                    "regex": ".*",
                    "limit": [{
                        "value": 10,
                        "verb": "POST",
                        "remaining": 2,
                        "unit": "MINUTE",
                        "next-available": "2011-12-15T22:42:45Z"}]},
                    {"uri": "*/servers",
                     "regex": "^/servers",
                     "limit": [{
                         "value": 50,
                         "verb": "POST",
                         "remaining": 10,
                         "unit": "DAY",
                         "next-available": "2011-12-15T22:42:45Z"}]}],
                "absolute": {"maxServerMeta": 1,
                             "maxImageMeta": 1,
                             "maxPersonality": 5,
                             "maxPersonalitySize": 10240}}}

        output = serializer.serialize(fixture)
        root = etree.XML(output)
        xmlutil.validate_schema(root, 'limits')

        # verify absolute limits
        absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
        self.assertEqual(len(absolutes), 4)
        for limit in absolutes:
            name = limit.get('name')
            value = limit.get('value')
            self.assertEqual(value, str(fixture['limits']['absolute'][name]))

        # verify rate limits
        rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
        self.assertEqual(len(rates), 2)
        for i, rate in enumerate(rates):
            for key in ['uri', 'regex']:
                self.assertEqual(rate.get(key),
                                 str(fixture['limits']['rate'][i][key]))
            rate_limits = rate.xpath('ns:limit', namespaces=NS)
            self.assertEqual(len(rate_limits), 1)
            for j, limit in enumerate(rate_limits):
                for key in ['verb', 'value', 'remaining', 'unit',
                            'next-available']:
                    self.assertEqual(
                        limit.get(key),
                        str(fixture['limits']['rate'][i]['limit'][j][key]))

    def test_index_no_limits(self):
        """Test that empty limits serialize to schema-valid, empty XML."""
        serializer = limits.LimitsTemplate()

        fixture = {"limits": {
                   "rate": [],
                   "absolute": {}}}

        output = serializer.serialize(fixture)
        root = etree.XML(output)
        xmlutil.validate_schema(root, 'limits')

        # verify absolute limits
        absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
        self.assertEqual(len(absolutes), 0)

        # verify rate limits
        rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
        self.assertEqual(len(rates), 0)