# Home
#
# OpenStack Study: cisco_csr_mock.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

def repeat(n):
    """Build a decorator that lets a handler run at most 'n' times.

    The wrapped function (handler) is invoked normally until the budget
    of 'n' calls is used up. After that the wrapper returns None without
    calling the handler, allowing other handlers, if any, to be invoked.
    """
    # One-element list used as a mutable cell shared by the closures below.
    calls_left = [n]

    def decorator(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            if calls_left[0] != 0:
                calls_left[0] -= 1
                return func(*args, **kwargs)
            return None
        return wrapped

    return decorator

# **** CubicPower OpenStack Study ****

def filter_request(methods, resource):
    """Decorator to invoke a handler only for a specific resource.

    The handler is called only when the request's method is one of
    'methods' and 'resource' occurs in the URL path. Any other resource
    request or method returns None, allowing other handlers, if any, to
    be invoked. The number of invocations is not limited.

    :param methods: HTTP method name, or iterable of method names. The
                    names are upper-cased before being compared with
                    the request's method.
    :param resource: substring that must appear in the URL path.
    """
    # A bare string would otherwise be iterated character by character
    # ('get' -> ['G', 'E', 'T']) and could never match a request method.
    if isinstance(methods, str):
        methods = [methods]
    target_methods = [m.upper() for m in methods]

    def decorator(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            # args[0] supplies the URL path, args[1] the request method.
            url, request = args[0], args[1]
            if request.method in target_methods and resource in url.path:
                return func(*args, **kwargs)
            return None  # Not for this resource
        return wrapped

    return decorator

@httmock.urlmatch(netloc=r'localhost')
def token(url, request):
    """Mock a successful token request.

    Returns an OK response carrying a dummy token when the path contains
    'auth/token-services'; any other path falls through and returns None.
    """
    if 'auth/token-services' in url.path:
        return {'status_code': requests.codes.OK,
                'content': {'token-id': 'dummy-token'}}

@httmock.urlmatch(netloc=r'localhost')
def token_unauthorized(url, request):
    """Mock a rejected token request.

    Returns UNAUTHORIZED when the path contains 'auth/token-services';
    any other path falls through and returns None.
    """
    if 'auth/token-services' in url.path:
        return {'status_code': requests.codes.UNAUTHORIZED}

@httmock.urlmatch(netloc=r'wrong-host')
def token_wrong_host(url, request):
    """Raise a connection error for any request matched on 'wrong-host'."""
    raise r_exc.ConnectionError()

@httmock.all_requests
def token_timeout(url, request):
    """Raise a timeout for every request, whatever the host or path."""
    raise r_exc.Timeout()

@filter_request(['get'], 'global/host-name')
@httmock.all_requests
def timeout(url, request):
    """Simulated timeout of a normal request.

    Only GET requests for 'global/host-name' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED; otherwise
    a Timeout is raised.
    """
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    raise r_exc.Timeout()

@httmock.urlmatch(netloc=r'localhost')
def no_such_resource(url, request):
    """Indicate not found error, when invalid resource requested.

    Every matched request gets NOT_FOUND; neither the method nor the
    auth token is inspected.
    """
    return {'status_code': requests.codes.NOT_FOUND}

# filter_request is outermost, so the repeat(1) budget is only consumed
# by requests that actually pass the method/resource filter.
@filter_request(['get'], 'global/host-name')
@repeat(1)
@httmock.urlmatch(netloc=r'localhost')
def expired_request(url, request):
    """Simulate access denied failure on first request for this resource.

    Intent here is to simulate that the token has expired, by failing
    the first request to the resource. Because of the repeat(1), this
    will only be called once, and subsequent calls will not be handled
    by this function, but instead will access the normal handler and
    will pass. Currently configured for a GET request, but will work
    with POST and PUT as well. For DELETE, would need to filter_request
    on a different resource (e.g. 'global/local-users').
    """
    return {'status_code': requests.codes.UNAUTHORIZED}

@httmock.urlmatch(netloc=r'localhost')
def normal_get(url, request):
    """Mock successful GET responses for a fixed set of resources.

    Non-GET requests return None so another handler can take them. A GET
    without an X-auth-token header gets UNAUTHORIZED. Otherwise the path
    is checked, in order, against the resource fragments below and the
    first match returns an OK response with canned content. A GET for a
    path matching none of them returns None.
    """
    if request.method != 'GET':
        return
    LOG.debug("DEBUG: GET mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    if 'global/host-name' in url.path:
        content = {u'kind': u'object#host-name',
                   u'host-name': u'Router'}
        return httmock.response(requests.codes.OK, content=content)
    if 'global/local-users' in url.path:
        content = {u'kind': u'collection#local-user',
                   u'users': ['peter', 'paul', 'mary']}
        return httmock.response(requests.codes.OK, content=content)
    if 'interfaces/GigabitEthernet' in url.path:
        actual_interface = url.path.split('/')[-1]
        # Last character of the interface name becomes the final octet.
        ip = actual_interface[-1]
        content = {u'kind': u'object#interface',
                   u'description': u'Changed description',
                   u'if-name': actual_interface,
                   u'proxy-arp': True,
                   u'subnet-mask': u'255.255.255.0',
                   u'icmp-unreachable': True,
                   u'nat-direction': u'',
                   u'icmp-redirects': True,
                   u'ip-address': u'192.168.200.%s' % ip,
                   u'verify-unicast-source': False,
                   u'type': u'ethernet'}
        return httmock.response(requests.codes.OK, content=content)
    if 'vpn-svc/ike/policies/2' in url.path:
        content = {u'kind': u'object#ike-policy',
                   u'priority-id': u'2',
                   u'version': u'v1',
                   u'local-auth-method': u'pre-share',
                   u'encryption': u'aes',
                   u'hash': u'sha',
                   u'dhGroup': 5,
                   u'lifetime': 3600}
        return httmock.response(requests.codes.OK, content=content)
    if 'vpn-svc/ike/keyrings' in url.path:
        content = {u'kind': u'object#ike-keyring',
                   u'keyring-name': u'5',
                   u'pre-shared-key-list': [
                       {u'key': u'super-secret',
                        u'encrypted': False,
                        u'peer-address': u'10.10.10.20 255.255.255.0'}
                   ]}
        return httmock.response(requests.codes.OK, content=content)
    if 'vpn-svc/ipsec/policies/' in url.path:
        # Echo back whatever policy ID ends the path.
        ipsec_policy_id = url.path.split('/')[-1]
        content = {u'kind': u'object#ipsec-policy',
                   u'mode': u'tunnel',
                   u'policy-id': u'%s' % ipsec_policy_id,
                   u'protection-suite': {
                       u'esp-encryption': u'esp-aes',
                       u'esp-authentication': u'esp-sha-hmac',
                       u'ah': u'ah-sha-hmac',
                   },
                   u'anti-replay-window-size': u'128',
                   u'lifetime-sec': 120,
                   u'pfs': u'group5',
                   u'lifetime-kb': 4608000,
                   u'idle-time': None}
        return httmock.response(requests.codes.OK, content=content)
    if 'vpn-svc/site-to-site/Tunnel' in url.path:
        tunnel = url.path.split('/')[-1]
        # Use same number, to allow mock to generate IPSec policy ID
        # (drops the first six characters of the last path segment).
        ipsec_policy_id = tunnel[6:]
        content = {u'kind': u'object#vpn-site-to-site',
                   u'vpn-interface-name': u'%s' % tunnel,
                   u'ip-version': u'ipv4',
                   u'vpn-type': u'site-to-site',
                   u'ipsec-policy-id': u'%s' % ipsec_policy_id,
                   u'ike-profile-id': None,
                   u'mtu': 1500,
                   u'local-device': {
                       u'ip-address': '10.3.0.1/24',
                       u'tunnel-ip-address': '10.10.10.10'
                   },
                   u'remote-device': {
                       u'tunnel-ip-address': '10.10.10.20'
                   }}
        return httmock.response(requests.codes.OK, content=content)
    if 'vpn-svc/ike/keepalive' in url.path:
        content = {u'interval': 60,
                   u'retry': 4,
                   u'periodic': True}
        return httmock.response(requests.codes.OK, content=content)
    if 'routing-svc/static-routes' in url.path:
        content = {u'destination-network': u'10.1.0.0/24',
                   u'kind': u'object#static-route',
                   u'next-hop-router': None,
                   u'outgoing-interface': u'GigabitEthernet1',
                   u'admin-distance': 1}
        return httmock.response(requests.codes.OK, content=content)
    if 'vpn-svc/site-to-site/active/sessions' in url.path:
        # Only including needed fields for mock
        content = {u'kind': u'collection#vpn-active-sessions',
                   u'items': [{u'status': u'DOWN-NEGOTIATING',
                               u'vpn-interface-name': u'Tunnel123'}, ]}
        return httmock.response(requests.codes.OK, content=content)

@filter_request(['get'], 'vpn-svc/ike/keyrings')
@httmock.urlmatch(netloc=r'localhost')
def get_fqdn(url, request):
    """Mock a keyring GET whose peer address is a host name.

    Only GET requests for 'vpn-svc/ike/keyrings' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED; otherwise
    an OK response is returned with 'cisco.com' as the peer address.
    """
    LOG.debug("DEBUG: GET FQDN mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    content = {u'kind': u'object#ike-keyring',
               u'keyring-name': u'5',
               u'pre-shared-key-list': [
                   {u'key': u'super-secret',
                    u'encrypted': False,
                    u'peer-address': u'cisco.com'}
               ]}
    return httmock.response(requests.codes.OK, content=content)

@filter_request(['get'], 'vpn-svc/ipsec/policies/')
@httmock.urlmatch(netloc=r'localhost')
def get_no_ah(url, request):
    """Mock an IPSec policy GET whose protection suite has no 'ah' entry.

    Only GET requests for 'vpn-svc/ipsec/policies/' reach this handler.
    A request without an X-auth-token header gets UNAUTHORIZED. The
    policy ID in the reply is the last segment of the request path.
    """
    LOG.debug("DEBUG: GET No AH mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    ipsec_policy_id = url.path.split('/')[-1]
    content = {u'kind': u'object#ipsec-policy',
               u'mode': u'tunnel',
               u'anti-replay-window-size': u'128',
               u'policy-id': u'%s' % ipsec_policy_id,
               u'protection-suite': {
                   u'esp-encryption': u'esp-aes',
                   u'esp-authentication': u'esp-sha-hmac',
               },
               u'lifetime-sec': 120,
               u'pfs': u'group5',
               u'lifetime-kb': 4608000,
               u'idle-time': None}
    return httmock.response(requests.codes.OK, content=content)

@httmock.urlmatch(netloc=r'localhost')
def get_defaults(url, request):
    """Mock GET responses for IKE policy 2 and IPSec policies.

    Non-GET requests return None. A GET without an X-auth-token header
    gets UNAUTHORIZED. A GET for 'vpn-svc/ike/policies/2' or for
    'vpn-svc/ipsec/policies/' returns OK with the canned content below
    (presumably the device's default values, going by the function
    name). Any other path returns None.
    """
    if request.method != 'GET':
        return
    LOG.debug("DEBUG: GET mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    if 'vpn-svc/ike/policies/2' in url.path:
        content = {u'kind': u'object#ike-policy',
                   u'priority-id': u'2',
                   u'version': u'v1',
                   u'local-auth-method': u'pre-share',
                   u'encryption': u'des',
                   u'hash': u'sha',
                   u'dhGroup': 1,
                   u'lifetime': 86400}
        return httmock.response(requests.codes.OK, content=content)
    if 'vpn-svc/ipsec/policies/' in url.path:
        # Echo back whatever policy ID ends the path.
        ipsec_policy_id = url.path.split('/')[-1]
        content = {u'kind': u'object#ipsec-policy',
                   u'mode': u'tunnel',
                   u'policy-id': u'%s' % ipsec_policy_id,
                   u'protection-suite': {},
                   u'lifetime-sec': 3600,
                   u'pfs': u'Disable',
                   u'anti-replay-window-size': u'None',
                   u'lifetime-kb': 4608000,
                   u'idle-time': None}
        return httmock.response(requests.codes.OK, content=content)

@filter_request(['get'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def get_unnumbered(url, request):
    """Mock a site-to-site GET whose local address is an interface name.

    Only GET requests for 'vpn-svc/site-to-site' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED. The reply
    reports 'GigabitEthernet3' as the local device's 'ip-address'.
    """
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    tunnel = url.path.split('/')[-1]
    # Policy ID is the last path segment minus its first six characters.
    ipsec_policy_id = tunnel[6:]
    content = {u'kind': u'object#vpn-site-to-site',
               u'vpn-interface-name': u'%s' % tunnel,
               u'ip-version': u'ipv4',
               u'vpn-type': u'site-to-site',
               u'ipsec-policy-id': u'%s' % ipsec_policy_id,
               u'ike-profile-id': None,
               u'mtu': 1500,
               u'local-device': {
                   u'ip-address': u'GigabitEthernet3',
                   u'tunnel-ip-address': u'10.10.10.10'
               },
               u'remote-device': {
                   u'tunnel-ip-address': u'10.10.10.20'
               }}
    return httmock.response(requests.codes.OK, content=content)

@filter_request(['get'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def get_mtu(url, request):
    """Mock a site-to-site GET that reports an MTU of 9192.

    Only GET requests for 'vpn-svc/site-to-site' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED.
    """
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    tunnel = url.path.split('/')[-1]
    # Policy ID is the last path segment minus its first six characters.
    ipsec_policy_id = tunnel[6:]
    content = {u'kind': u'object#vpn-site-to-site',
               u'vpn-interface-name': u'%s' % tunnel,
               u'ip-version': u'ipv4',
               u'vpn-type': u'site-to-site',
               u'ipsec-policy-id': u'%s' % ipsec_policy_id,
               u'ike-profile-id': None,
               u'mtu': 9192,
               u'local-device': {
                   u'ip-address': u'10.3.0.1/24',
                   u'tunnel-ip-address': u'10.10.10.10'
               },
               u'remote-device': {
                   u'tunnel-ip-address': u'10.10.10.20'
               }}
    return httmock.response(requests.codes.OK, content=content)

@filter_request(['get'], 'vpn-svc/ike/keepalive')
@httmock.urlmatch(netloc=r'localhost')
def get_not_configured(url, request):
    """Mock a keepalive GET that answers NOT_FOUND.

    Only GET requests for 'vpn-svc/ike/keepalive' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED instead.
    """
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    return {'status_code': requests.codes.NOT_FOUND}

@filter_request(['get'], 'vpn-svc/site-to-site/active/sessions')
@httmock.urlmatch(netloc=r'localhost')
def get_none(url, request):
    """Mock an active-sessions GET that returns an empty item list.

    Only GET requests for 'vpn-svc/site-to-site/active/sessions' reach
    this handler. A request without an X-auth-token header gets
    UNAUTHORIZED.
    """
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    content = {u'kind': u'collection#vpn-active-sessions',
               u'items': []}
    return httmock.response(requests.codes.OK, content=content)

@filter_request(['get'], 'interfaces/GigabitEthernet3')
@httmock.urlmatch(netloc=r'localhost')
def get_local_ip(url, request):
    """Mock a GigabitEthernet3 GET returning address 10.5.0.2.

    Only GET requests for 'interfaces/GigabitEthernet3' reach this
    handler. A request without an X-auth-token header gets UNAUTHORIZED.
    """
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    content = {u'kind': u'object#interface',
               u'subnet-mask': u'255.255.255.0',
               u'ip-address': u'10.5.0.2'}
    return httmock.response(requests.codes.OK, content=content)

@httmock.urlmatch(netloc=r'localhost')
def post(url, request):
    """Mock POST responses for a fixed set of resources.

    Non-POST requests return None so another handler can take them. A
    POST without an X-auth-token header gets UNAUTHORIZED. Otherwise the
    path is checked, in order, against the resource fragments below:
    creations answer CREATED with a 'location' header built from the
    request URL, interface updates answer NO_CONTENT, and bodies lacking
    the expected field answer BAD_REQUEST. A POST for a path matching
    none of them returns None.
    """
    if request.method != 'POST':
        return
    LOG.debug("DEBUG: POST mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    # The substring and regex checks below need text; a missing or bytes
    # body would otherwise raise TypeError instead of yielding a response.
    body = request.body or ''
    if isinstance(body, bytes):
        body = body.decode('utf-8')
    if 'interfaces/GigabitEthernet' in url.path:
        return {'status_code': requests.codes.NO_CONTENT}
    if 'global/local-users' in url.path:
        if 'username' not in body:
            return {'status_code': requests.codes.BAD_REQUEST}
        if '"privilege": 20' in body:
            return {'status_code': requests.codes.BAD_REQUEST}
        headers = {'location': '%s/test-user' % url.geturl()}
        return httmock.response(requests.codes.CREATED, headers=headers)
    if 'vpn-svc/ike/policies' in url.path:
        headers = {'location': "%s/2" % url.geturl()}
        return httmock.response(requests.codes.CREATED, headers=headers)
    if 'vpn-svc/ipsec/policies' in url.path:
        # The new resource's location ends with the posted policy ID.
        m = re.search(r'"policy-id": "(\S+)"', body)
        if m:
            headers = {'location': "%s/%s" % (url.geturl(), m.group(1))}
            return httmock.response(requests.codes.CREATED, headers=headers)
        return {'status_code': requests.codes.BAD_REQUEST}
    if 'vpn-svc/ike/keyrings' in url.path:
        headers = {'location': "%s/5" % url.geturl()}
        return httmock.response(requests.codes.CREATED, headers=headers)
    if 'vpn-svc/site-to-site' in url.path:
        # The new resource's location ends with the posted interface name.
        m = re.search(r'"vpn-interface-name": "(\S+)"', body)
        if m:
            headers = {'location': "%s/%s" % (url.geturl(), m.group(1))}
            return httmock.response(requests.codes.CREATED, headers=headers)
        return {'status_code': requests.codes.BAD_REQUEST}
    if 'routing-svc/static-routes' in url.path:
        headers = {'location':
                   "%s/10.1.0.0_24_GigabitEthernet1" % url.geturl()}
        return httmock.response(requests.codes.CREATED, headers=headers)

@filter_request(['post'], 'global/local-users')
@httmock.urlmatch(netloc=r'localhost')
def post_change_attempt(url, request):
    """Mock a local-users POST rejected because the user already exists.

    Only POST requests for 'global/local-users' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED; otherwise
    NOT_FOUND is returned with an error code and message as content.
    """
    LOG.debug("DEBUG: POST change value mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    return {'status_code': requests.codes.NOT_FOUND,
            'content': {
                u'error-code': -1,
                u'error-message': u'user test-user already exists'}}

@httmock.urlmatch(netloc=r'localhost')
def post_duplicate(url, request):
    """Mock a request rejected because policy 2 already exists.

    The request method is not checked here. A request without an
    X-auth-token header gets UNAUTHORIZED; otherwise BAD_REQUEST is
    returned with an error code and message as content.
    """
    LOG.debug("DEBUG: POST duplicate mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    return {'status_code': requests.codes.BAD_REQUEST,
            'content': {
                u'error-code': -1,
                u'error-message': u'policy 2 exist, not allow to '
                                  u'update policy using POST method'}}

@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_missing_ipsec_policy(url, request):
    """Mock a site-to-site POST that fails with BAD_REQUEST.

    Only POST requests for 'vpn-svc/site-to-site' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED instead.
    The request body is not inspected.
    """
    LOG.debug("DEBUG: POST missing ipsec policy mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    return {'status_code': requests.codes.BAD_REQUEST}

@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_missing_ike_policy(url, request):
    """Mock a site-to-site POST that fails with BAD_REQUEST.

    Only POST requests for 'vpn-svc/site-to-site' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED instead.
    The request body is not inspected.
    """
    LOG.debug("DEBUG: POST missing ike policy mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    return {'status_code': requests.codes.BAD_REQUEST}

@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_ip(url, request):
    """Mock a site-to-site POST that fails with BAD_REQUEST.

    Only POST requests for 'vpn-svc/site-to-site' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED instead.
    The request body is not inspected.
    """
    LOG.debug("DEBUG: POST bad IP mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    return {'status_code': requests.codes.BAD_REQUEST}

@filter_request(['post'], 'vpn-svc/site-to-site')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_mtu(url, request):
    """Mock a site-to-site POST that fails with BAD_REQUEST.

    Only POST requests for 'vpn-svc/site-to-site' reach this handler. A
    request without an X-auth-token header gets UNAUTHORIZED instead.
    The request body is not inspected.
    """
    LOG.debug("DEBUG: POST bad mtu mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    return {'status_code': requests.codes.BAD_REQUEST}

@filter_request(['post'], 'vpn-svc/ipsec/policies')
@httmock.urlmatch(netloc=r'localhost')
def post_bad_lifetime(url, request):
    """Mock an IPSec policy POST that fails with BAD_REQUEST.

    Only POST requests for 'vpn-svc/ipsec/policies' reach this handler.
    A request without an X-auth-token header gets UNAUTHORIZED instead.
    The request body is not inspected.
    """
    LOG.debug("DEBUG: POST bad lifetime mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    return {'status_code': requests.codes.BAD_REQUEST}

@httmock.urlmatch(netloc=r'localhost')
def put(url, request):
    """Mock a successful PUT to any resource.

    Non-PUT requests return None so another handler can take them. A PUT
    without an X-auth-token header gets UNAUTHORIZED; otherwise
    NO_CONTENT is returned whatever the path.
    """
    if request.method != 'PUT':
        return
    LOG.debug("DEBUG: PUT mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    # Any resource
    return {'status_code': requests.codes.NO_CONTENT}

@httmock.urlmatch(netloc=r'localhost')
def delete(url, request):
    """Mock a successful DELETE of any resource.

    Non-DELETE requests return None so another handler can take them. A
    DELETE without an X-auth-token header gets UNAUTHORIZED; otherwise
    NO_CONTENT is returned whatever the path.
    """
    if request.method != 'DELETE':
        return
    LOG.debug("DEBUG: DELETE mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    # Any resource
    return {'status_code': requests.codes.NO_CONTENT}

@httmock.urlmatch(netloc=r'localhost')
def delete_unknown(url, request):
    """Mock a DELETE that fails because the target does not exist.

    Non-DELETE requests return None so another handler can take them. A
    DELETE without an X-auth-token header gets UNAUTHORIZED; otherwise
    NOT_FOUND is returned, whatever the path, with an error code and
    message as content.
    """
    if request.method != 'DELETE':
        return
    LOG.debug("DEBUG: DELETE unknown mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    # Any resource
    return {'status_code': requests.codes.NOT_FOUND,
            'content': {
                u'error-code': -1,
                u'error-message': 'user unknown not found'}}

@httmock.urlmatch(netloc=r'localhost')
def delete_not_allowed(url, request):
    """Mock a DELETE that is refused with METHOD_NOT_ALLOWED.

    Non-DELETE requests return None so another handler can take them. A
    DELETE without an X-auth-token header gets UNAUTHORIZED; otherwise
    METHOD_NOT_ALLOWED is returned whatever the path.
    """
    if request.method != 'DELETE':
        return
    LOG.debug("DEBUG: DELETE not allowed mock for %s", url)
    if not request.headers.get('X-auth-token'):
        return {'status_code': requests.codes.UNAUTHORIZED}
    # Any resource
    return {'status_code': requests.codes.METHOD_NOT_ALLOWED}