

OpenStack Study: server.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright (c) 2010-2012 OpenStack Foundation


# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at


# http://www.apache.org/licenses/LICENSE-2.0


# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,


# implied.

# See the License for the specific language governing permissions and

# limitations under the License.

import os

import time

import traceback

from datetime import datetime

from swift import gettext_ as _

from xml.etree.cElementTree import Element, SubElement, tostring

from eventlet import Timeout

import swift.common.db

from swift.container.backend import ContainerBroker, DATADIR

from swift.common.db import DatabaseAlreadyExists

from swift.common.container_sync_realms import ContainerSyncRealms

from swift.common.request_helpers import get_param, get_listing_content_type, \

split_and_validate_path, is_sys_or_user_meta

from swift.common.utils import get_logger, hash_path, public, \

normalize_timestamp, storage_directory, validate_sync_to, \

config_true_value, json, timing_stats, replication, \


from swift.common.constraints import CONTAINER_LISTING_LIMIT, \

check_mount, check_float, check_utf8

from swift.common.bufferedhttp import http_connect

from swift.common.exceptions import ConnectionTimeout

from swift.common.db_replicator import ReplicatorRpc

from swift.common.http import HTTP_NOT_FOUND, is_success

from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \

HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \

HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \

HTTPInsufficientStorage, HTTPException, HeaderKeyDict

**** CubicPower OpenStack Study ****

class ContainerController(object):

"""WSGI Controller for the container server."""

# Ensure these are all lowercase

save_headers = ['x-container-read', 'x-container-write',

'x-container-sync-key', 'x-container-sync-to']

**** CubicPower OpenStack Study ****

    def __init__(self, conf, logger=None):

        self.logger = logger or get_logger(conf, log_route='container-server')

        self.log_requests = config_true_value(conf.get('log_requests', 'true'))

        self.root = conf.get('devices', '/srv/node')

        self.mount_check = config_true_value(conf.get('mount_check', 'true'))

        self.node_timeout = int(conf.get('node_timeout', 3))

        self.conn_timeout = float(conf.get('conn_timeout', 0.5))

        replication_server = conf.get('replication_server', None)

        if replication_server is not None:

            replication_server = config_true_value(replication_server)

        self.replication_server = replication_server

        #: ContainerSyncCluster instance for validating sync-to values.

        self.realms_conf = ContainerSyncRealms(


                conf.get('swift_dir', '/etc/swift'),



        #: The list of hosts we're allowed to send syncs to. This can be

        #: overridden by data in self.realms_conf

        self.allowed_sync_hosts = [


            for h in conf.get('allowed_sync_hosts', '').split(',')

            if h.strip()]

        self.replicator_rpc = ReplicatorRpc(

            self.root, DATADIR, ContainerBroker, self.mount_check,


        self.auto_create_account_prefix = \

            conf.get('auto_create_account_prefix') or '.'

        if config_true_value(conf.get('allow_versions', 'f')):


        swift.common.db.DB_PREALLOCATION = \

            config_true_value(conf.get('db_preallocation', 'f'))

**** CubicPower OpenStack Study ****

    def _get_container_broker(self, drive, part, account, container, **kwargs):


        Get a DB broker for the container.

        :param drive: drive that holds the container

        :param part: partition the container is in

        :param account: account name

        :param container: container name

        :returns: ContainerBroker object


        hsh = hash_path(account, container)

        db_dir = storage_directory(DATADIR, part, hsh)

        db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')

        kwargs.setdefault('account', account)

        kwargs.setdefault('container', container)

        kwargs.setdefault('logger', self.logger)

        return ContainerBroker(db_path, **kwargs)

**** CubicPower OpenStack Study ****

    def account_update(self, req, account, container, broker):


        Update the account server(s) with latest container info.

        :param req: swob.Request object

        :param account: account name

        :param container: container name

        :param broker: container DB broker object

        :returns: if all the account requests return a 404 error code,

                  HTTPNotFound response object,

                  if the account cannot be updated due to a malformed header,

                  an HTTPBadRequest response object,

                  otherwise None.


        account_hosts = [h.strip() for h in

                         req.headers.get('X-Account-Host', '').split(',')]

        account_devices = [d.strip() for d in

                           req.headers.get('X-Account-Device', '').split(',')]

        account_partition = req.headers.get('X-Account-Partition', '')

        if len(account_hosts) != len(account_devices):

            # This shouldn't happen unless there's a bug in the proxy,

            # but if there is, we want to know about it.

            self.logger.error(_('ERROR Account update failed: different  '

                                'numbers of hosts and devices in request: '

                                '"%s" vs "%s"') %

                               (req.headers.get('X-Account-Host', ''),

                                req.headers.get('X-Account-Device', '')))

            return HTTPBadRequest(req=req)

        if account_partition:

            updates = zip(account_hosts, account_devices)


            updates = []

        account_404s = 0

        for account_host, account_device in updates:

            account_ip, account_port = account_host.rsplit(':', 1)

            new_path = '/' + '/'.join([account, container])

            info = broker.get_info()

            account_headers = HeaderKeyDict({

                'x-put-timestamp': info['put_timestamp'],

                'x-delete-timestamp': info['delete_timestamp'],

                'x-object-count': info['object_count'],

                'x-bytes-used': info['bytes_used'],

                'x-trans-id': req.headers.get('x-trans-id', '-'),

                'user-agent': 'container-server %s' % os.getpid(),

                'referer': req.as_referer()})

            if req.headers.get('x-account-override-deleted', 'no').lower() == \


                account_headers['x-account-override-deleted'] = 'yes'


                with ConnectionTimeout(self.conn_timeout):

                    conn = http_connect(

                        account_ip, account_port, account_device,

                        account_partition, 'PUT', new_path, account_headers)

                with Timeout(self.node_timeout):

                    account_response = conn.getresponse()


                    if account_response.status == HTTP_NOT_FOUND:

                        account_404s += 1

                    elif not is_success(account_response.status):


                            'ERROR Account update failed '

                            'with %(ip)s:%(port)s/%(device)s (will retry '

                            'later): Response %(status)s %(reason)s'),

                            {'ip': account_ip, 'port': account_port,

                             'device': account_device,

                             'status': account_response.status,

                             'reason': account_response.reason})

            except (Exception, Timeout):


                    'ERROR account update failed with '

                    '%(ip)s:%(port)s/%(device)s (will retry later)'),

                    {'ip': account_ip, 'port': account_port,

                     'device': account_device})

        if updates and account_404s == len(updates):

            return HTTPNotFound(req=req)


            return None



