**** CubicPower OpenStack Study ****
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import uuid
from swift import gettext_ as _
from time import ctime, time
from random import choice, random, shuffle
from struct import unpack_from
from eventlet import sleep, Timeout
import swift.common.db
from swift.container.backend import ContainerBroker, DATADIR
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.direct_client import direct_get_object
from swift.common.internal_client import delete_object, put_object
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.utils import audit_location_generator, get_logger, \
hash_path, config_true_value, validate_sync_to, whataremyips, \
FileLikeIter, urlparse, quote
from swift.common.daemon import Daemon
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
**** CubicPower OpenStack Study ****
class ContainerSync(Daemon):
"""
Daemon to sync syncable containers.
This is done by scanning the local devices for container databases and
checking for x-container-sync-to and x-container-sync-key metadata values.
If they exist, newer rows since the last sync will trigger PUTs or DELETEs
to the other container.
.. note::
Container sync will sync object POSTs only if the proxy server is set
to use "object_post_as_copy = true".
"""
**** CubicPower OpenStack Study ****
def __init__(self, conf, container_ring=None, object_ring=None):
    """
    Build the daemon state from its configuration.

    :param conf: mapping of configuration values; every option is read
                 with ``conf.get(name, default)``.
    :param container_ring: ring used to locate containers; when not
                           given, the 'container' ring is loaded from
                           the configured swift_dir.
    :param object_ring: ring used to locate objects; when not given, the
                        'object' ring is loaded from the configured
                        swift_dir.
    """
    def split_commas(raw):
        # Break a comma separated value into its stripped, non-blank parts.
        pieces = (piece.strip() for piece in raw.split(','))
        return [piece for piece in pieces if piece]

    swift_dir = conf.get('swift_dir', '/etc/swift')

    #: The dict of configuration values from the [container-sync] section
    #: of the container-server.conf.
    self.conf = conf
    #: Logger to use for container-sync log lines.
    self.logger = get_logger(conf, log_route='container-sync')
    #: Path to the local device mount points.
    self.devices = conf.get('devices', '/srv/node')
    #: Indicates whether mount points should be verified as actual mount
    #: points (normally true, false for tests and SAIO).
    mount_check = conf.get('mount_check', 'true')
    self.mount_check = config_true_value(mount_check)
    #: Minimum time between full scans. This is to keep the daemon from
    #: running wild on near empty systems.
    self.interval = int(conf.get('interval', 300))
    #: Maximum amount of time to spend syncing a container before moving on
    #: to the next one. If a container sync hasn't finished in this time,
    #: it'll just be resumed next scan.
    self.container_time = int(conf.get('container_time', 60))
    #: ContainerSyncRealms instance for validating sync-to values.
    realms_path = os.path.join(swift_dir, 'container-sync-realms.conf')
    self.realms_conf = ContainerSyncRealms(realms_path, self.logger)
    #: The list of hosts we're allowed to send syncs to. This can be
    #: overridden by data in self.realms_conf
    self.allowed_sync_hosts = split_commas(
        conf.get('allowed_sync_hosts', '127.0.0.1'))
    #: The list of HTTP proxies a sync request may be sent through; one is
    #: picked at random per request, and an empty list means no proxy.
    self.http_proxies = split_commas(conf.get('sync_proxy', ''))
    #: Number of containers with sync turned on that were successfully
    #: synced.
    self.container_syncs = 0
    #: Number of successful DELETEs triggered.
    self.container_deletes = 0
    #: Number of successful PUTs triggered.
    self.container_puts = 0
    #: Number of containers that didn't have sync turned on.
    self.container_skips = 0
    #: Number of containers that had a failure of some type.
    self.container_failures = 0
    #: Time of last stats report.
    self.reported = time()
    #: swift.common.ring.Ring for locating containers.
    if container_ring:
        self.container_ring = container_ring
    else:
        self.container_ring = Ring(swift_dir, ring_name='container')
    #: swift.common.ring.Ring for locating objects.
    if object_ring:
        self.object_ring = object_ring
    else:
        self.object_ring = Ring(swift_dir, ring_name='object')
    # This server's own addresses and port, used to recognise which ring
    # node entry (if any) refers to this server.
    self._myips = whataremyips()
    self._myport = int(conf.get('bind_port', 6001))
    # Module-level switch in swift.common.db, set from this daemon's config.
    preallocate = conf.get('db_preallocation', 'f')
    swift.common.db.DB_PREALLOCATION = config_true_value(preallocate)
