**** CubicPower OpenStack Study ****
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests dealing with HTTP rate-limiting.
"""
import httplib
from lxml import etree
import six
import webob
from xml.dom import minidom
from cinder.api.v2 import limits
from cinder.api import views
from cinder.api import xmlutil
import cinder.context
from cinder.openstack.common import jsonutils
from cinder import test
TEST_LIMITS = [
limits.Limit("GET", "/delayed", "^/delayed", 1, limits.PER_MINUTE),
limits.Limit("POST", "*", ".*", 7, limits.PER_MINUTE),
limits.Limit("POST", "/volumes", "^/volumes", 3, limits.PER_MINUTE),
limits.Limit("PUT", "*", "", 10, limits.PER_MINUTE),
limits.Limit("PUT", "/volumes", "^/volumes", 5, limits.PER_MINUTE),
]
NS = {
'atom': 'http://www.w3.org/2005/Atom',
'ns': 'http://docs.openstack.org/common/api/v1.0',
}
**** CubicPower OpenStack Study ****
class BaseLimitTestSuite(test.TestCase):
"""Base test suite which provides relevant stubs and time abstraction."""
**** CubicPower OpenStack Study ****
def setUp(self):
super(BaseLimitTestSuite, self).setUp()
self.time = 0.0
self.stubs.Set(limits.Limit, "_get_time", self._get_time)
self.absolute_limits = {}
def stub_get_project_quotas(context, project_id, usages=True):
return dict((k, dict(limit=v))
for k, v in self.absolute_limits.items())
self.stubs.Set(cinder.quota.QUOTAS, "get_project_quotas",
stub_get_project_quotas)
**** CubicPower OpenStack Study ****
def stub_get_project_quotas(context, project_id, usages=True):
return dict((k, dict(limit=v))
for k, v in self.absolute_limits.items())
self.stubs.Set(cinder.quota.QUOTAS, "get_project_quotas",
stub_get_project_quotas)
**** CubicPower OpenStack Study ****
def _get_time(self):
"""Return the "time" according to this test suite."""
return self.time
**** CubicPower OpenStack Study ****
class LimitsControllerTest(BaseLimitTestSuite):
"""Tests for `limits.LimitsController` class."""
**** CubicPower OpenStack Study ****
def setUp(self):
"""Run before each test."""
super(LimitsControllerTest, self).setUp()
self.controller = limits.create_resource()
**** CubicPower OpenStack Study ****
def _get_index_request(self, accept_header="application/json"):
"""Helper to set routing arguments."""
request = webob.Request.blank("/")
request.accept = accept_header
request.environ["wsgiorg.routing_args"] = (None, {
"action": "index",
"controller": "",
})
context = cinder.context.RequestContext('testuser', 'testproject')
request.environ["cinder.context"] = context
return request
**** CubicPower OpenStack Study ****
def _populate_limits(self, request):
"""Put limit info into a request."""
_limits = [
limits.Limit("GET", "*", ".*", 10, 60).display(),
limits.Limit("POST", "*", ".*", 5, 60 * 60).display(),
limits.Limit("GET", "changes-since*", "changes-since",
5, 60).display(),
]
request.environ["cinder.limits"] = _limits
return request
**** CubicPower OpenStack Study ****
def test_empty_index_json(self):
"""Test getting empty limit details in JSON."""
request = self._get_index_request()
response = request.get_response(self.controller)
expected = {
"limits": {
"rate": [],
"absolute": {},
},
}
body = jsonutils.loads(response.body)
self.assertEqual(expected, body)
**** CubicPower OpenStack Study ****
def test_index_json(self):
"""Test getting limit details in JSON."""
request = self._get_index_request()
request = self._populate_limits(request)
self.absolute_limits = {
'gigabytes': 512,
'volumes': 5,
}
response = request.get_response(self.controller)
expected = {
"limits": {
"rate": [
{
"regex": ".*",
"uri": "*",
"limit": [
{
"verb": "GET",
"next-available": "1970-01-01T00:00:00Z",
"unit": "MINUTE",
"value": 10,
"remaining": 10,
},
{
"verb": "POST",
"next-available": "1970-01-01T00:00:00Z",
"unit": "HOUR",
"value": 5,
"remaining": 5,
},
],
},
{
"regex": "changes-since",
"uri": "changes-since*",
"limit": [
{
"verb": "GET",
"next-available": "1970-01-01T00:00:00Z",
"unit": "MINUTE",
"value": 5,
"remaining": 5,
},
],
},
],
"absolute": {"maxTotalVolumeGigabytes": 512,
"maxTotalVolumes": 5, },
},
}
body = jsonutils.loads(response.body)
self.assertEqual(expected, body)
**** CubicPower OpenStack Study ****
def _populate_limits_diff_regex(self, request):
"""Put limit info into a request."""
_limits = [
limits.Limit("GET", "*", ".*", 10, 60).display(),
limits.Limit("GET", "*", "*.*", 10, 60).display(),
]
request.environ["cinder.limits"] = _limits
return request
**** CubicPower OpenStack Study ****
def test_index_diff_regex(self):
"""Test getting limit details in JSON."""
request = self._get_index_request()
request = self._populate_limits_diff_regex(request)
response = request.get_response(self.controller)
expected = {
"limits": {
"rate": [
{
"regex": ".*",
"uri": "*",
"limit": [
{
"verb": "GET",
"next-available": "1970-01-01T00:00:00Z",
"unit": "MINUTE",
"value": 10,
"remaining": 10,
},
],
},
{
"regex": "*.*",
"uri": "*",
"limit": [
{
"verb": "GET",
"next-available": "1970-01-01T00:00:00Z",
"unit": "MINUTE",
"value": 10,
"remaining": 10,
},
],
},
],
"absolute": {},
},
}
body = jsonutils.loads(response.body)
self.assertEqual(expected, body)
**** CubicPower OpenStack Study ****
def _test_index_absolute_limits_json(self, expected):
request = self._get_index_request()
response = request.get_response(self.controller)
body = jsonutils.loads(response.body)
self.assertEqual(expected, body['limits']['absolute'])
**** CubicPower OpenStack Study ****
def test_index_ignores_extra_absolute_limits_json(self):
self.absolute_limits = {'unknown_limit': 9001}
self._test_index_absolute_limits_json({})
**** CubicPower OpenStack Study ****
class TestLimiter(limits.Limiter):
pass
**** CubicPower OpenStack Study ****
class LimitMiddlewareTest(BaseLimitTestSuite):
"""Tests for the `limits.RateLimitingMiddleware` class."""
@webob.dec.wsgify
**** CubicPower OpenStack Study ****
def _empty_app(self, request):
"""Do-nothing WSGI app."""
pass
**** CubicPower OpenStack Study ****
def setUp(self):
"""Prepare middleware for use through fake WSGI app."""
super(LimitMiddlewareTest, self).setUp()
_limits = '(GET, *, .*, 1, MINUTE)'
self.app = limits.RateLimitingMiddleware(self._empty_app, _limits,
"%s.TestLimiter" %
self.__class__.__module__)
**** CubicPower OpenStack Study ****
def test_limit_class(self):
"""Test that middleware selected correct limiter class."""
assert isinstance(self.app._limiter, TestLimiter)
**** CubicPower OpenStack Study ****
def test_good_request(self):
"""Test successful GET request through middleware."""
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
**** CubicPower OpenStack Study ****
def test_limited_request_json(self):
"""Test a rate-limited (413) GET request through middleware."""
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(response.status_int, 413)
self.assertIn('Retry-After', response.headers)
retry_after = int(response.headers['Retry-After'])
self.assertAlmostEqual(retry_after, 60, 1)
body = jsonutils.loads(response.body)
expected = "Only 1 GET request(s) can be made to * every minute."
value = body["overLimitFault"]["details"].strip()
self.assertEqual(value, expected)
**** CubicPower OpenStack Study ****
def test_limited_request_xml(self):
"""Test a rate-limited (413) response as XML."""
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
request = webob.Request.blank("/")
request.accept = "application/xml"
response = request.get_response(self.app)
self.assertEqual(response.status_int, 413)
root = minidom.parseString(response.body).childNodes[0]
expected = "Only 1 GET request(s) can be made to * every minute."
details = root.getElementsByTagName("details")
self.assertEqual(details.length, 1)
value = details.item(0).firstChild.data.strip()
self.assertEqual(value, expected)
**** CubicPower OpenStack Study ****
class LimitTest(BaseLimitTestSuite):
"""Tests for the `limits.Limit` class."""
**** CubicPower OpenStack Study ****
def test_GET_no_delay(self):
"""Test a limit handles 1 GET per second."""
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
self.assertIsNone(delay)
self.assertEqual(0, limit.next_request)
self.assertEqual(0, limit.last_request)
**** CubicPower OpenStack Study ****
def test_GET_delay(self):
"""Test two calls to 1 GET per second limit."""
limit = limits.Limit("GET", "*", ".*", 1, 1)
delay = limit("GET", "/anything")
self.assertIsNone(delay)
delay = limit("GET", "/anything")
self.assertEqual(1, delay)
self.assertEqual(1, limit.next_request)
self.assertEqual(0, limit.last_request)
self.time += 4
delay = limit("GET", "/anything")
self.assertIsNone(delay)
self.assertEqual(4, limit.next_request)
self.assertEqual(4, limit.last_request)
**** CubicPower OpenStack Study ****
class ParseLimitsTest(BaseLimitTestSuite):
"""Tests for the
**** CubicPower OpenStack Study ****
def test_invalid(self):
"""Test that parse_limits() handles invalid input correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
';;;;;')
**** CubicPower OpenStack Study ****
def test_bad_rule(self):
"""Test that parse_limits() handles bad rules correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'GET, *, .*, 20, minute')
**** CubicPower OpenStack Study ****
def test_missing_arg(self):
"""Test that parse_limits() handles missing args correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, 20)')
**** CubicPower OpenStack Study ****
def test_bad_value(self):
"""Test that parse_limits() handles bad values correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, foo, minute)')
**** CubicPower OpenStack Study ****
def test_bad_unit(self):
"""Test that parse_limits() handles bad units correctly."""
self.assertRaises(ValueError, limits.Limiter.parse_limits,
'(GET, *, .*, 20, lightyears)')
**** CubicPower OpenStack Study ****
def test_multiple_rules(self):
"""Test that parse_limits() handles multiple rules correctly."""
try:
l = limits.Limiter.parse_limits('(get, *, .*, 20, minute);'
'(PUT, /foo*, /foo.*, 10, hour);'
'(POST, /bar*, /bar.*, 5, second);'
'(Say, /derp*, /derp.*, 1, day)')
except ValueError as e:
assert False, str(e)
# Make sure the number of returned limits are correct
self.assertEqual(len(l), 4)
# Check all the verbs...
expected = ['GET', 'PUT', 'POST', 'SAY']
self.assertEqual([t.verb for t in l], expected)
# ...the URIs...
expected = ['*', '/foo*', '/bar*', '/derp*']
self.assertEqual([t.uri for t in l], expected)
# ...the regexes...
expected = ['.*', '/foo.*', '/bar.*', '/derp.*']
self.assertEqual([t.regex for t in l], expected)
# ...the values...
expected = [20, 10, 5, 1]
self.assertEqual([t.value for t in l], expected)
# ...and the units...
expected = [limits.PER_MINUTE, limits.PER_HOUR,
limits.PER_SECOND, limits.PER_DAY]
self.assertEqual([t.unit for t in l], expected)
**** CubicPower OpenStack Study ****
class LimiterTest(BaseLimitTestSuite):
"""Tests for the in-memory `limits.Limiter` class."""
**** CubicPower OpenStack Study ****
def setUp(self):
"""Run before each test."""
super(LimiterTest, self).setUp()
userlimits = {'limits.user3': '',
'limits.user0': '(get, *, .*, 4, minute);'
'(put, *, .*, 2, minute)'}
self.limiter = limits.Limiter(TEST_LIMITS, **userlimits)
**** CubicPower OpenStack Study ****
def _check(self, num, verb, url, username=None):
"""Check and yield results from checks."""
for x in xrange(num):
yield self.limiter.check_for_delay(verb, url, username)[0]
**** CubicPower OpenStack Study ****
def _check_sum(self, num, verb, url, username=None):
"""Check and sum results from checks."""
results = self._check(num, verb, url, username)
return sum(item for item in results if item)
**** CubicPower OpenStack Study ****
def test_no_delay_GET(self):
"""Ensure no delay on a single call for a limit verb we didn't set."""
delay = self.limiter.check_for_delay("GET", "/anything")
self.assertEqual(delay, (None, None))
**** CubicPower OpenStack Study ****
def test_no_delay_PUT(self):
"""Ensure no delay on a single call for a known limit."""
delay = self.limiter.check_for_delay("PUT", "/anything")
self.assertEqual(delay, (None, None))
**** CubicPower OpenStack Study ****
def test_delay_PUT(self):
"""Test delay on 11th PUT request.
Ensure the 11th PUT will result in a delay of 6.0 seconds until
the next request will be granced.
"""
expected = [None] * 10 + [6.0]
results = list(self._check(11, "PUT", "/anything"))
self.assertEqual(expected, results)
**** CubicPower OpenStack Study ****
def test_delay_POST(self):
"""Test delay on 8th POST request.
Ensure the 8th POST will result in a delay of 6.0 seconds until
the next request will be granced.
"""
expected = [None] * 7
results = list(self._check(7, "POST", "/anything"))
self.assertEqual(expected, results)
expected = 60.0 / 7.0
results = self._check_sum(1, "POST", "/anything")
self.assertAlmostEqual(expected, results, 8)
**** CubicPower OpenStack Study ****
def test_delay_GET(self):
"""Ensure the 11th GET will result in NO delay."""
expected = [None] * 11
results = list(self._check(11, "GET", "/anything"))
self.assertEqual(expected, results)
expected = [None] * 4 + [15.0]
results = list(self._check(5, "GET", "/foo", "user0"))
self.assertEqual(expected, results)
**** CubicPower OpenStack Study ****
def test_delay_PUT_volumes(self):
"""Test delay on /volumes.
Ensure PUT on /volumes limits at 5 requests, and PUT elsewhere
is still OK after 5 requests...but then after 11 total requests,
PUT limiting kicks in.
"""
# First 6 requests on PUT /volumes
expected = [None] * 5 + [12.0]
results = list(self._check(6, "PUT", "/volumes"))
self.assertEqual(expected, results)
# Next 5 request on PUT /anything
expected = [None] * 4 + [6.0]
results = list(self._check(5, "PUT", "/anything"))
self.assertEqual(expected, results)
**** CubicPower OpenStack Study ****
def test_delay_PUT_wait(self):
"""Test limit is lifted again.
Ensure after hitting the limit and then waiting for
the correct amount of time, the limit will be lifted.
"""
expected = [None] * 10 + [6.0]
results = list(self._check(11, "PUT", "/anything"))
self.assertEqual(expected, results)
# Advance time
self.time += 6.0
expected = [None, 6.0]
results = list(self._check(2, "PUT", "/anything"))
self.assertEqual(expected, results)
**** CubicPower OpenStack Study ****
def test_multiple_delays(self):
"""Ensure multiple requests still get a delay."""
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything"))
self.assertEqual(expected, results)
self.time += 1.0
expected = [5.0] * 10
results = list(self._check(10, "PUT", "/anything"))
self.assertEqual(expected, results)
expected = [None] * 2 + [30.0] * 8
results = list(self._check(10, "PUT", "/anything", "user0"))
self.assertEqual(expected, results)
**** CubicPower OpenStack Study ****
def test_user_limit(self):
"""Test user-specific limits."""
self.assertEqual(self.limiter.levels['user3'], [])
self.assertEqual(len(self.limiter.levels['user0']), 2)
**** CubicPower OpenStack Study ****
def test_multiple_users(self):
"""Tests involving multiple users."""
# User0
expected = [None] * 2 + [30.0] * 8
results = list(self._check(10, "PUT", "/anything", "user0"))
self.assertEqual(expected, results)
# User1
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything", "user1"))
self.assertEqual(expected, results)
# User2
expected = [None] * 10 + [6.0] * 5
results = list(self._check(15, "PUT", "/anything", "user2"))
self.assertEqual(expected, results)
# User3
expected = [None] * 20
results = list(self._check(20, "PUT", "/anything", "user3"))
self.assertEqual(expected, results)
self.time += 1.0
# User1 again
expected = [5.0] * 10
results = list(self._check(10, "PUT", "/anything", "user1"))
self.assertEqual(expected, results)
self.time += 1.0
# User1 again
expected = [4.0] * 5
results = list(self._check(5, "PUT", "/anything", "user2"))
self.assertEqual(expected, results)
# User0 again
expected = [28.0]
results = list(self._check(1, "PUT", "/anything", "user0"))
self.assertEqual(expected, results)
self.time += 28.0
expected = [None, 30.0]
results = list(self._check(2, "PUT", "/anything", "user0"))
self.assertEqual(expected, results)
**** CubicPower OpenStack Study ****
class WsgiLimiterTest(BaseLimitTestSuite):
"""Tests for `limits.WsgiLimiter` class."""
**** CubicPower OpenStack Study ****
def setUp(self):
"""Run before each test."""
super(WsgiLimiterTest, self).setUp()
self.app = limits.WsgiLimiter(TEST_LIMITS)
**** CubicPower OpenStack Study ****
def _request_data(self, verb, path):
"""Get data describing a limit request verb/path."""
return jsonutils.dumps({"verb": verb, "path": path})
**** CubicPower OpenStack Study ****
def _request(self, verb, url, username=None):
"""Make sure that POSTing to the given url causes the given username
to perform the given action. Make the internal rate limiter return
delay and make sure that the WSGI app returns the correct response.
"""
if username:
request = webob.Request.blank("/%s" % username)
else:
request = webob.Request.blank("/")
request.method = "POST"
request.body = self._request_data(verb, url)
response = request.get_response(self.app)
if "X-Wait-Seconds" in response.headers:
self.assertEqual(response.status_int, 403)
return response.headers["X-Wait-Seconds"]
self.assertEqual(response.status_int, 204)
**** CubicPower OpenStack Study ****
def test_invalid_methods(self):
"""Only POSTs should work."""
for method in ["GET", "PUT", "DELETE", "HEAD", "OPTIONS"]:
request = webob.Request.blank("/", method=method)
response = request.get_response(self.app)
self.assertEqual(response.status_int, 405)
**** CubicPower OpenStack Study ****
def test_good_url(self):
delay = self._request("GET", "/something")
self.assertIsNone(delay)
**** CubicPower OpenStack Study ****
def test_escaping(self):
delay = self._request("GET", "/something/jump%20up")
self.assertIsNone(delay)
**** CubicPower OpenStack Study ****
def test_response_to_delays(self):
delay = self._request("GET", "/delayed")
self.assertIsNone(delay)
delay = self._request("GET", "/delayed")
self.assertEqual(delay, '60.00')
**** CubicPower OpenStack Study ****
def test_response_to_delays_usernames(self):
delay = self._request("GET", "/delayed", "user1")
self.assertIsNone(delay)
delay = self._request("GET", "/delayed", "user2")
self.assertIsNone(delay)
delay = self._request("GET", "/delayed", "user1")
self.assertEqual(delay, '60.00')
delay = self._request("GET", "/delayed", "user2")
self.assertEqual(delay, '60.00')
**** CubicPower OpenStack Study ****
class FakeHttplibSocket(object):
"""Fake `httplib.HTTPResponse` replacement."""
**** CubicPower OpenStack Study ****
def __init__(self, response_string):
"""Initialize new `FakeHttplibSocket`."""
self._buffer = six.StringIO(response_string)
**** CubicPower OpenStack Study ****
def makefile(self, _mode, _other):
"""Returns the socket's internal buffer."""
return self._buffer
**** CubicPower OpenStack Study ****
class FakeHttplibConnection(object):
"""Fake `httplib.HTTPConnection`."""
**** CubicPower OpenStack Study ****
def __init__(self, app, host):
"""Initialize `FakeHttplibConnection`."""
self.app = app
self.host = host
**** CubicPower OpenStack Study ****
def request(self, method, path, body="", headers=None):
"""Fake request handler.
Requests made via this connection actually get translated and
routed into our WSGI app, we then wait for the response and turn
it back into an `httplib.HTTPResponse`.
"""
if not headers:
headers = {}
req = webob.Request.blank(path)
req.method = method
req.headers = headers
req.host = self.host
req.body = body
resp = str(req.get_response(self.app))
resp = "HTTP/1.0 %s" % resp
sock = FakeHttplibSocket(resp)
self.http_response = httplib.HTTPResponse(sock)
self.http_response.begin()
**** CubicPower OpenStack Study ****
def getresponse(self):
"""Return our generated response from the request."""
return self.http_response
def wire_HTTPConnection_to_WSGI(host, app):
"""Monkeypatches HTTPConnection so that if you try to connect to host, you
are instead routed straight to the given WSGI app.
After calling this method, when any code calls
httplib.HTTPConnection(host)
the connection object will be a fake. Its requests will be sent directly
to the given WSGI app rather than through a socket.
Code connecting to hosts other than host will not be affected.
This method may be called multiple times to map different hosts to
different apps.
This method returns the original HTTPConnection object, so that the caller
can restore the default HTTPConnection interface (for all hosts).
"""
class HTTPConnectionDecorator(object):
"""Wraps the real HTTPConnection class so that when you instantiate
the class you might instead get a fake instance.
"""
def __init__(self, wrapped):
self.wrapped = wrapped
def __call__(self, connection_host, *args, **kwargs):
if connection_host == host:
return FakeHttplibConnection(app, host)
else:
return self.wrapped(connection_host, *args, **kwargs)
oldHTTPConnection = httplib.HTTPConnection
httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
return oldHTTPConnection
**** CubicPower OpenStack Study ****
def __init__(self, wrapped):
self.wrapped = wrapped
**** CubicPower OpenStack Study ****
def __call__(self, connection_host, *args, **kwargs):
if connection_host == host:
return FakeHttplibConnection(app, host)
else:
return self.wrapped(connection_host, *args, **kwargs)
oldHTTPConnection = httplib.HTTPConnection
httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
return oldHTTPConnection
**** CubicPower OpenStack Study ****
class WsgiLimiterProxyTest(BaseLimitTestSuite):
"""Tests for the `limits.WsgiLimiterProxy` class."""
**** CubicPower OpenStack Study ****
def setUp(self):
"""setUp() for WsgiLimiterProxyTest.
Do some nifty HTTP/WSGI magic which allows for WSGI to be called
directly by something like the `httplib` library.
"""
super(WsgiLimiterProxyTest, self).setUp()
self.app = limits.WsgiLimiter(TEST_LIMITS)
oldHTTPConnection = (
wire_HTTPConnection_to_WSGI("169.254.0.1:80", self.app))
self.proxy = limits.WsgiLimiterProxy("169.254.0.1:80")
self.addCleanup(self._restore, oldHTTPConnection)
**** CubicPower OpenStack Study ****
def _restore(self, oldHTTPConnection):
# restore original HTTPConnection object
httplib.HTTPConnection = oldHTTPConnection
**** CubicPower OpenStack Study ****
def test_200(self):
"""Successful request test."""
delay = self.proxy.check_for_delay("GET", "/anything")
self.assertEqual(delay, (None, None))
**** CubicPower OpenStack Study ****
def test_403(self):
"""Forbidden request test."""
delay = self.proxy.check_for_delay("GET", "/delayed")
self.assertEqual(delay, (None, None))
delay, error = self.proxy.check_for_delay("GET", "/delayed")
error = error.strip()
expected = ("60.00", "403 Forbidden\n\nOnly 1 GET request(s) can be "
"made to /delayed every minute.")
self.assertEqual((delay, error), expected)
**** CubicPower OpenStack Study ****
class LimitsViewBuilderTest(test.TestCase):
**** CubicPower OpenStack Study ****
def setUp(self):
super(LimitsViewBuilderTest, self).setUp()
self.view_builder = views.limits.ViewBuilder()
self.rate_limits = [{"URI": "*",
"regex": ".*",
"value": 10,
"verb": "POST",
"remaining": 2,
"unit": "MINUTE",
"resetTime": 1311272226},
{"URI": "*/volumes",
"regex": "^/volumes",
"value": 50,
"verb": "POST",
"remaining": 10,
"unit": "DAY",
"resetTime": 1311272226}]
self.absolute_limits = {"metadata_items": 1,
"injected_files": 5,
"injected_file_content_bytes": 5}
**** CubicPower OpenStack Study ****
def test_build_limits(self):
tdate = "2011-07-21T18:17:06Z"
expected_limits = {
"limits": {"rate": [{"uri": "*",
"regex": ".*",
"limit": [{"value": 10,
"verb": "POST",
"remaining": 2,
"unit": "MINUTE",
"next-available": tdate}]},
{"uri": "*/volumes",
"regex": "^/volumes",
"limit": [{"value": 50,
"verb": "POST",
"remaining": 10,
"unit": "DAY",
"next-available": tdate}]}],
"absolute": {"maxServerMeta": 1,
"maxImageMeta": 1,
"maxPersonality": 5,
"maxPersonalitySize": 5}}}
output = self.view_builder.build(self.rate_limits,
self.absolute_limits)
self.assertDictMatch(output, expected_limits)
**** CubicPower OpenStack Study ****
def test_build_limits_empty_limits(self):
expected_limits = {"limits": {"rate": [],
"absolute": {}}}
abs_limits = {}
rate_limits = []
output = self.view_builder.build(rate_limits, abs_limits)
self.assertDictMatch(output, expected_limits)
**** CubicPower OpenStack Study ****
class LimitsXMLSerializationTest(test.TestCase):
**** CubicPower OpenStack Study ****
def test_xml_declaration(self):
serializer = limits.LimitsTemplate()
fixture = {"limits": {
"rate": [],
"absolute": {}}}
output = serializer.serialize(fixture)
has_dec = output.startswith("")
self.assertTrue(has_dec)
**** CubicPower OpenStack Study ****
def test_index(self):
tdate = "2011-12-15T22:42:45Z"
serializer = limits.LimitsTemplate()
fixture = {"limits": {"rate": [{"uri": "*",
"regex": ".*",
"limit": [{"value": 10,
"verb": "POST",
"remaining": 2,
"unit": "MINUTE",
"next-available": tdate}]},
{"uri": "*/servers",
"regex": "^/servers",
"limit": [{"value": 50,
"verb": "POST",
"remaining": 10,
"unit": "DAY",
"next-available": tdate}]}],
"absolute": {"maxServerMeta": 1,
"maxImageMeta": 1,
"maxPersonality": 5,
"maxPersonalitySize": 10240}}}
output = serializer.serialize(fixture)
root = etree.XML(output)
xmlutil.validate_schema(root, 'limits')
#verify absolute limits
absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
self.assertEqual(len(absolutes), 4)
for limit in absolutes:
name = limit.get('name')
value = limit.get('value')
self.assertEqual(value, str(fixture['limits']['absolute'][name]))
#verify rate limits
rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
self.assertEqual(len(rates), 2)
for i, rate in enumerate(rates):
for key in ['uri', 'regex']:
self.assertEqual(rate.get(key),
str(fixture['limits']['rate'][i][key]))
rate_limits = rate.xpath('ns:limit', namespaces=NS)
self.assertEqual(len(rate_limits), 1)
for j, limit in enumerate(rate_limits):
for key in ['verb', 'value', 'remaining', 'unit',
'next-available']:
self.assertEqual(
limit.get(key),
str(fixture['limits']['rate'][i]['limit'][j][key]))
**** CubicPower OpenStack Study ****
def test_index_no_limits(self):
serializer = limits.LimitsTemplate()
fixture = {"limits": {
"rate": [],
"absolute": {}}}
output = serializer.serialize(fixture)
root = etree.XML(output)
xmlutil.validate_schema(root, 'limits')
#verify absolute limits
absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
self.assertEqual(len(absolutes), 0)
#verify rate limits
rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
self.assertEqual(len(rates), 0)