**** CubicPower OpenStack Study ****

    def DELETE(self, req):

        """Handle HTTP DELETE request."""

        drive, part, account, container, obj = split_and_validate_path(

            req, 4, 5, True)

        if 'x-timestamp' not in req.headers or \

                not check_float(req.headers['x-timestamp']):

            return HTTPBadRequest(body='Missing timestamp', request=req,


        if self.mount_check and not check_mount(self.root, drive):

            return HTTPInsufficientStorage(drive=drive, request=req)

        broker = self._get_container_broker(drive, part, account, container)

        if account.startswith(self.auto_create_account_prefix) and obj and \

                not os.path.exists(broker.db_file):



                    req.headers.get('x-timestamp') or time.time()))

            except DatabaseAlreadyExists:


        if not os.path.exists(broker.db_file):

            return HTTPNotFound()

        if obj:     # delete object

            broker.delete_object(obj, req.headers.get('x-timestamp'))

            return HTTPNoContent(request=req)


            # delete container

            if not broker.empty():

                return HTTPConflict(request=req)

            existed = float(broker.get_info()['put_timestamp']) and \

                not broker.is_deleted()


            if not broker.is_deleted():

                return HTTPConflict(request=req)

            resp = self.account_update(req, account, container, broker)

            if resp:

                return resp

            if existed:

                return HTTPNoContent(request=req)

            return HTTPNotFound()

**** CubicPower OpenStack Study ****

    def _update_or_create(self, req, broker, timestamp):

        if not os.path.exists(broker.db_file):



            except DatabaseAlreadyExists:



                return True  # created

        created = broker.is_deleted()


        if broker.is_deleted():

            raise HTTPConflict(request=req)

        return created



