# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import calendar
import eventlet
import os
import time
from oslo.config import cfg
from glance.common import crypt
from glance.common import exception
from glance.common import utils
from glance import context
from glance.openstack.common import lockutils
import glance.openstack.common.log as logging
import glance.registry.client.v1.api as registry
LOG = logging.getLogger(__name__)
scrubber_opts = [
cfg.StrOpt('scrubber_datadir',
default='/var/lib/glance/scrubber',
help=_('Directory that the scrubber will use to track '
'information about what to delete. '
'Make sure this is set in glance-api.conf and '
'glance-scrubber.conf.')),
cfg.IntOpt('scrub_time', default=0,
help=_('The amount of time in seconds to delay before '
'performing a delete.')),
cfg.BoolOpt('cleanup_scrubber', default=False,
help=_('A boolean that determines if the scrubber should '
'clean up the files it uses for tracking data. Only '
'one server in your deployment should be designated '
'the cleanup host.')),
cfg.IntOpt('cleanup_scrubber_time', default=86400,
help=_('Items must have a modified time that is older than '
'this value in order to be candidates for cleanup.'))
]
CONF = cfg.CONF
CONF.register_opts(scrubber_opts)
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
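# Illustrative glance-scrubber.conf snippet for the options registered above
# (example values assumed, not part of the original file):
#
#     [DEFAULT]
#     scrubber_datadir = /var/lib/glance/scrubber
#     scrub_time = 43200
#     cleanup_scrubber = True
#     cleanup_scrubber_time = 86400
#
# The same scrubber_datadir must also be set in glance-api.conf so the API
# server and the scrubber share one queue directory.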
class ScrubQueue(object):
"""Image scrub queue base class.
The queue contains the image locations that need to be deleted from the backend.
"""
def __init__(self):
registry.configure_registry_client()
registry.configure_registry_admin_creds()
self.registry = registry.get_registry_client(context.RequestContext())
@abc.abstractmethod
def add_location(self, image_id, uri, user_context=None):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
:param user_context: The user's request context
"""
pass
@abc.abstractmethod
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
pass
@abc.abstractmethod
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
pass
@abc.abstractmethod
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean indicating whether the image is in the queue
"""
pass
class ScrubFileQueue(ScrubQueue):
"""File-based image scrub queue class."""
def __init__(self):
super(ScrubFileQueue, self).__init__()
self.scrubber_datadir = CONF.scrubber_datadir
utils.safe_mkdirs(self.scrubber_datadir)
self.scrub_time = CONF.scrub_time
self.metadata_encryption_key = CONF.metadata_encryption_key
def _read_queue_file(self, file_path):
"""Reading queue file to loading deleted location and timestamp out.
:param file_path: Queue file full path
:retval a list of image location timestamp tuple from queue file
"""
uris = []
delete_times = []
try:
with open(file_path, 'r') as f:
while True:
uri = f.readline().strip()
if uri:
uris.append(uri)
delete_times.append(int(f.readline().strip()))
else:
break
except Exception:
LOG.error(_("%s file can not be read.") % file_path)
return uris, delete_times
def _update_queue_file(self, file_path, remove_record_idxs):
"""Updating queue file to remove such queue records.
:param file_path: Queue file full path
:param remove_record_idxs: A list of record index those want to remove
"""
try:
with open(file_path, 'r') as f:
lines = f.readlines()
# NOTE(zhiyan): we need to remove records bottom-up so that the
# remaining record indexes stay valid.
remove_record_idxs.sort(reverse=True)
for record_idx in remove_record_idxs:
# Each record occupies two lines: the uri, then its delete timestamp,
# so record N starts at line 2 * N.
line_no = record_idx * 2
del lines[line_no:line_no + 2]
with open(file_path, 'w') as f:
f.write(''.join(lines))
os.chmod(file_path, 0o600)
except Exception:
LOG.error(_("%s file can not be wrote.") % file_path)
def add_location(self, image_id, uri, user_context=None):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
:param user_context: The user's request context
"""
if user_context is not None:
registry_client = registry.get_registry_client(user_context)
else:
registry_client = self.registry
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
# NOTE(zhiyan): make sure the scrubber does not clean up
# 'pending_delete' images concurrently before this code
# acquires the lock and reaches this point.
try:
image = registry_client.get_image(image_id)
if image['status'] == 'deleted':
return
except exception.NotFound as e:
LOG.error(_("Failed to find image to delete: "
"%(e)s"), {'e': e})
return
delete_time = time.time() + self.scrub_time
file_path = os.path.join(self.scrubber_datadir, str(image_id))
if self.metadata_encryption_key is not None:
uri = crypt.urlsafe_encrypt(self.metadata_encryption_key,
uri, 64)
if os.path.exists(file_path):
# Append the uri of location to the queue file
with open(file_path, 'a') as f:
f.write('\n')
f.write('\n'.join([uri, str(int(delete_time))]))
else:
# NOTE(zhiyan): Protect the file before we write any data.
open(file_path, 'w').close()
os.chmod(file_path, 0o600)
with open(file_path, 'w') as f:
f.write('\n'.join([uri, str(int(delete_time))]))
os.utime(file_path, (delete_time, delete_time))
def _walk_all_locations(self, remove=False):
"""Returns a list of image id and location tuple from scrub queue.
:param remove: Whether remove location from queue or not after walk
:retval a list of image image_id and location tuple from scrub queue
"""
if not os.path.exists(self.scrubber_datadir):
LOG.info(_("%s directory does not exist.") % self.scrubber_datadir)
return []
ret = []
for root, dirs, files in os.walk(self.scrubber_datadir):
for image_id in files:
if not utils.is_uuid_like(image_id):
continue
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
file_path = os.path.join(self.scrubber_datadir, image_id)
uris, delete_times = self._read_queue_file(file_path)
remove_record_idxs = []
skipped = False
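# 'skipped' is set when at least one record is not yet due for deletion;
# in that case only the processed records are pruned from the queue file,
# otherwise the whole queue file is removed below.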
for (record_idx, delete_time) in enumerate(delete_times):
if delete_time > time.time():
skipped = True
continue
else:
ret.append((image_id, uris[record_idx]))
remove_record_idxs.append(record_idx)
if remove:
if skipped:
# NOTE(zhiyan): remove location records from
# the queue file.
self._update_queue_file(file_path,
remove_record_idxs)
else:
utils.safe_remove(file_path)
return ret
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations()
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations(remove=True)
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean indicating whether the image is in the queue
"""
return os.path.exists(os.path.join(self.scrubber_datadir,
str(image_id)))
class ScrubDBQueue(ScrubQueue):
"""Database-based image scrub queue class."""
def __init__(self):
super(ScrubDBQueue, self).__init__()
self.cleanup_scrubber_time = CONF.cleanup_scrubber_time
def add_location(self, image_id, uri, user_context=None):
"""Adding image location to scrub queue.
:param image_id: The opaque image identifier
:param uri: The opaque image location uri
:param user_context: The user's request context
"""
raise NotImplementedError
def _walk_all_locations(self, remove=False):
"""Returns a list of image id and location tuple from scrub queue.
:param remove: Whether remove location from queue or not after walk
:retval a list of image id and location tuple from scrub queue
"""
filters = {'deleted': True,
'is_public': 'none',
'status': 'pending_delete'}
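# These filters select images that were deleted through the API but are still
# in 'pending_delete', i.e. their backend data has not been scrubbed yet,
# regardless of visibility.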
ret = []
for image in self.registry.get_images_detailed(filters=filters):
deleted_at = image.get('deleted_at')
if not deleted_at:
continue
# NOTE: Strip off microseconds, which may occur after the last '.' or ','.
# Example: 2012-07-07T19:14:34.974216
date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0]
delete_time = calendar.timegm(time.strptime(date_str,
"%Y-%m-%dT%H:%M:%S"))
if delete_time + self.cleanup_scrubber_time > time.time():
continue
ret.extend([(image['id'], location['uri'])
for location in image['location_data']])
if remove:
self.registry.update_image(image['id'], {'status': 'deleted'})
return ret
def get_all_locations(self):
"""Returns a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations()
def pop_all_locations(self):
"""Pop out a list of image id and location tuple from scrub queue.
:retval a list of image id and location tuple from scrub queue
"""
return self._walk_all_locations(remove=True)
def has_image(self, image_id):
"""Returns whether the queue contains an image or not.
:param image_id: The opaque image identifier
:retval a boolean indicating whether the image is in the queue
"""
try:
image = self.registry.get_image(image_id)
return image['status'] == 'pending_delete'
except exception.NotFound:
return False
_file_queue = None
_db_queue = None
def get_scrub_queues():
global _file_queue, _db_queue
if not _file_queue:
_file_queue = ScrubFileQueue()
if not _db_queue:
_db_queue = ScrubDBQueue()
return (_file_queue, _db_queue)
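# Illustrative usage (assumed names, not part of the original file): when
# delayed delete is enabled, the API side queues each location of a
# 'pending_delete' image via the file queue, e.g.:
#     file_queue, db_queue = get_scrub_queues()
#     file_queue.add_location(image_id, location_uri, user_context=req_context)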
class Daemon(object):
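# The Daemon wraps the scrubber application: _run() spawns application.run()
# in a green thread pool and reschedules itself every wakeup_time seconds via
# eventlet.spawn_after(), while wait() blocks on the shared event until it is
# sent or a KeyboardInterrupt occurs.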
def __init__(self, wakeup_time=300, threads=1000):
LOG.info(_("Starting Daemon: wakeup_time=%(wakeup_time)s "
"threads=%(threads)s"),
{'wakeup_time': wakeup_time, 'threads': threads})
self.wakeup_time = wakeup_time
self.event = eventlet.event.Event()
self.pool = eventlet.greenpool.GreenPool(threads)
def start(self, application):
self._run(application)
def wait(self):
try:
self.event.wait()
except KeyboardInterrupt:
msg = _("Daemon Shutdown on KeyboardInterrupt")
LOG.info(msg)
def _run(self, application):
LOG.debug(_("Running application"))
self.pool.spawn_n(application.run, self.pool, self.event)
eventlet.spawn_after(self.wakeup_time, self._run, application)
LOG.debug(_("Next run scheduled in %s seconds") % self.wakeup_time)
class Scrubber(object):
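# Scrubber.run() pops the due delete jobs from the file queue, deletes each
# location from the backend store, marks fully scrubbed 'pending_delete'
# images as 'deleted' in the registry and, if cleanup_scrubber is enabled,
# runs the DB-based cleanup pass.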
def __init__(self, store_api):
LOG.info(_("Initializing scrubber with configuration: %s") %
unicode({'scrubber_datadir': CONF.scrubber_datadir,
'cleanup': CONF.cleanup_scrubber,
'cleanup_time': CONF.cleanup_scrubber_time,
'registry_host': CONF.registry_host,
'registry_port': CONF.registry_port}))
utils.safe_mkdirs(CONF.scrubber_datadir)
self.store_api = store_api
registry.configure_registry_client()
registry.configure_registry_admin_creds()
self.registry = registry.get_registry_client(context.RequestContext())
(self.file_queue, self.db_queue) = get_scrub_queues()
def _get_delete_jobs(self, queue, pop):
try:
if pop:
image_id_uri_list = queue.pop_all_locations()
else:
image_id_uri_list = queue.get_all_locations()
except Exception:
LOG.error(_("Can not %s scrub jobs from queue.") %
'pop' if pop else 'get')
return None
delete_jobs = {}
for image_id, image_uri in image_id_uri_list:
if image_id not in delete_jobs:
delete_jobs[image_id] = []
delete_jobs[image_id].append((image_id, image_uri))
return delete_jobs
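# Illustrative shape of the returned mapping (example values assumed):
#     {'<image-uuid>': [('<image-uuid>', '<location-uri-1>'),
#                       ('<image-uuid>', '<location-uri-2>')]}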
def run(self, pool, event=None):
delete_jobs = self._get_delete_jobs(self.file_queue, True)
if delete_jobs:
for image_id, jobs in delete_jobs.iteritems():
self._scrub_image(pool, image_id, jobs)
if CONF.cleanup_scrubber:
self._cleanup(pool)
def _scrub_image(self, pool, image_id, delete_jobs):
if len(delete_jobs) == 0:
return
LOG.info(_("Scrubbing image %(id)s from %(count)d locations.") %
{'id': image_id, 'count': len(delete_jobs)})
# NOTE(bourke): The starmap must be iterated to do work
list(pool.starmap(self._delete_image_from_backend, delete_jobs))
image = self.registry.get_image(image_id)
if (image['status'] == 'pending_delete' and
not self.file_queue.has_image(image_id)):
self.registry.update_image(image_id, {'status': 'deleted'})
def _delete_image_from_backend(self, image_id, uri):
if CONF.metadata_encryption_key is not None:
uri = crypt.urlsafe_decrypt(CONF.metadata_encryption_key, uri)
try:
LOG.debug(_("Deleting URI from image %(image_id)s.") %
{'image_id': image_id})
# Here we create a request context with credentials to support
# delayed delete when using multi-tenant backend storage
admin_tenant = CONF.admin_tenant_name
auth_token = self.registry.auth_tok
admin_context = context.RequestContext(user=CONF.admin_user,
tenant=admin_tenant,
auth_tok=auth_token)
self.store_api.delete_from_backend(admin_context, uri)
except Exception:
msg = _("Failed to delete URI from image %(image_id)s")
LOG.error(msg % {'image_id': image_id})
def _read_cleanup_file(self, file_path):
"""Reading cleanup to get latest cleanup timestamp.
:param file_path: Cleanup status file full path
:retval latest cleanup timestamp
"""
try:
if not os.path.exists(file_path):
msg = _("%s file is not exists.") % unicode(file_path)
raise Exception(msg)
atime = int(os.path.getatime(file_path))
mtime = int(os.path.getmtime(file_path))
if atime != mtime:
msg = _("%s file contains conflicting cleanup "
"timestamp.") % unicode(file_path)
raise Exception(msg)
return atime
except Exception as e:
LOG.error(e)
return None
def _update_cleanup_file(self, file_path, cleanup_time):
"""Update latest cleanup timestamp to cleanup file.
:param file_path: Cleanup status file full path
:param cleanup_time: The Latest cleanup timestamp
"""
try:
open(file_path, 'w').close()
os.chmod(file_path, 0o600)
os.utime(file_path, (cleanup_time, cleanup_time))
except Exception:
LOG.error(_("%s file can not be created.") % unicode(file_path))
def _cleanup(self, pool):
now = time.time()
cleanup_file = os.path.join(CONF.scrubber_datadir, ".cleanup")
if not os.path.exists(cleanup_file):
self._update_cleanup_file(cleanup_file, now)
return
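# Throttle the DB-based cleanup: skip this pass unless at least
# cleanup_scrubber_time seconds have elapsed since the last recorded cleanup.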
last_cleanup_time = self._read_cleanup_file(cleanup_file)
cleanup_time = last_cleanup_time + CONF.cleanup_scrubber_time
if cleanup_time > now:
return
LOG.info(_("Getting images deleted before "
"%s") % CONF.cleanup_scrubber_time)
self._update_cleanup_file(cleanup_file, now)
delete_jobs = self._get_delete_jobs(self.db_queue, False)
if not delete_jobs:
return
for image_id, jobs in delete_jobs.iteritems():
with lockutils.lock("scrubber-%s" % image_id,
lock_file_prefix='glance-', external=True):
if not self.file_queue.has_image(image_id):
# NOTE(zhiyan): the scrubber should not clean up this image,
# since a queue file may have been created for this
# 'pending_delete' image concurrently before this code
# acquired the lock and reached this point. This check is
# only worthwhile if the glance-api and glance-scrubber
# services are deployed on the same host.
self._scrub_image(pool, image_id, jobs)