OpenStack Study: sync.py

# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import uuid

from swift import gettext_ as _
from time import ctime, time
from random import choice, random, shuffle
from struct import unpack_from

from eventlet import sleep, Timeout

import swift.common.db
from swift.container.backend import ContainerBroker, DATADIR
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.direct_client import direct_get_object
from swift.common.internal_client import delete_object, put_object
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.utils import audit_location_generator, get_logger, \
    hash_path, config_true_value, validate_sync_to, whataremyips, \
    FileLikeIter, urlparse, quote
from swift.common.daemon import Daemon
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND


class ContainerSync(Daemon):
    """
    Daemon to sync syncable containers.

    This is done by scanning the local devices for container databases and
    checking for x-container-sync-to and x-container-sync-key metadata values.
    If they exist, newer rows since the last sync will trigger PUTs or DELETEs
    to the other container.

    .. note::

        Container sync will sync object POSTs only if the proxy server is set
        to use "object_post_as_copy = true" which is the default. In other
        words, you should leave "object_post_as_copy" set to true if you want
        container sync to fully work.
    """

    def __init__(self, conf, container_ring=None, object_ring=None):
        #: The dict of configuration values from the [container-sync] section
        #: of the container-server.conf.
        self.conf = conf
        #: Logger to use for container-sync log lines.
        self.logger = get_logger(conf, log_route='container-sync')
        #: Path to the local device mount points.
        self.devices = conf.get('devices', '/srv/node')
        #: Indicates whether mount points should be verified as actual mount
        #: points (normally true, false for tests and SAIO).
        self.mount_check = config_true_value(conf.get('mount_check', 'true'))
        #: Minimum time between full scans. This is to keep the daemon from
        #: running wild on near empty systems.
        self.interval = int(conf.get('interval', 300))
        #: Maximum amount of time to spend syncing a container before moving
        #: on to the next one. If a container sync hasn't finished in this
        #: time, it'll just be resumed next scan.
        self.container_time = int(conf.get('container_time', 60))
        #: ContainerSyncCluster instance for validating sync-to values.
        self.realms_conf = ContainerSyncRealms(
            os.path.join(
                conf.get('swift_dir', '/etc/swift'),
                'container-sync-realms.conf'),
            self.logger)
        #: The list of hosts we're allowed to send syncs to. This can be
        #: overridden by data in self.realms_conf.
        self.allowed_sync_hosts = [
            h.strip()
            for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
            if h.strip()]
        #: The list of HTTP proxies, if any, to pick from at random when
        #: sending sync requests to the remote cluster.
        self.http_proxies = [
            a.strip()
            for a in conf.get('sync_proxy', '').split(',')
            if a.strip()]
        #: Number of containers with sync turned on that were successfully
        #: synced.
        self.container_syncs = 0
        #: Number of successful DELETEs triggered.
        self.container_deletes = 0
        #: Number of successful PUTs triggered.
        self.container_puts = 0
        #: Number of containers that didn't have sync turned on.
        self.container_skips = 0
        #: Number of containers that had a failure of some type.
        self.container_failures = 0
        #: Time of last stats report.
        self.reported = time()
        swift_dir = conf.get('swift_dir', '/etc/swift')
        #: swift.common.ring.Ring for locating containers.
        self.container_ring = container_ring or Ring(swift_dir,
                                                     ring_name='container')
        #: swift.common.ring.Ring for locating objects.
        self.object_ring = object_ring or Ring(swift_dir, ring_name='object')
        self._myips = whataremyips()
        self._myport = int(conf.get('bind_port', 6001))
        swift.common.db.DB_PREALLOCATION = \
            config_true_value(conf.get('db_preallocation', 'f'))
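
Every knob read in __init__ comes from the [container-sync] section of
container-server.conf, falling back to the defaults visible in the conf.get
calls above. A minimal section exercising those options might look like this
(the proxy addresses are placeholders):

    [container-sync]
    # seconds between full scans (default 300)
    interval = 300
    # max seconds to spend on one container per scan (default 60)
    container_time = 60
    # optional comma-separated internal proxies for cross-cluster requests,
    # picked at random by select_http_proxy()
    sync_proxy = http://10.0.0.1:8080,http://10.0.0.2:8080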


    def run_forever(self, *args, **kwargs):
        """
        Runs container sync scans until stopped.
        """
        sleep(random() * self.interval)
        while True:
            begin = time()
            all_locs = audit_location_generator(self.devices, DATADIR, '.db',
                                                mount_check=self.mount_check,
                                                logger=self.logger)
            for path, device, partition in all_locs:
                self.container_sync(path)
                if time() - self.reported >= 3600:  # once an hour
                    self.report()
            elapsed = time() - begin
            if elapsed < self.interval:
                sleep(self.interval - elapsed)


    def run_once(self, *args, **kwargs):
        """
        Runs a single container sync scan.
        """
        self.logger.info(_('Begin container sync "once" mode'))
        begin = time()
        all_locs = audit_location_generator(self.devices, DATADIR, '.db',
                                            mount_check=self.mount_check,
                                            logger=self.logger)
        for path, device, partition in all_locs:
            self.container_sync(path)
            if time() - self.reported >= 3600:  # once an hour
                self.report()
        self.report()
        elapsed = time() - begin
        self.logger.info(
            _('Container sync "once" mode completed: %.02fs'), elapsed)


    def report(self):
        """
        Writes a report of the stats to the logger and resets the stats for
        the next report.
        """
        self.logger.info(
            _('Since %(time)s: %(sync)s synced [%(delete)s deletes, %(put)s '
              'puts], %(skip)s skipped, %(fail)s failed'),
            {'time': ctime(self.reported),
             'sync': self.container_syncs,
             'delete': self.container_deletes,
             'put': self.container_puts,
             'skip': self.container_skips,
             'fail': self.container_failures})
        self.reported = time()
        self.container_syncs = 0
        self.container_deletes = 0
        self.container_puts = 0
        self.container_skips = 0
        self.container_failures = 0


    def container_sync(self, path):
        """
        Checks the given path for a container database, determines if syncing
        is turned on for that database and, if so, sends any updates to the
        other container.

        :param path: the path to a container db
        """
        broker = None
        try:
            broker = ContainerBroker(path)
            info = broker.get_info()
            x, nodes = self.container_ring.get_nodes(info['account'],
                                                     info['container'])
            for ordinal, node in enumerate(nodes):
                if node['ip'] in self._myips and node['port'] == self._myport:
                    break
            else:
                return
            if not broker.is_deleted():
                sync_to = None
                user_key = None
                sync_point1 = info['x_container_sync_point1']
                sync_point2 = info['x_container_sync_point2']
                for key, (value, timestamp) in broker.metadata.iteritems():
                    if key.lower() == 'x-container-sync-to':
                        sync_to = value
                    elif key.lower() == 'x-container-sync-key':
                        user_key = value
                if not sync_to or not user_key:
                    self.container_skips += 1
                    self.logger.increment('skips')
                    return
                err, sync_to, realm, realm_key = validate_sync_to(
                    sync_to, self.allowed_sync_hosts, self.realms_conf)
                if err:
                    self.logger.info(
                        _('ERROR %(db_file)s: %(validate_sync_to_err)s'),
                        {'db_file': str(broker),
                         'validate_sync_to_err': err})
                    self.container_failures += 1
                    self.logger.increment('failures')
                    return
                stop_at = time() + self.container_time
                next_sync_point = None
                while time() < stop_at and sync_point2 < sync_point1:
                    rows = broker.get_items_since(sync_point2, 1)
                    if not rows:
                        break
                    row = rows[0]
                    if row['ROWID'] > sync_point1:
                        break
                    key = hash_path(info['account'], info['container'],
                                    row['name'], raw_digest=True)
                    # This node will only initially sync out one third of the
                    # objects (if 3 replicas, 1/4 if 4, etc.) and will skip
                    # problematic rows as needed in case of faults.
                    # This section will attempt to sync previously skipped
                    # rows in case the previous attempts by any of the nodes
                    # didn't succeed.
                    if not self.container_sync_row(
                            row, sync_to, user_key, broker, info, realm,
                            realm_key):
                        if not next_sync_point:
                            next_sync_point = sync_point2
                    sync_point2 = row['ROWID']
                    broker.set_x_container_sync_points(None, sync_point2)
                if next_sync_point:
                    broker.set_x_container_sync_points(None, next_sync_point)
                while time() < stop_at:
                    rows = broker.get_items_since(sync_point1, 1)
                    if not rows:
                        break
                    row = rows[0]
                    key = hash_path(info['account'], info['container'],
                                    row['name'], raw_digest=True)
                    # This node will only initially sync out one third of the
                    # objects (if 3 replicas, 1/4 if 4, etc.). It'll come back
                    # around to the section above and attempt to sync
                    # previously skipped rows in case the other nodes didn't
                    # succeed or in case it failed to do so the first time.
                    if unpack_from('>I', key)[0] % \
                            len(nodes) == ordinal:
                        self.container_sync_row(
                            row, sync_to, user_key, broker, info, realm,
                            realm_key)
                    sync_point1 = row['ROWID']
                    broker.set_x_container_sync_points(sync_point1, None)
                self.container_syncs += 1
                self.logger.increment('syncs')
        except (Exception, Timeout) as err:
            self.container_failures += 1
            self.logger.increment('failures')
            self.logger.exception(_('ERROR Syncing %s'),
                                  broker if broker else path)
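
The modulo test near the end, unpack_from('>I', key)[0] % len(nodes) ==
ordinal, is what lets each of the N replica nodes claim roughly 1/N of the
rows on its first pass. A minimal standalone sketch of that assignment, using
hashlib directly instead of Swift's hash_path (the account, container, and
object names are made up, and the real hash also mixes in a cluster-wide
secret path suffix):

    import hashlib
    from struct import unpack_from

    def claiming_ordinal(account, container, obj, replica_count):
        # Roughly what hash_path(..., raw_digest=True) produces: an MD5
        # digest over the object path. Real Swift also appends a secret
        # cluster-wide hash path suffix before hashing; omitted here.
        digest = hashlib.md5(
            ('/%s/%s/%s' % (account, container, obj)).encode('utf-8')).digest()
        # The first four digest bytes, read as a big-endian unsigned int,
        # modulo the replica count, pick which node's ordinal owns the row.
        return unpack_from('>I', digest)[0] % replica_count

    # With three replicas, only the node whose ordinal matches this value
    # syncs the row on the first pass; the others pick it up later only if
    # the sync-point-2 catch-up loop finds it was never synced.
    print(claiming_ordinal('AUTH_test', 'photos', 'cat.jpg', 3))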


    def container_sync_row(self, row, sync_to, user_key, broker, info,
                           realm, realm_key):
        """
        Sends the update the row indicates to the sync_to container.

        :param row: The updated row in the local database triggering the sync
                    update.
        :param sync_to: The URL to the remote container.
        :param user_key: The X-Container-Sync-Key to use when sending requests
                         to the other container.
        :param broker: The local container database broker.
        :param info: The get_info result from the local container database
                     broker.
        :param realm: The realm from self.realms_conf, if there is one.
            If None, fallback to using the older allowed_sync_hosts
            way of syncing.
        :param realm_key: The realm key from self.realms_conf, if there
            is one. If None, fallback to using the older
            allowed_sync_hosts way of syncing.
        :returns: True on success
        """
        try:
            start_time = time()
            if row['deleted']:
                try:
                    headers = {'x-timestamp': row['created_at']}
                    if realm and realm_key:
                        nonce = uuid.uuid4().hex
                        path = urlparse(sync_to).path + '/' + quote(
                            row['name'])
                        sig = self.realms_conf.get_sig(
                            'DELETE', path, headers['x-timestamp'], nonce,
                            realm_key, user_key)
                        headers['x-container-sync-auth'] = '%s %s %s' % (
                            realm, nonce, sig)
                    else:
                        headers['x-container-sync-key'] = user_key
                    delete_object(sync_to, name=row['name'], headers=headers,
                                  proxy=self.select_http_proxy())
                except ClientException as err:
                    if err.http_status != HTTP_NOT_FOUND:
                        raise
                self.container_deletes += 1
                self.logger.increment('deletes')
                self.logger.timing_since('deletes.timing', start_time)
            else:
                part, nodes = self.object_ring.get_nodes(
                    info['account'], info['container'],
                    row['name'])
                shuffle(nodes)
                exc = None
                looking_for_timestamp = float(row['created_at'])
                timestamp = -1
                headers = body = None
                for node in nodes:
                    try:
                        these_headers, this_body = direct_get_object(
                            node, part, info['account'], info['container'],
                            row['name'], resp_chunk_size=65536)
                        this_timestamp = float(these_headers['x-timestamp'])
                        if this_timestamp > timestamp:
                            timestamp = this_timestamp
                            headers = these_headers
                            body = this_body
                    except ClientException as err:
                        # If any errors are not 404, make sure we report the
                        # non-404 one. We don't want to mistakenly assume the
                        # object no longer exists just because one says so and
                        # the others errored for some other reason.
                        if not exc or exc.http_status == HTTP_NOT_FOUND:
                            exc = err
                    except (Exception, Timeout) as err:
                        exc = err
                if timestamp < looking_for_timestamp:
                    if exc:
                        raise exc
                    raise Exception(
                        _('Unknown exception trying to GET: %(node)r '
                          '%(account)r %(container)r %(object)r'),
                        {'node': node, 'part': part,
                         'account': info['account'],
                         'container': info['container'],
                         'object': row['name']})
                for key in ('date', 'last-modified'):
                    if key in headers:
                        del headers[key]
                if 'etag' in headers:
                    headers['etag'] = headers['etag'].strip('"')
                headers['x-timestamp'] = row['created_at']
                if realm and realm_key:
                    nonce = uuid.uuid4().hex
                    path = urlparse(sync_to).path + '/' + quote(row['name'])
                    sig = self.realms_conf.get_sig(
                        'PUT', path, headers['x-timestamp'], nonce, realm_key,
                        user_key)
                    headers['x-container-sync-auth'] = '%s %s %s' % (
                        realm, nonce, sig)
                else:
                    headers['x-container-sync-key'] = user_key
                put_object(sync_to, name=row['name'], headers=headers,
                           contents=FileLikeIter(body),
                           proxy=self.select_http_proxy())
                self.container_puts += 1
                self.logger.increment('puts')
                self.logger.timing_since('puts.timing', start_time)
        except ClientException as err:
            if err.http_status == HTTP_UNAUTHORIZED:
                self.logger.info(
                    _('Unauth %(sync_from)r => %(sync_to)r'),
                    {'sync_from': '%s/%s' %
                        (quote(info['account']), quote(info['container'])),
                     'sync_to': sync_to})
            elif err.http_status == HTTP_NOT_FOUND:
                self.logger.info(
                    _('Not found %(sync_from)r => %(sync_to)r \
                      - object %(obj_name)r'),
                    {'sync_from': '%s/%s' %
                        (quote(info['account']), quote(info['container'])),
                     'sync_to': sync_to, 'obj_name': row['name']})
            else:
                self.logger.exception(
                    _('ERROR Syncing %(db_file)s %(row)s'),
                    {'db_file': str(broker), 'row': row})
            self.container_failures += 1
            self.logger.increment('failures')
            return False
        except (Exception, Timeout) as err:
            self.logger.exception(
                _('ERROR Syncing %(db_file)s %(row)s'),
                {'db_file': str(broker), 'row': row})
            self.container_failures += 1
            self.logger.increment('failures')
            return False
        return True
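
When a realm is in play, the x-container-sync-auth header built above carries
'realm nonce signature', where the signature is an HMAC the receiving cluster
recomputes from its own copies of the realm and container keys. A sketch of a
get_sig-style computation, assuming the HMAC-SHA1-over-newline-joined-fields
construction used by swift.common.container_sync_realms (all key and path
values here are made up):

    import hashlib
    import hmac
    import uuid

    def get_sig(request_method, path, x_timestamp, nonce, realm_key,
                user_key):
        # HMAC keyed by the cluster-to-cluster realm key; the per-container
        # user key is part of the signed payload, so both shared secrets
        # must match on the remote end for the request to be accepted.
        payload = '%s\n%s\n%s\n%s\n%s' % (
            request_method, path, x_timestamp, nonce, user_key)
        return hmac.new(realm_key.encode('utf-8'), payload.encode('utf-8'),
                        hashlib.sha1).hexdigest()

    nonce = uuid.uuid4().hex
    sig = get_sig('PUT', '/v1/AUTH_test2/dest/obj', '1400000000.00000',
                  nonce, 'realm-secret', 'container-secret')
    print('x-container-sync-auth: %s %s %s' % ('realm1', nonce, sig))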


    def select_http_proxy(self):
        return choice(self.http_proxies) if self.http_proxies else None