OpenStack Study:
OpenStack Index
class GreenthreadSafeIterator(object):
def __init__(self, unsafe_iterable):
def __iter__(self):
def next(self):
class NullLogger(object):
def write(self, *args):
class LoggerFileObject(object):
def __init__(self, logger):
def write(self, value):
def writelines(self, values):
def close(self):
def flush(self):
def __iter__(self):
def next(self):
def read(self, size=-1):
def readline(self, size=-1):
def tell(self):
def xreadlines(self):
class StatsdClient(object):
def __init__(self, host, port, base_prefix='', tail_prefix='', default_sample_rate=1, sample_rate_factor=1):
def set_prefix(self, new_prefix):
def _send(self, m_name, m_value, m_type, sample_rate):
def _open_socket(self):
def update_stats(self, m_name, m_value, sample_rate=None):
def increment(self, metric, sample_rate=None):
def decrement(self, metric, sample_rate=None):
def timing(self, metric, timing_ms, sample_rate=None):
def timing_since(self, metric, orig_time, sample_rate=None):
def transfer_rate(self, metric, elapsed_time, byte_xfer, sample_rate=None):
def timing_stats(**dec_kwargs):
def decorating_func(func):
def _timing_stats(ctrl, *args, **kwargs):
class LoggingHandlerWeakRef(weakref.ref):
def close(self):
def flush(self):
class LogAdapter(logging.LoggerAdapter, object):
def __init__(self, logger, server):
def txn_id(self):
def txn_id(self, value):
def client_ip(self):
def client_ip(self, value):
def thread_locals(self):
def thread_locals(self, value):
def getEffectiveLevel(self):
def process(self, msg, kwargs):
def notice(self, msg, *args, **kwargs):
def _exception(self, msg, *args, **kwargs):
def exception(self, msg, *args, **kwargs):
def set_statsd_prefix(self, prefix):
def statsd_delegate(statsd_func_name):
def wrapped(self, *a, **kw):
class SwiftLogFormatter(logging.Formatter):
def format(self, record):
def get_logger(conf, name=None, log_to_console=False, log_route=None, fmt="%(server)s:
def get_hub():
def drop_privileges(user):
def capture_stdio(logger, **kwargs):
def parse_options(parser=None, once=False, test_args=None):
def whataremyips():
def storage_directory(datadir, partition, name_hash):
def hash_path(account, container=None, object=None, raw_digest=False):
def lock_path(directory, timeout=10, timeout_class=LockTimeout):
def lock_file(filename, timeout=10, append=False, unlink=True):
def lock_parent_directory(filename, timeout=10):
def get_time_units(time_amount):
def compute_eta(start_time, current_value, final_value):
def unlink_older_than(path, mtime):
def item_from_env(env, item_name):
def cache_from_env(env):
def read_conf_dir(parser, conf_dir):
def readconf(conf_path, section_name=None, log_name=None, defaults=None, raw=False):
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
def search_tree(root, glob_match, ext='', dir_ext=None):
def write_file(path, contents):
def remove_file(path):
def audit_location_generator(devices, datadir, suffix='', mount_check=True, logger=None):
def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
class ContextPool(GreenPool):
def __enter__(self):
def __exit__(self, type, value, traceback):
class GreenAsyncPileWaitallTimeout(Timeout):
class GreenAsyncPile(object):
def __init__(self, size):
def _run_func(self, func, args, kwargs):
def spawn(self, func, *args, **kwargs):
def waitall(self, timeout):
def __iter__(self):
def next(self):
class ModifiedParseResult(ParseResult):
def hostname(self):
def port(self):
def urlparse(url):
def validate_sync_to(value, allowed_sync_hosts, realms_conf):
def affinity_key_function(affinity_str):
def keyfn(ring_node):
def affinity_locality_predicate(write_affinity_str):
def is_local(ring_node):
def get_remote_client(req):
def human_readable(value):
def put_recon_cache_entry(cache_entry, key, item):
def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
def listdir(path):
def streq_const_time(s1, s2):
def replication(func):
def public(func):
def wrapped(*a, **kw):
def quorum_size(n):
def rsync_ip(ip):
def get_valid_utf8_str(str_or_unicode):
def list_from_csv(comma_separated_str):
def csv_append(csv_string, item):
class CloseableChain(object):
def __init__(self, *iterables):
def __iter__(self):
def close(self):
def reiterate(iterable):
class InputProxy(object):
def __init__(self, wsgi_input):
def read(self, *args, **kwargs):
def readline(self, *args, **kwargs):
def tpool_reraise(func, *args, **kwargs):
def inner():
class ThreadPool(object):
def __init__(self, nthreads=2):
def _worker(self, work_queue, result_queue):
def _consume_results(self, queue):
def run_in_thread(self, func, *args, **kwargs):
def _run_in_eventlet_tpool(self, func, *args, **kwargs):
def inner():
def force_run_in_thread(self, func, *args, **kwargs):
def ismount(path):
def ismount_raw(path):
def parse_content_type(content_type):
def override_bytes_from_content_type(listing_dict, logger=None):
def quote(value, safe='/'):
\OpenStack\swift-2014.1\swift\common\wsgi.py
class NamedConfigLoader(loadwsgi.ConfigLoader):
def get_context(self, object_type, name=None, global_conf=None):
class ConfigDirLoader(NamedConfigLoader):
def __init__(self, conf_dir):
def _loadconfigdir(object_type, uri, path, name, relative_to, global_conf):
def wrap_conf_type(f):
def wrapper(conf_path, *args, **kwargs):
def monkey_patch_mimetools():
def parsetype(self):
def get_socket(conf, default_port=8080):
class RestrictedGreenPool(GreenPool):
def __init__(self, size=1024):
def spawn_n(self, *args, **kwargs):
class PipelineWrapper(object):
def __init__(self, context):
def __contains__(self, entry_point_name):
def startswith(self, entry_point_name):
def _format_for_display(self, ctx):
def __str__(self):
def create_filter(self, entry_point_name):
def index(self, entry_point_name):
def insert_filter(self, ctx, index=0):
def loadcontext(object_type, uri, name=None, relative_to=None, global_conf=None):
def loadapp(conf_file, global_conf=None, allow_modify_pipeline=True):
def run_server(conf, logger, sock, global_conf=None):
def run_wsgi(conf_path, app_section, *args, **kwargs):
def kill_children(*args):
def hup(*args):
class ConfigFileError(Exception):
def _initrp(conf_path, app_section, *args, **kwargs):
def init_request_processor(conf_path, app_section, *args, **kwargs):
class WSGIContext(object):
def __init__(self, wsgi_app):
def _start_response(self, status, headers, exc_info=None):
def _app_call(self, env):
def _get_status_int(self):
def _response_header_value(self, key):
def make_env(env, method=None, path=None, agent='Swift', query_string=None, swift_source=None):
def make_subrequest(env, method=None, path=None, body=None, headers=None, agent='Swift', swift_source=None, make_env=make_env):
def make_pre_authed_env(env, method=None, path=None, agent='Swift', query_string=None, swift_source=None):
def make_pre_authed_request(env, method=None, path=None, body=None, headers=None, agent='Swift', swift_source=None):
\OpenStack\swift-2014.1\swift\common\__init__.py
\OpenStack\swift-2014.1\swift\container\auditor.py
class ContainerAuditor(Daemon):
def __init__(self, conf):
def _one_audit_pass(self, reported):
def run_forever(self, *args, **kwargs):
def run_once(self, *args, **kwargs):
def container_audit(self, path):
\OpenStack\swift-2014.1\swift\container\backend.py
class ContainerBroker(DatabaseBroker):
def _initialize(self, conn, put_timestamp):
def create_object_table(self, conn):
def create_container_stat_table(self, conn, put_timestamp=None):
def get_db_version(self, conn):
def _newid(self, conn):
def _delete_db(self, conn, timestamp):
def _commit_puts_load(self, item_list, entry):
def empty(self):
def delete_object(self, name, timestamp):
def put_object(self, name, timestamp, size, content_type, etag, deleted=0):
def is_deleted(self, timestamp=None):
def get_info(self):
def set_x_container_sync_points(self, sync_point1, sync_point2):
def _set_x_container_sync_points(self, conn, sync_point1, sync_point2):
def reported(self, put_timestamp, delete_timestamp, object_count, bytes_used):
def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter, path=None):
def merge_items(self, item_list, source=None):
\OpenStack\swift-2014.1\swift\container\replicator.py
class ContainerReplicator(db_replicator.Replicator):
def report_up_to_date(self, full_info):
\OpenStack\swift-2014.1\swift\container\server.py
class ContainerController(object):
def __init__(self, conf, logger=None):
def _get_container_broker(self, drive, part, account, container, **kwargs):
def account_update(self, req, account, container, broker):
def DELETE(self, req):
def _update_or_create(self, req, broker, timestamp):
def PUT(self, req):
def HEAD(self, req):
def update_data_record(self, record):
def GET(self, req):
def REPLICATE(self, req):
def POST(self, req):
def __call__(self, env, start_response):
def app_factory(global_conf, **local_conf):
\OpenStack\swift-2014.1\swift\container\sync.py
class ContainerSync(Daemon):
def __init__(self, conf, container_ring=None, object_ring=None):
def run_forever(self, *args, **kwargs):
def run_once(self, *args, **kwargs):
def report(self):
def container_sync(self, path):
def container_sync_row(self, row, sync_to, user_key, broker, info, realm, realm_key):
def select_http_proxy(self):
\OpenStack\swift-2014.1\swift\container\updater.py
class ContainerUpdater(Daemon):
def __init__(self, conf):
def get_account_ring(self):
def get_paths(self):
def _load_suppressions(self, filename):
def run_forever(self, *args, **kwargs):
def run_once(self, *args, **kwargs):
def container_sweep(self, path):
def process_container(self, dbfile):
def container_report(self, node, part, container, put_timestamp, delete_timestamp, count, bytes):
\OpenStack\swift-2014.1\swift\container\__init__.py
\OpenStack\swift-2014.1\swift\obj\auditor.py
class AuditorWorker(object):
def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0):
def create_recon_nested_dict(self, top_level_key, device_list, item):
def audit_all_objects(self, mode='once', device_dirs=None):
def record_stats(self, obj_size):
def failsafe_object_audit(self, location):
def object_audit(self, location):
def raise_dfq(msg):
class ObjectAuditor(Daemon):
def __init__(self, conf, **options):
def _sleep(self):
def clear_recon_cache(self, auditor_type):
def run_audit(self, **kwargs):
def fork_child(self, zero_byte_fps=False, **kwargs):
def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
def run_forever(self, *args, **kwargs):
def run_once(self, *args, **kwargs):
\OpenStack\swift-2014.1\swift\obj\diskfile.py
def read_metadata(fd):
def write_metadata(fd, metadata):
def quarantine_renamer(device_path, corrupted_file_path):
def get_ondisk_files(files, datadir):
def hash_cleanup_listdir(hsh_path, reclaim_age=ONE_WEEK):
def hash_suffix(path, reclaim_age):
def invalidate_hash(suffix_dir):
def get_hashes(partition_dir, recalculate=None, do_listdir=False, reclaim_age=ONE_WEEK):
class AuditLocation(object):
def __init__(self, path, device, partition):
def __str__(self):
def object_audit_location_generator(devices, mount_check=True, logger=None, device_dirs=None):
class DiskFileManager(object):
def __init__(self, conf, logger):
def construct_dev_path(self, device):
def get_dev_path(self, device, mount_check=None):
def replication_lock(self, device):
def pickle_async_update(self, device, account, container, obj, data, timestamp):
def get_diskfile(self, device, partition, account, container, obj, **kwargs):
def object_audit_location_generator(self, device_dirs=None):
def get_diskfile_from_audit_location(self, audit_location):
def get_diskfile_from_hash(self, device, partition, object_hash, **kwargs):
def get_hashes(self, device, partition, suffix):
def _listdir(self, path):
def yield_suffixes(self, device, partition):
def yield_hashes(self, device, partition, suffixes=None):
class DiskFileWriter(object):
def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, threadpool):
def write(self, chunk):
def _write_entire_chunk(chunk):
def _finalize_put(self, metadata, target_path):
def put(self, metadata):
class DiskFileReader(object):
def __init__(self, fp, data_file, obj_size, etag, threadpool, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, keep_cache=False):
def __iter__(self):
def app_iter_range(self, start, stop):
def app_iter_ranges(self, ranges, content_type, boundary, size):
def _drop_cache(self, fd, offset, length):
def _quarantine(self, msg):
def _handle_close_quarantine(self):
def close(self):
class DiskFile(object):
def __init__(self, mgr, device_path, threadpool, partition, account=None, container=None, obj=None, _datadir=None):
def account(self):
def container(self):
def obj(self):
def content_length(self):
def timestamp(self):
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition):
def open(self):
def __enter__(self):
def __exit__(self, t, v, tb):
def _quarantine(self, data_file, msg):
def _get_ondisk_file(self):
def _construct_exception_from_ts_file(self, ts_file):
def _verify_name_matches_hash(self, data_file):
def _verify_data_file(self, data_file, fp):
def _failsafe_read_metadata(self, source, quarantine_filename=None):
def _construct_from_data_file(self, data_file, meta_file):
def get_metadata(self):
def read_metadata(self):
def reader(self, keep_cache=False, _quarantine_hook=lambda m:
def create(self, size=None):
def write_metadata(self, metadata):
def delete(self, timestamp):
\OpenStack\swift-2014.1\swift\obj\expirer.py
class ObjectExpirer(Daemon):
def __init__(self, conf):
def report(self, final=False):
def run_once(self, *args, **kwargs):
def run_forever(self, *args, **kwargs):
def get_process_values(self, kwargs):
def delete_object(self, actual_obj, timestamp, container, obj):
def delete_actual_object(self, actual_obj, timestamp):
\OpenStack\swift-2014.1\swift\obj\mem_diskfile.py
class InMemoryFileSystem(object):
def __init__(self):
def get_object(self, name):
def put_object(self, name, data, metadata):
def del_object(self, name):
def get_diskfile(self, account, container, obj, **kwargs):
class DiskFileWriter(object):
def __init__(self, fs, name, fp):
def write(self, chunk):
def put(self, metadata):
class DiskFileReader(object):
def __init__(self, name, fp, obj_size, etag):
def __iter__(self):
def app_iter_range(self, start, stop):
def app_iter_ranges(self, ranges, content_type, boundary, size):
def _quarantine(self, msg):
def _handle_close_quarantine(self):
def close(self):
class DiskFile(object):
def __init__(self, fs, account, container, obj):
def open(self):
def __enter__(self):
def __exit__(self, t, v, tb):
def _verify_data_file(self, fp):
def get_metadata(self):
def read_metadata(self):
def reader(self, keep_cache=False):
def create(self, size=None):
def write_metadata(self, metadata):
def delete(self, timestamp):
\OpenStack\swift-2014.1\swift\obj\mem_server.py
class ObjectController(server.ObjectController):
def setup(self, conf):
def get_diskfile(self, device, partition, account, container, obj, **kwargs):
def async_update(self, op, account, container, obj, host, partition, contdevice, headers_out, objdevice):
def REPLICATE(self, request):
def app_factory(global_conf, **local_conf):
\OpenStack\swift-2014.1\swift\obj\replicator.py
class ObjectReplicator(Daemon):
def __init__(self, conf):
def sync(self, node, job, suffixes):
def _rsync(self, args):
def rsync(self, node, job, suffixes):
def ssync(self, node, job, suffixes):
def check_ring(self):
def update_deleted(self, job):
def tpool_get_suffixes(path):
def update(self, job):
def stats_line(self):
def kill_coros(self):
def heartbeat(self):
def detect_lockups(self):
def collect_jobs(self):
def replicate(self, override_devices=None, override_partitions=None):
def run_once(self, *args, **kwargs):
def run_forever(self, *args, **kwargs):
\OpenStack\swift-2014.1\swift\obj\server.py
class ObjectController(object):
def __init__(self, conf, logger=None):
def setup(self, conf):
def get_diskfile(self, device, partition, account, container, obj, **kwargs):
def async_update(self, op, account, container, obj, host, partition, contdevice, headers_out, objdevice):
def container_update(self, op, account, container, obj, request, headers_out, objdevice):
def delete_at_update(self, op, delete_at, account, container, obj, request, objdevice):
def POST(self, request):
def PUT(self, request):
def GET(self, request):
def HEAD(self, request):
def DELETE(self, request):
def REPLICATE(self, request):
def REPLICATION(self, request):
def __call__(self, env, start_response):
def global_conf_callback(preloaded_app_conf, global_conf):
def app_factory(global_conf, **local_conf):
\OpenStack\swift-2014.1\swift\obj\ssync_receiver.py
class Receiver(object):
def __init__(self, app, request):
def __call__(self):
def _ensure_flush(self):
def initialize_request(self):
def missing_check(self):
def updates(self):
\OpenStack\swift-2014.1\swift\obj\ssync_sender.py
class Sender(object):
def __init__(self, daemon, node, job, suffixes):
def __call__(self):
def connect(self):
def readline(self):
def missing_check(self):
def updates(self):
def send_delete(self, url_path, timestamp):
def send_put(self, url_path, df):
def disconnect(self):
\OpenStack\swift-2014.1\swift\obj\updater.py
class ObjectUpdater(Daemon):
def __init__(self, conf):
def get_container_ring(self):
def run_forever(self, *args, **kwargs):
def run_once(self, *args, **kwargs):
def object_sweep(self, device):
def process_object_update(self, update_path, device):
def object_update(self, node, part, op, obj, headers):
\OpenStack\swift-2014.1\swift\obj\__init__.py
\OpenStack\swift-2014.1\swift\proxy\controllers\account.py
class AccountController(Controller):
def __init__(self, app, account_name, **kwargs):
def add_acls_from_sys_metadata(self, resp):
def GETorHEAD(self, req):
def PUT(self, req):
def POST(self, req):
def DELETE(self, req):
\OpenStack\swift-2014.1\swift\proxy\controllers\base.py
def update_headers(response, headers):
def source_key(resp):
def delay_denial(func):
def wrapped(*a, **kw):
def get_account_memcache_key(account):
def get_container_memcache_key(account, container):
def _prep_headers_to_info(headers, server_type):
def headers_to_account_info(headers, status_int=HTTP_OK):
def headers_to_container_info(headers, status_int=HTTP_OK):
def headers_to_object_info(headers, status_int=HTTP_OK):
def cors_validation(func):
def wrapped(*a, **kw):
def get_object_info(env, app, path=None, swift_source=None):
def get_container_info(env, app, swift_source=None):
def get_account_info(env, app, swift_source=None):
def _get_cache_key(account, container):
def get_object_env_key(account, container, obj):
def _set_info_cache(app, env, account, container, resp):
def _set_object_info_cache(app, env, account, container, obj, resp):
def clear_info_cache(app, env, account, container=None):
def _get_info_cache(app, env, account, container=None):
def _prepare_pre_auth_info_request(env, path, swift_source):
def get_info(app, env, account, container=None, ret_not_found=False, swift_source=None):
def _get_object_info(app, env, account, container, obj, swift_source=None):
def close_swift_conn(src):
class GetOrHeadHandler(object):
def __init__(self, app, req, server_type, ring, partition, path, backend_headers):
def fast_forward(self, num_bytes):
def is_good_source(self, src):
def _make_app_iter(self, req, node, source):
def _get_source_and_node(self):
def get_working_response(self, req):
class Controller(object):
def __init__(self, app):
def allowed_methods(self):
def _x_remove_headers(self):
def transfer_headers(self, src_headers, dst_headers):
def generate_request_headers(self, orig_req=None, additional=None, transfer=False):
def account_info(self, account, req=None):
def container_info(self, account, container, req=None):
def _make_request(self, nodes, part, method, path, headers, query, logger_thread_locals):
def make_requests(self, req, ring, part, method, path, headers, query_string=''):
def have_quorum(self, statuses, node_count):
def best_response(self, req, statuses, reasons, bodies, server_type, etag=None, headers=None):
def GET(self, req):
def HEAD(self, req):
def autocreate_account(self, env, account):
def GETorHEAD_base(self, req, server_type, ring, partition, path):
def is_origin_allowed(self, cors_info, origin):
def OPTIONS(self, req):
\OpenStack\swift-2014.1\swift\proxy\controllers\container.py
class ContainerController(Controller):
def __init__(self, app, account_name, container_name, **kwargs):
def _x_remove_headers(self):
def clean_acls(self, req):
def GETorHEAD(self, req):
def GET(self, req):
def HEAD(self, req):
def PUT(self, req):
def POST(self, req):
def DELETE(self, req):
def _backend_requests(self, req, n_outgoing, account_partition, accounts):
\OpenStack\swift-2014.1\swift\proxy\controllers\info.py
class InfoController(Controller):
def __init__(self, app, version, expose_info, disallowed_sections, admin_key):
def GET(self, req):
def HEAD(self, req):
def OPTIONS(self, req):
def GETorHEAD(self, req):
\OpenStack\swift-2014.1\swift\proxy\controllers\obj.py
def copy_headers_into(from_r, to_r):
def check_content_type(req):
class ObjectController(Controller):
def __init__(self, app, account_name, container_name, object_name, **kwargs):
def _listing_iter(self, lcontainer, lprefix, env):
def _listing_pages_iter(self, lcontainer, lprefix, env):
def _remaining_items(self, listing_iter):
def iter_nodes_local_first(self, ring, partition):
def GETorHEAD(self, req):
def GET(self, req):
def HEAD(self, req):
def POST(self, req):
def _backend_requests(self, req, n_outgoing, container_partition, containers, delete_at_container=None, delete_at_partition=None, delete_at_nodes=None):
def _send_file(self, conn, path):
def _connect_put_node(self, nodes, part, path, headers, logger_thread_locals):
def _get_put_responses(self, req, conns, nodes):
def get_conn_response(conn):
def PUT(self, req):
def DELETE(self, req):
def COPY(self, req):
\OpenStack\swift-2014.1\swift\proxy\controllers\__init__.py
\OpenStack\swift-2014.1\swift\proxy\server.py
class Application(object):
def __init__(self, conf, memcache=None, logger=None, account_ring=None, container_ring=None, object_ring=None):
def check_config(self):
def get_controller(self, path):
def __call__(self, env, start_response):
def update_request(self, req):
def handle_request(self, req):
def sort_nodes(self, nodes):
def set_node_timing(self, node, timing):
def error_limited(self, node):
def error_limit(self, node, msg):
def error_occurred(self, node, msg):
def iter_nodes(self, ring, partition, node_iter=None):
def exception_occurred(self, node, typ, additional_info):
def modify_wsgi_pipeline(self, pipe):
def app_factory(global_conf, **local_conf):
\OpenStack\swift-2014.1\swift\proxy\__init__.py
\OpenStack\swift-2014.1\swift\__init__.py
def gettext_(msg):
|