**** CubicPower OpenStack Study ****
def run_forever(self, *args, **kwargs):
    """
    Scan for syncable containers over and over until the process stops.

    The first pass is delayed by a random fraction of ``self.interval``.
    Each pass visits every container database found under
    ``self.devices`` and is followed by a sleep for whatever is left of
    ``self.interval``, so passes start at least that far apart.
    """
    sleep(random() * self.interval)
    while True:
        pass_started = time()
        db_locations = audit_location_generator(
            self.devices, DATADIR, '.db',
            mount_check=self.mount_check,
            logger=self.logger)
        for db_path, _device, _partition in db_locations:
            self.container_sync(db_path)
            since_report = time() - self.reported
            if since_report >= 3600:  # once an hour
                self.report()
        remaining = self.interval - (time() - pass_started)
        if remaining > 0:
            sleep(remaining)
**** CubicPower OpenStack Study ****
def run_once(self, *args, **kwargs):
    """
    Perform exactly one scan over the local container databases.

    Stats are reported hourly while the scan runs and once more when it
    ends, followed by a log line with the total elapsed time.
    """
    self.logger.info(_('Begin container sync "once" mode'))
    scan_started = time()
    db_locations = audit_location_generator(
        self.devices, DATADIR, '.db',
        mount_check=self.mount_check,
        logger=self.logger)
    for db_path, _device, _partition in db_locations:
        self.container_sync(db_path)
        since_report = time() - self.reported
        if since_report >= 3600:  # once an hour
            self.report()
    # Always flush the counters gathered since the last hourly report.
    self.report()
    duration = time() - scan_started
    self.logger.info(
        _('Container sync "once" mode completed: %.02fs'), duration)
**** CubicPower OpenStack Study ****
def report(self):
    """
    Log the counters gathered since the previous report, then start a new
    reporting period by stamping the current time and zeroing every
    counter.
    """
    message = _(
        'Since %(time)s: %(sync)s synced [%(delete)s deletes, %(put)s '
        'puts], %(skip)s skipped, %(fail)s failed')
    stats = {
        'time': ctime(self.reported),
        'sync': self.container_syncs,
        'delete': self.container_deletes,
        'put': self.container_puts,
        'skip': self.container_skips,
        'fail': self.container_failures,
    }
    self.logger.info(message, stats)
    self.reported = time()
    for counter in ('container_syncs', 'container_deletes',
                    'container_puts', 'container_skips',
                    'container_failures'):
        setattr(self, counter, 0)
