# **** CubicPower OpenStack Study ****
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Functional tests for the HTTP store interface
"""
import BaseHTTPServer
import os
import signal
import testtools
import glance.store.http
import glance.tests.functional.store as store_tests
# **** CubicPower OpenStack Study ****
def get_handler_class(fixture):
    """Build a request handler class that always answers with ``fixture``.

    :param fixture: payload written as the body of every GET response; its
                    length is advertised in the Content-Length header of
                    both GET and HEAD responses
    :returns: a ``BaseHTTPRequestHandler`` subclass closed over ``fixture``
    """

    class StaticHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
        """Reply 200 to any GET/HEAD, regardless of the requested path."""

        def _start_ok_response(self):
            # GET and HEAD share the exact same status line and headers;
            # only GET goes on to write a body.
            body_size = len(fixture)
            self.send_response(200)
            self.send_header('Content-Length', str(body_size))
            self.end_headers()

        def do_GET(self):
            self._start_ok_response()
            self.wfile.write(fixture)

        def do_HEAD(self):
            self._start_ok_response()

        def log_message(*args, **kwargs):
            # Intentionally a no-op: the base implementation writes an
            # access-log line to stderr, which would clutter test output.
            pass

    return StaticHTTPRequestHandler
# **** CubicPower OpenStack Study ****
def http_server(image_id, image_data):
    """Fork a child process that serves ``image_data`` over HTTP.

    The server binds to 127.0.0.1 on an OS-assigned port (port 0) before
    forking, so the port number is known to the parent and connections made
    before the child starts serving simply wait in the listen backlog.

    :param image_id: accepted for the caller's convenience; not used here
    :param image_data: payload returned for every GET request
    :returns: ``(pid, port)`` in the parent process, where ``pid`` is the
              child serving requests and ``port`` is the bound TCP port.
              The child process never returns from this function.
    """
    server_address = ('127.0.0.1', 0)
    handler_class = get_handler_class(image_data)
    httpd = BaseHTTPServer.HTTPServer(server_address, handler_class)
    port = httpd.socket.getsockname()[1]

    pid = os.fork()
    if pid == 0:
        # Child: serve until killed. If serve_forever() ever returns or
        # raises, exit immediately instead of falling back into the
        # caller's code and running the test suite a second time.
        try:
            httpd.serve_forever()
        finally:
            os._exit(0)

    # Parent: the child owns its own copy of the listening socket. Close
    # ours so the descriptor is not leaked and the port stops accepting
    # connections as soon as the child is killed.
    httpd.server_close()
    return pid, port
# **** CubicPower OpenStack Study ****
class TestHTTPStore(store_tests.BaseTestCase, testtools.TestCase):
    """Functional tests for ``glance.store.http.Store``.

    Image data is served by a throwaway HTTP server forked per test (see
    ``stash_image``); the server process is killed and reaped in
    ``tearDown``.
    """

    store_cls_path = 'glance.store.http.Store'
    store_cls = glance.store.http.Store
    store_name = 'http'

    def setUp(self):
        super(TestHTTPStore, self).setUp()
        # PID of the forked HTTP server, set by stash_image(); None while
        # no server is running.
        self.kill_pid = None

    def tearDown(self):
        """Stop the forked HTTP server, if any, then run base teardown."""
        try:
            if self.kill_pid is not None:
                os.kill(self.kill_pid, signal.SIGKILL)
                # Reap the child so it does not linger as a zombie for the
                # rest of the test run.
                os.waitpid(self.kill_pid, 0)
                self.kill_pid = None
        finally:
            # Always run the base-class teardown, even if stopping the
            # server failed.
            super(TestHTTPStore, self).tearDown()

    def get_store(self, **kwargs):
        """Return a configured HTTP store.

        :param kwargs: only the optional ``context`` key is used; it is
                       passed through to the store constructor
        """
        store = glance.store.http.Store(context=kwargs.get('context'))
        store.configure()
        return store

    def stash_image(self, image_id, image_data):
        """Serve ``image_data`` from a forked HTTP server.

        The server's PID is remembered in ``self.kill_pid`` so that
        ``tearDown`` can stop it.

        :returns: the URL at which the image data can be fetched
        """
        self.kill_pid, http_port = http_server(image_id, image_data)
        return 'http://127.0.0.1:%s/' % (http_port,)