**** CubicPower OpenStack Study ****
def repeat(n):
"""Decorator to limit the number of times a handler is called.
Will allow the wrapped function (handler) to be called 'n' times.
After that, this will return None for any additional calls,
allowing other handlers, if any, to be invoked.
"""
class static:
retries = n
def decorator(func):
@wraps(func)
def wrapped(*args, **kwargs):
if static.retries == 0:
return None
static.retries -= 1
return func(*args, **kwargs)
return wrapped
return decorator
**** CubicPower OpenStack Study ****
def filter_request(methods, resource):
"""Decorator to invoke handler once for a specific resource.
This will call the handler only for a specific resource using
a specific method(s). Any other resource request or method will
return None, allowing other handlers, if any, to be invoked.
"""
class static:
target_methods = [m.upper() for m in methods]
target_resource = resource
def decorator(func):
@wraps(func)
def wrapped(*args, **kwargs):
if (args[1].method in static.target_methods and
static.target_resource in args[0].path):
return func(*args, **kwargs)
else:
return None # Not for this resource
return wrapped
return decorator
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def token(url, request):
if 'auth/token-services' in url.path:
return {'status_code': requests.codes.OK,
'content': {'token-id': 'dummy-token'}}
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def token_unauthorized(url, request):
if 'auth/token-services' in url.path:
return {'status_code': requests.codes.UNAUTHORIZED}
@httmock.urlmatch(netloc=r'wrong-host')
**** CubicPower OpenStack Study ****
def token_wrong_host(url, request):
raise r_exc.ConnectionError()
@httmock.all_requests
**** CubicPower OpenStack Study ****
def token_timeout(url, request):
raise r_exc.Timeout()
@filter_request(['get'], 'global/host-name')
@httmock.all_requests
**** CubicPower OpenStack Study ****
def timeout(url, request):
"""Simulated timeout of a normal request."""
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
raise r_exc.Timeout()
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def no_such_resource(url, request):
"""Indicate not found error, when invalid resource requested."""
return {'status_code': requests.codes.NOT_FOUND}
@filter_request(['get'], 'global/host-name')
@repeat(1)
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def expired_request(url, request):
"""Simulate access denied failure on first request for this resource.
Intent here is to simulate that the token has expired, by failing
the first request to the resource. Because of the repeat=1, this
will only be called once, and subsequent calls will not be handled
by this function, but instead will access the normal handler and
will pass. Currently configured for a GET request, but will work
with POST and PUT as well. For DELETE, would need to filter_request on a
different resource (e.g. 'global/local-users')
"""
return {'status_code': requests.codes.UNAUTHORIZED}
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def normal_get(url, request):
if request.method != 'GET':
return
LOG.debug("DEBUG: GET mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'global/host-name' in url.path:
content = {u'kind': u'object#host-name',
u'host-name': u'Router'}
return httmock.response(requests.codes.OK, content=content)
if 'global/local-users' in url.path:
content = {u'kind': u'collection#local-user',
u'users': ['peter', 'paul', 'mary']}
return httmock.response(requests.codes.OK, content=content)
if 'interfaces/GigabitEthernet' in url.path:
actual_interface = url.path.split('/')[-1]
ip = actual_interface[-1]
content = {u'kind': u'object#interface',
u'description': u'Changed description',
u'if-name': actual_interface,
u'proxy-arp': True,
u'subnet-mask': u'255.255.255.0',
u'icmp-unreachable': True,
u'nat-direction': u'',
u'icmp-redirects': True,
u'ip-address': u'192.168.200.%s' % ip,
u'verify-unicast-source': False,
u'type': u'ethernet'}
return httmock.response(requests.codes.OK, content=content)
if 'vpn-svc/ike/policies/2' in url.path:
content = {u'kind': u'object#ike-policy',
u'priority-id': u'2',
u'version': u'v1',
u'local-auth-method': u'pre-share',
u'encryption': u'aes',
u'hash': u'sha',
u'dhGroup': 5,
u'lifetime': 3600}
return httmock.response(requests.codes.OK, content=content)
if 'vpn-svc/ike/keyrings' in url.path:
content = {u'kind': u'object#ike-keyring',
u'keyring-name': u'5',
u'pre-shared-key-list': [
{u'key': u'super-secret',
u'encrypted': False,
u'peer-address': u'10.10.10.20 255.255.255.0'}
]}
return httmock.response(requests.codes.OK, content=content)
if 'vpn-svc/ipsec/policies/' in url.path:
ipsec_policy_id = url.path.split('/')[-1]
content = {u'kind': u'object#ipsec-policy',
u'mode': u'tunnel',
u'policy-id': u'%s' % ipsec_policy_id,
u'protection-suite': {
u'esp-encryption': u'esp-aes',
u'esp-authentication': u'esp-sha-hmac',
u'ah': u'ah-sha-hmac',
},
u'anti-replay-window-size': u'128',
u'lifetime-sec': 120,
u'pfs': u'group5',
u'lifetime-kb': 4608000,
u'idle-time': None}
return httmock.response(requests.codes.OK, content=content)
if 'vpn-svc/site-to-site/Tunnel' in url.path:
tunnel = url.path.split('/')[-1]
# Use same number, to allow mock to generate IPSec policy ID
ipsec_policy_id = tunnel[6:]
content = {u'kind': u'object#vpn-site-to-site',
u'vpn-interface-name': u'%s' % tunnel,
u'ip-version': u'ipv4',
u'vpn-type': u'site-to-site',
u'ipsec-policy-id': u'%s' % ipsec_policy_id,
u'ike-profile-id': None,
u'mtu': 1500,
u'local-device': {
u'ip-address': '10.3.0.1/24',
u'tunnel-ip-address': '10.10.10.10'
},
u'remote-device': {
u'tunnel-ip-address': '10.10.10.20'
}}
return httmock.response(requests.codes.OK, content=content)
if 'vpn-svc/ike/keepalive' in url.path:
content = {u'interval': 60,
u'retry': 4,
u'periodic': True}
return httmock.response(requests.codes.OK, content=content)
if 'routing-svc/static-routes' in url.path:
content = {u'destination-network': u'10.1.0.0/24',
u'kind': u'object#static-route',
u'next-hop-router': None,
u'outgoing-interface': u'GigabitEthernet1',
u'admin-distance': 1}
return httmock.response(requests.codes.OK, content=content)
if 'vpn-svc/site-to-site/active/sessions' in url.path:
# Only including needed fields for mock
content = {u'kind': u'collection#vpn-active-sessions',
u'items': [{u'status': u'DOWN-NEGOTIATING',
u'vpn-interface-name': u'Tunnel123'}, ]}
return httmock.response(requests.codes.OK, content=content)
@filter_request(['get'], 'vpn-svc/ike/keyrings')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def get_fqdn(url, request):
LOG.debug("DEBUG: GET FQDN mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
content = {u'kind': u'object#ike-keyring',
u'keyring-name': u'5',
u'pre-shared-key-list': [
{u'key': u'super-secret',
u'encrypted': False,
u'peer-address': u'cisco.com'}
]}
return httmock.response(requests.codes.OK, content=content)
@filter_request(['get'], 'vpn-svc/ipsec/policies/')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def get_no_ah(url, request):
LOG.debug("DEBUG: GET No AH mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
ipsec_policy_id = url.path.split('/')[-1]
content = {u'kind': u'object#ipsec-policy',
u'mode': u'tunnel',
u'anti-replay-window-size': u'128',
u'policy-id': u'%s' % ipsec_policy_id,
u'protection-suite': {
u'esp-encryption': u'esp-aes',
u'esp-authentication': u'esp-sha-hmac',
},
u'lifetime-sec': 120,
u'pfs': u'group5',
u'lifetime-kb': 4608000,
u'idle-time': None}
return httmock.response(requests.codes.OK, content=content)
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def get_defaults(url, request):
if request.method != 'GET':
return
LOG.debug("DEBUG: GET mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'vpn-svc/ike/policies/2' in url.path:
content = {u'kind': u'object#ike-policy',
u'priority-id': u'2',
u'version': u'v1',
u'local-auth-method': u'pre-share',
u'encryption': u'des',
u'hash': u'sha',
u'dhGroup': 1,
u'lifetime': 86400}
return httmock.response(requests.codes.OK, content=content)
if 'vpn-svc/ipsec/policies/' in url.path:
ipsec_policy_id = url.path.split('/')[-1]
content = {u'kind': u'object#ipsec-policy',
u'mode': u'tunnel',
u'policy-id': u'%s' % ipsec_policy_id,
u'protection-suite': {},
u'lifetime-sec': 3600,
u'pfs': u'Disable',
u'anti-replay-window-size': u'None',
u'lifetime-kb': 4608000,
u'idle-time': None}
return httmock.response(requests.codes.OK, content=content)
@filter_request(['get'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def get_unnumbered(url, request):
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
tunnel = url.path.split('/')[-1]
ipsec_policy_id = tunnel[6:]
content = {u'kind': u'object#vpn-site-to-site',
u'vpn-interface-name': u'%s' % tunnel,
u'ip-version': u'ipv4',
u'vpn-type': u'site-to-site',
u'ipsec-policy-id': u'%s' % ipsec_policy_id,
u'ike-profile-id': None,
u'mtu': 1500,
u'local-device': {
u'ip-address': u'GigabitEthernet3',
u'tunnel-ip-address': u'10.10.10.10'
},
u'remote-device': {
u'tunnel-ip-address': u'10.10.10.20'
}}
return httmock.response(requests.codes.OK, content=content)
@filter_request(['get'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def get_mtu(url, request):
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
tunnel = url.path.split('/')[-1]
ipsec_policy_id = tunnel[6:]
content = {u'kind': u'object#vpn-site-to-site',
u'vpn-interface-name': u'%s' % tunnel,
u'ip-version': u'ipv4',
u'vpn-type': u'site-to-site',
u'ipsec-policy-id': u'%s' % ipsec_policy_id,
u'ike-profile-id': None,
u'mtu': 9192,
u'local-device': {
u'ip-address': u'10.3.0.1/24',
u'tunnel-ip-address': u'10.10.10.10'
},
u'remote-device': {
u'tunnel-ip-address': u'10.10.10.20'
}}
return httmock.response(requests.codes.OK, content=content)
@filter_request(['get'], 'vpn-svc/ike/keepalive')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def get_not_configured(url, request):
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.NOT_FOUND}
@filter_request(['get'], 'vpn-svc/site-to-site/active/sessions')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def get_none(url, request):
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
content = {u'kind': u'collection#vpn-active-sessions',
u'items': []}
return httmock.response(requests.codes.OK, content=content)
@filter_request(['get'], 'interfaces/GigabitEthernet3')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def get_local_ip(url, request):
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
content = {u'kind': u'object#interface',
u'subnet-mask': u'255.255.255.0',
u'ip-address': u'10.5.0.2'}
return httmock.response(requests.codes.OK, content=content)
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def post(url, request):
if request.method != 'POST':
return
LOG.debug("DEBUG: POST mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
if 'interfaces/GigabitEthernet' in url.path:
return {'status_code': requests.codes.NO_CONTENT}
if 'global/local-users' in url.path:
if 'username' not in request.body:
return {'status_code': requests.codes.BAD_REQUEST}
if '"privilege": 20' in request.body:
return {'status_code': requests.codes.BAD_REQUEST}
headers = {'location': '%s/test-user' % url.geturl()}
return httmock.response(requests.codes.CREATED, headers=headers)
if 'vpn-svc/ike/policies' in url.path:
headers = {'location': "%s/2" % url.geturl()}
return httmock.response(requests.codes.CREATED, headers=headers)
if 'vpn-svc/ipsec/policies' in url.path:
m = re.search(r'"policy-id": "(\S+)"', request.body)
if m:
headers = {'location': "%s/%s" % (url.geturl(), m.group(1))}
return httmock.response(requests.codes.CREATED, headers=headers)
return {'status_code': requests.codes.BAD_REQUEST}
if 'vpn-svc/ike/keyrings' in url.path:
headers = {'location': "%s/5" % url.geturl()}
return httmock.response(requests.codes.CREATED, headers=headers)
if 'vpn-svc/site-to-site' in url.path:
m = re.search(r'"vpn-interface-name": "(\S+)"', request.body)
if m:
headers = {'location': "%s/%s" % (url.geturl(), m.group(1))}
return httmock.response(requests.codes.CREATED, headers=headers)
return {'status_code': requests.codes.BAD_REQUEST}
if 'routing-svc/static-routes' in url.path:
headers = {'location':
"%s/10.1.0.0_24_GigabitEthernet1" % url.geturl()}
return httmock.response(requests.codes.CREATED, headers=headers)
@filter_request(['post'], 'global/local-users')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def post_change_attempt(url, request):
LOG.debug("DEBUG: POST change value mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.NOT_FOUND,
'content': {
u'error-code': -1,
u'error-message': u'user test-user already exists'}}
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def post_duplicate(url, request):
LOG.debug("DEBUG: POST duplicate mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST,
'content': {
u'error-code': -1,
u'error-message': u'policy 2 exist, not allow to '
u'update policy using POST method'}}
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def post_missing_ipsec_policy(url, request):
LOG.debug("DEBUG: POST missing ipsec policy mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def post_missing_ike_policy(url, request):
LOG.debug("DEBUG: POST missing ike policy mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def post_bad_ip(url, request):
LOG.debug("DEBUG: POST bad IP mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def post_bad_mtu(url, request):
LOG.debug("DEBUG: POST bad mtu mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@filter_request(['post'], 'vpn-svc/ipsec/policies')
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def post_bad_lifetime(url, request):
LOG.debug("DEBUG: POST bad lifetime mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
return {'status_code': requests.codes.BAD_REQUEST}
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def put(url, request):
if request.method != 'PUT':
return
LOG.debug("DEBUG: PUT mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
return {'status_code': requests.codes.NO_CONTENT}
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def delete(url, request):
if request.method != 'DELETE':
return
LOG.debug("DEBUG: DELETE mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
return {'status_code': requests.codes.NO_CONTENT}
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def delete_unknown(url, request):
if request.method != 'DELETE':
return
LOG.debug("DEBUG: DELETE unknown mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
return {'status_code': requests.codes.NOT_FOUND,
'content': {
u'error-code': -1,
u'error-message': 'user unknown not found'}}
@httmock.urlmatch(netloc=r'localhost')
**** CubicPower OpenStack Study ****
def delete_not_allowed(url, request):
if request.method != 'DELETE':
return
LOG.debug("DEBUG: DELETE not allowed mock for %s", url)
if not request.headers.get('X-auth-token', None):
return {'status_code': requests.codes.UNAUTHORIZED}
# Any resource
return {'status_code': requests.codes.METHOD_NOT_ALLOWED}