**** CubicPower OpenStack Study ****
def container_sync(self, path):
    """
    Checks the given path for a container database, determines if syncing
    is turned on for that database and, if so, sends any updates to the
    other container.

    Nothing is done when none of the container's ring nodes matches this
    server's IPs and bind port, or when the database is marked deleted.
    A container lacking either the x-container-sync-to or the
    x-container-sync-key metadata value is counted as skipped; a sync-to
    value rejected by validate_sync_to is counted as a failure.

    Rows are then processed in two phases, each bounded by
    ``self.container_time`` seconds in total:

    1. rows between sync point 2 and sync point 1 are all retried, in
       case earlier attempts by any node did not succeed;
    2. rows past sync point 1 are sent only when the hash of the object
       path modulo the number of nodes equals this node's position in
       the ring's node list.

    Any exception (including eventlet's Timeout) is logged and counted as
    a failure rather than propagated.

    :param path: the path to a container db
    """
    broker = None
    try:
        broker = ContainerBroker(path)
        info = broker.get_info()
        _part, nodes = self.container_ring.get_nodes(info['account'],
                                                     info['container'])
        # Find this server's position among the container's nodes; the
        # for/else returns when this server is not one of them.
        for ordinal, node in enumerate(nodes):
            if node['ip'] in self._myips and node['port'] == self._myport:
                break
        else:
            return
        if broker.is_deleted():
            return

        sync_to = None
        user_key = None
        sync_point1 = info['x_container_sync_point1']
        sync_point2 = info['x_container_sync_point2']
        # .items() rather than .iteritems() so the loop does not depend on
        # a Python 2 only dict method.
        for meta_name, (value, _timestamp) in broker.metadata.items():
            lowered = meta_name.lower()
            if lowered == 'x-container-sync-to':
                sync_to = value
            elif lowered == 'x-container-sync-key':
                user_key = value
        if not sync_to or not user_key:
            self.container_skips += 1
            self.logger.increment('skips')
            return
        err, sync_to, realm, realm_key = validate_sync_to(
            sync_to, self.allowed_sync_hosts, self.realms_conf)
        if err:
            self.logger.info(
                _('ERROR %(db_file)s: %(validate_sync_to_err)s'),
                {'db_file': str(broker),
                 'validate_sync_to_err': err})
            self.container_failures += 1
            self.logger.increment('failures')
            return

        stop_at = time() + self.container_time
        next_sync_point = None
        while time() < stop_at and sync_point2 < sync_point1:
            rows = broker.get_items_since(sync_point2, 1)
            if not rows:
                break
            row = rows[0]
            if row['ROWID'] > sync_point1:
                break
            # This node will only initially sync out one third of the
            # objects (if 3 replicas, 1/4 if 4, etc.) and will skip
            # problematic rows as needed in case of faults.
            # This section will attempt to sync previously skipped
            # rows in case the previous attempts by any of the nodes
            # didn't succeed.
            # Every row is attempted here, so no path hash is needed.
            if not self.container_sync_row(
                    row, sync_to, user_key, broker, info, realm,
                    realm_key):
                # Remember where the first failure happened so the next
                # scan starts retrying from there.
                if not next_sync_point:
                    next_sync_point = sync_point2
            sync_point2 = row['ROWID']
            broker.set_x_container_sync_points(None, sync_point2)
        if next_sync_point:
            broker.set_x_container_sync_points(None, next_sync_point)

        while time() < stop_at:
            rows = broker.get_items_since(sync_point1, 1)
            if not rows:
                break
            row = rows[0]
            key = hash_path(info['account'], info['container'],
                            row['name'], raw_digest=True)
            # This node will only initially sync out one third of the
            # objects (if 3 replicas, 1/4 if 4, etc.). It'll come back
            # around to the section above and attempt to sync
            # previously skipped rows in case the other nodes didn't
            # succeed or in case it failed to do so the first time.
            if unpack_from('>I', key)[0] % len(nodes) == ordinal:
                self.container_sync_row(
                    row, sync_to, user_key, broker, info, realm,
                    realm_key)
            sync_point1 = row['ROWID']
            broker.set_x_container_sync_points(sync_point1, None)
        self.container_syncs += 1
        self.logger.increment('syncs')
    except (Exception, Timeout):
        # Timeout is listed next to Exception, as in the rest of this
        # class, so that neither can escape the scan loop.
        self.container_failures += 1
        self.logger.increment('failures')
        self.logger.exception(_('ERROR Syncing %s'),
                              broker if broker else path)