**** CubicPower OpenStack Study ****

    def PUT(self, req):

        """Handle HTTP PUT request."""

        drive, part, account, container, obj = split_and_validate_path(

            req, 4, 5, True)

        if 'x-timestamp' not in req.headers or \

                not check_float(req.headers['x-timestamp']):

            return HTTPBadRequest(body='Missing timestamp', request=req,


        if 'x-container-sync-to' in req.headers:

            err, sync_to, realm, realm_key = validate_sync_to(

                req.headers['x-container-sync-to'], self.allowed_sync_hosts,


            if err:

                return HTTPBadRequest(err)

        if self.mount_check and not check_mount(self.root, drive):

            return HTTPInsufficientStorage(drive=drive, request=req)

        timestamp = normalize_timestamp(req.headers['x-timestamp'])

        broker = self._get_container_broker(drive, part, account, container)

        if obj:     # put container object

            if account.startswith(self.auto_create_account_prefix) and \

                    not os.path.exists(broker.db_file):



                except DatabaseAlreadyExists:


            if not os.path.exists(broker.db_file):

                return HTTPNotFound()

            broker.put_object(obj, timestamp, int(req.headers['x-size']),



            return HTTPCreated(request=req)

        else:   # put container

            created = self._update_or_create(req, broker, timestamp)

            metadata = {}


                (key, (value, timestamp))

                for key, value in req.headers.iteritems()

                if key.lower() in self.save_headers or

                is_sys_or_user_meta('container', key))

            if metadata:

                if 'X-Container-Sync-To' in metadata:

                    if 'X-Container-Sync-To' not in broker.metadata or \

                            metadata['X-Container-Sync-To'][0] != \


                        broker.set_x_container_sync_points(-1, -1)


            resp = self.account_update(req, account, container, broker)

            if resp:

                return resp

            if created:

                return HTTPCreated(request=req)


                return HTTPAccepted(request=req)



**** CubicPower OpenStack Study ****

    def HEAD(self, req):

        """Handle HTTP HEAD request."""

        drive, part, account, container, obj = split_and_validate_path(

            req, 4, 5, True)

        out_content_type = get_listing_content_type(req)

        if self.mount_check and not check_mount(self.root, drive):

            return HTTPInsufficientStorage(drive=drive, request=req)

        broker = self._get_container_broker(drive, part, account, container,



        if broker.is_deleted():

            return HTTPNotFound(request=req)

        info = broker.get_info()

        headers = {

            'X-Container-Object-Count': info['object_count'],

            'X-Container-Bytes-Used': info['bytes_used'],

            'X-Timestamp': info['created_at'],

            'X-PUT-Timestamp': info['put_timestamp'],



            (key, value)

            for key, (value, timestamp) in broker.metadata.iteritems()

            if value != '' and (key.lower() in self.save_headers or

                                is_sys_or_user_meta('container', key)))

        headers['Content-Type'] = out_content_type

        return HTTPNoContent(request=req, headers=headers, charset='utf-8')

**** CubicPower OpenStack Study ****

    def update_data_record(self, record):


        Perform any mutations to container listing records that are common to

        all serialization formats, and returns it as a dict.

        Converts created time to iso timestamp.

        Replaces size with 'swift_bytes' content type parameter.

        :params record: object entry record

        :returns: modified record


        (name, created, size, content_type, etag) = record

        if content_type is None:

            return {'subdir': name}

        response = {'bytes': size, 'hash': etag, 'name': name,

                    'content_type': content_type}

        last_modified = datetime.utcfromtimestamp(float(created)).isoformat()

        # python isoformat() doesn't include msecs when zero

        if len(last_modified) < len("1970-01-01T00:00:00.000000"):

            last_modified += ".000000"

        response['last_modified'] = last_modified

        override_bytes_from_content_type(response, logger=self.logger)

        return response



**** CubicPower OpenStack Study ****

    def GET(self, req):

        """Handle HTTP GET request."""

        drive, part, account, container, obj = split_and_validate_path(

            req, 4, 5, True)

        path = get_param(req, 'path')

        prefix = get_param(req, 'prefix')

        delimiter = get_param(req, 'delimiter')

        if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):

            # delimiters can be made more flexible later

            return HTTPPreconditionFailed(body='Bad delimiter')

        marker = get_param(req, 'marker', '')

        end_marker = get_param(req, 'end_marker')


        given_limit = get_param(req, 'limit')

        if given_limit and given_limit.isdigit():

            limit = int(given_limit)

            if limit > CONTAINER_LISTING_LIMIT:

                return HTTPPreconditionFailed(


                    body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT)

        out_content_type = get_listing_content_type(req)

        if self.mount_check and not check_mount(self.root, drive):

            return HTTPInsufficientStorage(drive=drive, request=req)

        broker = self._get_container_broker(drive, part, account, container,



        if broker.is_deleted():

            return HTTPNotFound(request=req)

        info = broker.get_info()

        resp_headers = {

            'X-Container-Object-Count': info['object_count'],

            'X-Container-Bytes-Used': info['bytes_used'],

            'X-Timestamp': info['created_at'],

            'X-PUT-Timestamp': info['put_timestamp'],


        for key, (value, timestamp) in broker.metadata.iteritems():

            if value and (key.lower() in self.save_headers or

                          is_sys_or_user_meta('container', key)):

                resp_headers[key] = value

        ret = Response(request=req, headers=resp_headers,

                       content_type=out_content_type, charset='utf-8')

        container_list = broker.list_objects_iter(limit, marker, end_marker,

                                                  prefix, delimiter, path)

        if out_content_type == 'application/json':

            ret.body = json.dumps([self.update_data_record(record)

                                   for record in container_list])

        elif out_content_type.endswith('/xml'):

            doc = Element('container', name=container.decode('utf-8'))

            for obj in container_list:

                record = self.update_data_record(obj)

                if 'subdir' in record:

                    name = record['subdir'].decode('utf-8')

                    sub = SubElement(doc, 'subdir', name=name)

                    SubElement(sub, 'name').text = name


                    obj_element = SubElement(doc, 'object')

                    for field in ["name", "hash", "bytes", "content_type",


                        SubElement(obj_element, field).text = str(


                    for field in sorted(record):

                        SubElement(obj_element, field).text = str(


            ret.body = tostring(doc, encoding='UTF-8').replace(


                '', 1)


            if not container_list:

                return HTTPNoContent(request=req, headers=resp_headers)

            ret.body = '\n'.join(rec[0] for rec in container_list) + '\n'

        return ret




**** CubicPower OpenStack Study ****

    def REPLICATE(self, req):


        Handle HTTP REPLICATE request (json-encoded RPC calls for replication.)


        post_args = split_and_validate_path(req, 3)

        drive, partition, hash = post_args

        if self.mount_check and not check_mount(self.root, drive):

            return HTTPInsufficientStorage(drive=drive, request=req)


            args = json.load(req.environ['wsgi.input'])

        except ValueError as err:

            return HTTPBadRequest(body=str(err), content_type='text/plain')

        ret = self.replicator_rpc.dispatch(post_args, args)

        ret.request = req

        return ret



**** CubicPower OpenStack Study ****

    def POST(self, req):

        """Handle HTTP POST request."""

        drive, part, account, container = split_and_validate_path(req, 4)

        if 'x-timestamp' not in req.headers or \

                not check_float(req.headers['x-timestamp']):

            return HTTPBadRequest(body='Missing or bad timestamp',

                                  request=req, content_type='text/plain')

        if 'x-container-sync-to' in req.headers:

            err, sync_to, realm, realm_key = validate_sync_to(

                req.headers['x-container-sync-to'], self.allowed_sync_hosts,


            if err:

                return HTTPBadRequest(err)

        if self.mount_check and not check_mount(self.root, drive):

            return HTTPInsufficientStorage(drive=drive, request=req)

        broker = self._get_container_broker(drive, part, account, container)

        if broker.is_deleted():

            return HTTPNotFound(request=req)

        timestamp = normalize_timestamp(req.headers['x-timestamp'])

        metadata = {}


            (key, (value, timestamp)) for key, value in req.headers.iteritems()

            if key.lower() in self.save_headers or

            is_sys_or_user_meta('container', key))

        if metadata:

            if 'X-Container-Sync-To' in metadata:

                if 'X-Container-Sync-To' not in broker.metadata or \

                        metadata['X-Container-Sync-To'][0] != \


                    broker.set_x_container_sync_points(-1, -1)


        return HTTPNoContent(request=req)

**** CubicPower OpenStack Study ****

    def __call__(self, env, start_response):

        start_time = time.time()

        req = Request(env)

        self.logger.txn_id = req.headers.get('x-trans-id', None)

        if not check_utf8(req.path_info):

            res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')



                # disallow methods which have not been marked 'public'


                    method = getattr(self, req.method)

                    getattr(method, 'publicly_accessible')

                    replication_method = getattr(method, 'replication', False)

                    if (self.replication_server is not None and

                            self.replication_server != replication_method):

                        raise AttributeError('Not allowed method.')

                except AttributeError:

                    res = HTTPMethodNotAllowed()


                    res = method(req)

            except HTTPException as error_response:

                res = error_response

            except (Exception, Timeout):


                    'ERROR __call__ error with %(method)s %(path)s '),

                    {'method': req.method, 'path': req.path})

                res = HTTPInternalServerError(body=traceback.format_exc())

        trans_time = '%.4f' % (time.time() - start_time)

        if self.log_requests:

            log_message = '%s - - [%s] "%s %s" %s %s "%s" "%s" "%s" %s' % (


                time.strftime('%d/%b/%Y:%H:%M:%S +0000',


                req.method, req.path,

                res.status.split()[0], res.content_length or '-',

                req.headers.get('x-trans-id', '-'),

                req.referer or '-', req.user_agent or '-',


            if req.method.upper() == 'REPLICATE':




        return res(env, start_response)

def app_factory(global_conf, **local_conf):

    """paste.deploy app factory for creating WSGI container server apps"""

    conf = global_conf.copy()


    return ContainerController(conf)

**** CubicPower OpenStack Study ****

def app_factory(global_conf, **local_conf):

    """paste.deploy app factory for creating WSGI container server apps"""

    conf = global_conf.copy()


    return ContainerController(conf)