¡@

Home 

OpenStack Study: s3server.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2010 United States Government as represented by the

# Administrator of the National Aeronautics and Space Administration.

# Copyright 2010 OpenStack Foundation

# Copyright 2009 Facebook

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""Implementation of an S3-like storage server based on local files.

Useful to test features that will eventually run on S3, or if you want to

run something locally that was once running on S3.

We don't support all the features of S3, but it does work with the

standard S3 client for the most basic semantics. To use the standard

S3 client with this module::

c = S3.AWSAuthConnection("", "", server="localhost", port=8888,

is_secure=False)

c.create_bucket("mybucket")

c.put("mybucket", "mykey", "a value")

print c.get("mybucket", "mykey").body

"""

import bisect

import datetime

import os

import os.path

import urllib

from oslo.config import cfg

import routes

import six

import webob

from nova.openstack.common import fileutils

from nova import paths

from nova import utils

from nova import wsgi

s3_opts = [

cfg.StrOpt('buckets_path',

default=paths.state_path_def('buckets'),

help='Path to S3 buckets'),

cfg.StrOpt('s3_listen',

default="0.0.0.0",

help='IP address for S3 API to listen'),

cfg.IntOpt('s3_listen_port',

default=3333,

help='Port for S3 API to listen'),

]

CONF = cfg.CONF

CONF.register_opts(s3_opts)

**** CubicPower OpenStack Study ****

def get_wsgi_server():

    return wsgi.Server("S3 Objectstore",

                       S3Application(CONF.buckets_path),

                       port=CONF.s3_listen_port,

                       host=CONF.s3_listen)

**** CubicPower OpenStack Study ****

class S3Application(wsgi.Router):

"""Implementation of an S3-like storage server based on local files.

If bucket depth is given, we break files up into multiple directories

to prevent hitting file system limits for number of files in each

directories. 1 means one level of directories, 2 means 2, etc.

"""

**** CubicPower OpenStack Study ****

    def __init__(self, root_directory, bucket_depth=0, mapper=None):

        if mapper is None:

            mapper = routes.Mapper()

        mapper.connect('/',

                controller=lambda *a, **kw: RootHandler(self)(*a, **kw))

        mapper.connect('/{bucket}/{object_name}',

                controller=lambda *a, **kw: ObjectHandler(self)(*a, **kw))

        mapper.connect('/{bucket_name}/',

                controller=lambda *a, **kw: BucketHandler(self)(*a, **kw))

        self.directory = os.path.abspath(root_directory)

        fileutils.ensure_tree(self.directory)

        self.bucket_depth = bucket_depth

        super(S3Application, self).__init__(mapper)

**** CubicPower OpenStack Study ****

class BaseRequestHandler(object):

"""Base class emulating Tornado's web framework pattern in WSGI.

This is a direct port of Tornado's implementation, so some key decisions

about how the code interacts have already been chosen.

The two most common ways of designing web frameworks can be

classified as async object-oriented and sync functional.

Tornado's is on the OO side because a response is built up in and using

the shared state of an object and one of the object's methods will

eventually trigger the "finishing" of the response asynchronously.

Most WSGI stuff is in the functional side, we pass a request object to

every call down a chain and the eventual return value will be a response.

Part of the function of the routing code in S3Application as well as the

code in BaseRequestHandler's __call__ method is to merge those two styles

together enough that the Tornado code can work without extensive

modifications.

To do that it needs to give the Tornado-style code clean objects that it

can modify the state of for each request that is processed, so we use a

very simple factory lambda to create new state for each request, that's

the stuff in the router, and when we let the Tornado code modify that

object to handle the request, then we return the response it generated.

This wouldn't work the same if Tornado was being more async'y and doing

other callbacks throughout the process, but since Tornado is being

relatively simple here we can be satisfied that the response will be

complete by the end of the get/post method.

