¡@

Home 

OpenStack Study: test_servermanager.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2014 Big Switch Networks, Inc. All rights reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

#

# @author: Kevin Benton, kevin.benton@bigswitch.com

#

import httplib

import socket

from contextlib import nested

import mock

from oslo.config import cfg

from neutron.manager import NeutronManager

from neutron.plugins.bigswitch import servermanager

from neutron.tests.unit.bigswitch import test_restproxy_plugin as test_rp

HTTPCON = 'httplib.HTTPConnection'

SERVERMANAGER = 'neutron.plugins.bigswitch.servermanager'

**** CubicPower OpenStack Study ****

class ServerManagerTests(test_rp.BigSwitchProxyPluginV2TestCase):

**** CubicPower OpenStack Study ****

    def test_no_servers(self):

        cfg.CONF.set_override('servers', [], 'RESTPROXY')

        self.assertRaises(cfg.Error, servermanager.ServerPool)

**** CubicPower OpenStack Study ****

    def test_malformed_servers(self):

        cfg.CONF.set_override('servers', ['a:b:c'], 'RESTPROXY')

        self.assertRaises(cfg.Error, servermanager.ServerPool)

**** CubicPower OpenStack Study ****

    def test_sticky_cert_fetch_fail(self):

        pl = NeutronManager.get_plugin()

        pl.servers.ssl = True

        with mock.patch(

            'ssl.get_server_certificate',

            side_effect=Exception('There is no more entropy in the universe')

        ) as sslgetmock:

            self.assertRaises(

                cfg.Error,

                pl.servers._get_combined_cert_for_server,

                *('example.org', 443)

            )

            sslgetmock.assert_has_calls([mock.call(('example.org', 443))])

**** CubicPower OpenStack Study ****

    def test_consistency_watchdog(self):

        pl = NeutronManager.get_plugin()

        pl.servers.capabilities = []

        self.watch_p.stop()

        with nested(

            mock.patch('eventlet.sleep'),

            mock.patch(

                SERVERMANAGER + '.ServerPool.rest_call',

                side_effect=servermanager.RemoteRestError(

                    reason='Failure to break loop'

                )

            )

        ) as (smock, rmock):

            # should return immediately without consistency capability

            pl.servers._consistency_watchdog()

            self.assertFalse(smock.called)

            pl.servers.capabilities = ['consistency']

            self.assertRaises(servermanager.RemoteRestError,

                              pl.servers._consistency_watchdog)

**** CubicPower OpenStack Study ****

    def test_file_put_contents(self):

        pl = NeutronManager.get_plugin()

        with mock.patch(SERVERMANAGER + '.open', create=True) as omock:

            pl.servers._file_put_contents('somepath', 'contents')

            omock.assert_has_calls([mock.call('somepath', 'w')])

            omock.return_value.__enter__.return_value.assert_has_calls([

                mock.call.write('contents')

            ])

**** CubicPower OpenStack Study ****

    def test_combine_certs_to_file(self):

        pl = NeutronManager.get_plugin()

        with mock.patch(SERVERMANAGER + '.open', create=True) as omock:

            omock.return_value.__enter__().read.return_value = 'certdata'

            pl.servers._combine_certs_to_file(['cert1.pem', 'cert2.pem'],

                                              'combined.pem')

            # mock shared between read and write file handles so the calls

            # are mixed together

            omock.assert_has_calls([

                mock.call('combined.pem', 'w'),

                mock.call('cert1.pem', 'r'),

                mock.call('cert2.pem', 'r'),

            ], any_order=True)

            omock.return_value.__enter__.return_value.assert_has_calls([

                mock.call.read(),

                mock.call.write('certdata'),

                mock.call.read(),

                mock.call.write('certdata')

            ])

**** CubicPower OpenStack Study ****

    def test_reconnect_cached_connection(self):

        sp = servermanager.ServerPool()

        with mock.patch(HTTPCON) as conmock:

            rv = conmock.return_value

            rv.getresponse.return_value.getheader.return_value = 'HASH'

            sp.servers[0].capabilities = ['keep-alive']

            sp.servers[0].rest_call('GET', '/first')

            # raise an error on re-use to verify reconnect

            # return okay the second time so the reconnect works

            rv.request.side_effect = [httplib.ImproperConnectionState(),

                                      mock.MagicMock()]

            sp.servers[0].rest_call('GET', '/second')

        uris = [c[1][1] for c in rv.request.mock_calls]

        expected = [

            sp.base_uri + '/first',

            sp.base_uri + '/second',

            sp.base_uri + '/second',

        ]

        self.assertEqual(uris, expected)

**** CubicPower OpenStack Study ****

    def test_no_reconnect_recurse_to_infinity(self):

        # retry uses recursion when a reconnect is necessary

        # this test makes sure it stops after 1 recursive call

        sp = servermanager.ServerPool()

        with mock.patch(HTTPCON) as conmock:

            rv = conmock.return_value

            # hash header must be string instead of mock object

            rv.getresponse.return_value.getheader.return_value = 'HASH'

            sp.servers[0].capabilities = ['keep-alive']

            sp.servers[0].rest_call('GET', '/first')

            # after retrying once, the rest call should raise the

            # exception up

            rv.request.side_effect = httplib.ImproperConnectionState()

            self.assertRaises(httplib.ImproperConnectionState,

                              sp.servers[0].rest_call,

                              *('GET', '/second'))

            # 1 for the first call, 2 for the second with retry

            self.assertEqual(rv.request.call_count, 3)

**** CubicPower OpenStack Study ****

    def test_socket_error(self):

        sp = servermanager.ServerPool()

        with mock.patch(HTTPCON) as conmock:

            conmock.return_value.request.side_effect = socket.timeout()

            resp = sp.servers[0].rest_call('GET', '/')

            self.assertEqual(resp, (0, None, None, None))