**** CubicPower OpenStack Study ****
def container_sync_row(self, row, sync_to, user_key, broker, info,
                       realm, realm_key):
    """
    Sends the update the row indicates to the sync_to container.

    A row flagged as deleted triggers a DELETE on the remote container
    (a 404 answer is treated as success). Any other row triggers a GET of
    the object from the local object nodes followed by a PUT to the
    remote container. The copy with the highest x-timestamp among the
    nodes that answered is used, and it must be at least as new as the
    row's created_at.

    Failures are logged and counted in ``self.container_failures``; they
    are reported through the return value, not raised.

    :param row: The updated row in the local database triggering the sync
                update.
    :param sync_to: The URL to the remote container.
    :param user_key: The X-Container-Sync-Key to use when sending requests
                     to the other container.
    :param broker: The local container database broker.
    :param info: The get_info result from the local container database
                 broker.
    :param realm: The realm from self.realms_conf, if there is one.
                  If None, fallback to using the older allowed_sync_hosts
                  way of syncing.
    :param realm_key: The realm key from self.realms_conf, if there
                      is one. If None, fallback to using the older
                      allowed_sync_hosts way of syncing.
    :returns: True on success, False on any failure
    """
    try:
        start_time = time()
        if row['deleted']:
            try:
                headers = {'x-timestamp': row['created_at']}
                if realm and realm_key:
                    nonce = uuid.uuid4().hex
                    path = urlparse(sync_to).path + '/' + quote(
                        row['name'])
                    sig = self.realms_conf.get_sig(
                        'DELETE', path, headers['x-timestamp'], nonce,
                        realm_key, user_key)
                    headers['x-container-sync-auth'] = '%s %s %s' % (
                        realm, nonce, sig)
                else:
                    headers['x-container-sync-key'] = user_key
                delete_object(sync_to, name=row['name'], headers=headers,
                              proxy=self.select_http_proxy())
            except ClientException as err:
                # The object already being gone remotely is the desired
                # end state, so only other statuses are errors.
                if err.http_status != HTTP_NOT_FOUND:
                    raise
            self.container_deletes += 1
            self.logger.increment('deletes')
            self.logger.timing_since('deletes.timing', start_time)
        else:
            part, nodes = self.object_ring.get_nodes(
                info['account'], info['container'],
                row['name'])
            shuffle(nodes)
            exc = None
            looking_for_timestamp = float(row['created_at'])
            timestamp = -1
            headers = body = None
            # Pre-bind so the fallback error message below can always be
            # built, even if the ring returned no nodes.
            node = None
            for node in nodes:
                try:
                    these_headers, this_body = direct_get_object(
                        node, part, info['account'], info['container'],
                        row['name'], resp_chunk_size=65536)
                    this_timestamp = float(these_headers['x-timestamp'])
                    if this_timestamp > timestamp:
                        timestamp = this_timestamp
                        headers = these_headers
                        body = this_body
                except ClientException as err:
                    # If any errors are not 404, make sure we report the
                    # non-404 one. We don't want to mistakenly assume the
                    # object no longer exists just because one says so and
                    # the others errored for some other reason.
                    # exc may hold a non-ClientException (e.g. a Timeout)
                    # from an earlier node, which has no http_status;
                    # getattr keeps that error instead of raising
                    # AttributeError and abandoning the remaining nodes.
                    if not exc or \
                            getattr(exc, 'http_status', None) == \
                            HTTP_NOT_FOUND:
                        exc = err
                except (Exception, Timeout) as err:
                    exc = err
            if timestamp < looking_for_timestamp:
                if exc:
                    raise exc
                # Interpolate here: Exception() does not apply a format
                # string to a mapping passed as a second argument.
                raise Exception(
                    _('Unknown exception trying to GET: %(node)r '
                      '%(account)r %(container)r %(object)r') %
                    {'node': node, 'part': part,
                     'account': info['account'],
                     'container': info['container'],
                     'object': row['name']})
            # Drop headers describing the local response; the receiving
            # side is expected to produce its own.
            for key in ('date', 'last-modified'):
                if key in headers:
                    del headers[key]
            if 'etag' in headers:
                headers['etag'] = headers['etag'].strip('"')
            headers['x-timestamp'] = row['created_at']
            if realm and realm_key:
                nonce = uuid.uuid4().hex
                path = urlparse(sync_to).path + '/' + quote(row['name'])
                sig = self.realms_conf.get_sig(
                    'PUT', path, headers['x-timestamp'], nonce, realm_key,
                    user_key)
                headers['x-container-sync-auth'] = '%s %s %s' % (
                    realm, nonce, sig)
            else:
                headers['x-container-sync-key'] = user_key
            put_object(sync_to, name=row['name'], headers=headers,
                       contents=FileLikeIter(body),
                       proxy=self.select_http_proxy())
            self.container_puts += 1
            self.logger.increment('puts')
            self.logger.timing_since('puts.timing', start_time)
    except ClientException as err:
        sync_from = '%s/%s' % (
            quote(info['account']), quote(info['container']))
        if err.http_status == HTTP_UNAUTHORIZED:
            self.logger.info(
                _('Unauth %(sync_from)r => %(sync_to)r'),
                {'sync_from': sync_from,
                 'sync_to': sync_to})
        elif err.http_status == HTTP_NOT_FOUND:
            # Adjacent literals instead of a backslash continuation inside
            # the string, so indentation can never leak into the message.
            self.logger.info(
                _('Not found %(sync_from)r => %(sync_to)r '
                  '- object %(obj_name)r'),
                {'sync_from': sync_from,
                 'sync_to': sync_to, 'obj_name': row['name']})
        else:
            self.logger.exception(
                _('ERROR Syncing %(db_file)s %(row)s'),
                {'db_file': str(broker), 'row': row})
        self.container_failures += 1
        self.logger.increment('failures')
        return False
    except (Exception, Timeout):
        self.logger.exception(
            _('ERROR Syncing %(db_file)s %(row)s'),
            {'db_file': str(broker), 'row': row})
        self.container_failures += 1
        self.logger.increment('failures')
        return False
    return True
**** CubicPower OpenStack Study ****
def select_http_proxy(self):
    """
    Pick the HTTP proxy to use for one outgoing sync request.

    :returns: a randomly chosen entry of ``self.http_proxies``, or None
              when that list is empty.
    """
    if not self.http_proxies:
        return None
    return choice(self.http_proxies)