"""

**** CubicPower OpenStack Study ****

    def __init__(self, application):

        self.application = application

    @webob.dec.wsgify

**** CubicPower OpenStack Study ****

    def __call__(self, request):

        method = request.method.lower()

        f = getattr(self, method, self.invalid)

        self.request = request

        self.response = webob.Response()

        params = request.environ['wsgiorg.routing_args'][1]

        del params['controller']

        f(**params)

        return self.response

**** CubicPower OpenStack Study ****

    def get_argument(self, arg, default):

        return self.request.params.get(arg, default)

**** CubicPower OpenStack Study ****

    def set_header(self, header, value):

        self.response.headers[header] = value

**** CubicPower OpenStack Study ****

    def set_status(self, status_code):

        self.response.status = status_code

**** CubicPower OpenStack Study ****

    def set_404(self):

        self.render_xml({"Error": {

            "Code": "NoSuchKey",

            "Message": "The resource you requested does not exist"

            }})

        self.set_status(404)

**** CubicPower OpenStack Study ****

    def finish(self, body=''):

        self.response.body = utils.utf8(body)

**** CubicPower OpenStack Study ****

    def invalid(self, **kwargs):

        pass

**** CubicPower OpenStack Study ****

    def render_xml(self, value):

        assert isinstance(value, dict) and len(value) == 1

        self.set_header("Content-Type", "application/xml; charset=UTF-8")

        name = value.keys()[0]

        parts = []

        parts.append('<' + utils.utf8(name) +

                     ' xmlns="http://doc.s3.amazonaws.com/2006-03-01">')

        self._render_parts(value.values()[0], parts)

        parts.append('')

        self.finish('\n' +

                    ''.join(parts))

**** CubicPower OpenStack Study ****

    def _render_parts(self, value, parts=None):

        if not parts:

            parts = []

        if isinstance(value, six.string_types):

            parts.append(utils.xhtml_escape(value))

        elif isinstance(value, int) or isinstance(value, long):

            parts.append(str(value))

        elif isinstance(value, datetime.datetime):

            parts.append(value.strftime("%Y-%m-%dT%H:%M:%S.000Z"))

        elif isinstance(value, dict):

            for name, subvalue in value.iteritems():

                if not isinstance(subvalue, list):

                    subvalue = [subvalue]

                for subsubvalue in subvalue:

                    parts.append('<' + utils.utf8(name) + '>')

                    self._render_parts(subsubvalue, parts)

                    parts.append('')

        else:

            raise Exception("Unknown S3 value type %r", value)

**** CubicPower OpenStack Study ****

    def _object_path(self, bucket, object_name):

        if self.application.bucket_depth < 1:

            return os.path.abspath(os.path.join(

                self.application.directory, bucket, object_name))

        hash = utils.get_hash_str(object_name)

        path = os.path.abspath(os.path.join(

            self.application.directory, bucket))

        for i in range(self.application.bucket_depth):

            path = os.path.join(path, hash[:2 * (i + 1)])

        return os.path.join(path, object_name)

**** CubicPower OpenStack Study ****

class RootHandler(BaseRequestHandler):

**** CubicPower OpenStack Study ****

    def get(self):

        names = os.listdir(self.application.directory)

        buckets = []

        for name in names:

            path = os.path.join(self.application.directory, name)

            info = os.stat(path)

            buckets.append({

                "Name": name,

                "CreationDate": datetime.datetime.utcfromtimestamp(

                    info.st_ctime),

            })

        self.render_xml({"ListAllMyBucketsResult": {

            "Buckets": {"Bucket": buckets},

        }})

**** CubicPower OpenStack Study ****

class BucketHandler(BaseRequestHandler):

**** CubicPower OpenStack Study ****

    def get(self, bucket_name):

        prefix = self.get_argument("prefix", u"")

        marker = self.get_argument("marker", u"")

        max_keys = int(self.get_argument("max-keys", 50000))

        path = os.path.abspath(os.path.join(self.application.directory,

                                            bucket_name))

        terse = int(self.get_argument("terse", 0))

        if (not path.startswith(self.application.directory) or

                not os.path.isdir(path)):

            self.set_404()

            return

        object_names = []

        for root, dirs, files in os.walk(path):

            for file_name in files:

                object_names.append(os.path.join(root, file_name))

        skip = len(path) + 1

        for i in range(self.application.bucket_depth):

            skip += 2 * (i + 1) + 1

        object_names = [n[skip:] for n in object_names]

        object_names.sort()

        contents = []

        start_pos = 0

        if marker:

            start_pos = bisect.bisect_right(object_names, marker, start_pos)

        if prefix:

            start_pos = bisect.bisect_left(object_names, prefix, start_pos)

        truncated = False

        for object_name in object_names[start_pos:]:

            if not object_name.startswith(prefix):

                break

            if len(contents) >= max_keys:

                truncated = True

                break

            object_path = self._object_path(bucket_name, object_name)

            c = {"Key": object_name}

            if not terse:

                info = os.stat(object_path)

                c.update({

                    "LastModified": datetime.datetime.utcfromtimestamp(

                        info.st_mtime),

                    "Size": info.st_size,

                })

            contents.append(c)

            marker = object_name

        self.render_xml({"ListBucketResult": {

            "Name": bucket_name,

            "Prefix": prefix,

            "Marker": marker,

            "MaxKeys": max_keys,

            "IsTruncated": truncated,

            "Contents": contents,

        }})

**** CubicPower OpenStack Study ****

    def put(self, bucket_name):

        path = os.path.abspath(os.path.join(

            self.application.directory, bucket_name))

        if (not path.startswith(self.application.directory) or

                os.path.exists(path)):

            self.set_status(403)

            return

        fileutils.ensure_tree(path)

        self.finish()

**** CubicPower OpenStack Study ****

    def delete(self, bucket_name):

        path = os.path.abspath(os.path.join(

            self.application.directory, bucket_name))

        if (not path.startswith(self.application.directory) or

                not os.path.isdir(path)):

            self.set_404()

            return

        if len(os.listdir(path)) > 0:

            self.set_status(403)

            return

        os.rmdir(path)

        self.set_status(204)

        self.finish()

**** CubicPower OpenStack Study ****

    def head(self, bucket_name):

        path = os.path.abspath(os.path.join(self.application.directory,

                                            bucket_name))

        if (not path.startswith(self.application.directory) or

                not os.path.isdir(path)):

            self.set_404()

            return

        self.set_status(200)

        self.finish()

**** CubicPower OpenStack Study ****

class ObjectHandler(BaseRequestHandler):

**** CubicPower OpenStack Study ****

    def get(self, bucket, object_name):

        object_name = urllib.unquote(object_name)

        path = self._object_path(bucket, object_name)

        if (not path.startswith(self.application.directory) or

                not os.path.isfile(path)):

            self.set_404()

            return

        info = os.stat(path)

        self.set_header("Content-Type", "application/unknown")

        self.set_header("Last-Modified", datetime.datetime.utcfromtimestamp(

            info.st_mtime))

        object_file = open(path, "r")

        try:

            self.finish(object_file.read())

        finally:

            object_file.close()

**** CubicPower OpenStack Study ****

    def put(self, bucket, object_name):

        object_name = urllib.unquote(object_name)

        bucket_dir = os.path.abspath(os.path.join(

            self.application.directory, bucket))

        if (not bucket_dir.startswith(self.application.directory) or

                not os.path.isdir(bucket_dir)):

            self.set_404()

            return

        path = self._object_path(bucket, object_name)

        if not path.startswith(bucket_dir) or os.path.isdir(path):

            self.set_status(403)

            return

        directory = os.path.dirname(path)

        fileutils.ensure_tree(directory)

        object_file = open(path, "w")

        object_file.write(self.request.body)

        object_file.close()

        self.set_header('ETag',

                        '"%s"' % utils.get_hash_str(self.request.body))

        self.finish()

**** CubicPower OpenStack Study ****

    def delete(self, bucket, object_name):

        object_name = urllib.unquote(object_name)

        path = self._object_path(bucket, object_name)

        if (not path.startswith(self.application.directory) or

                not os.path.isfile(path)):

            self.set_404()

            return

        os.unlink(path)

        self.set_status(204)

        self.finish()