OpenStack Study: glance
OpenStack Index | Previous | Next
def __init__(self):
def _handle_signal(self, signo, frame):
def _pipe_watcher(self):
def _child_process(self, service):
def _sigterm(*args):
def _start_child(self, wrap):
def launch_service(self, service, workers=1):
def _wait_child(self):
def wait(self):
class Service(object):
def __init__(self, threads=1000):
def start(self):
def stop(self):
def wait(self):
def launch(service, workers=None):
\OpenStack\glance-2014.1\glance\openstack\common\strutils.py
def int_from_bool_as_string(subject):
def bool_from_string(subject, strict=False):
def safe_decode(text, incoming=None, errors='strict'):
def safe_encode(text, incoming=None, encoding='utf-8', errors='strict'):
def to_bytes(text, default=0):
def to_slug(value, incoming=None, errors="strict"):
\OpenStack\glance-2014.1\glance\openstack\common\test.py
class BaseTestCase(testtools.TestCase):
def setUp(self):
def _set_timeout(self):
def _fake_output(self):
\OpenStack\glance-2014.1\glance\openstack\common\threadgroup.py
def _thread_done(gt, *args, **kwargs):
class Thread(object):
def __init__(self, thread, group):
def stop(self):
def wait(self):
class ThreadGroup(object):
def __init__(self, thread_pool_size=10):
def add_timer(self, interval, callback, initial_delay=None, *args, **kwargs):
def add_thread(self, callback, *args, **kwargs):
def thread_done(self, thread):
def stop(self):
def wait(self):
\OpenStack\glance-2014.1\glance\openstack\common\timeutils.py
def isotime(at=None, subsecond=False):
def parse_isotime(timestr):
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
def normalize_time(timestamp):
def is_older_than(before, seconds):
def is_newer_than(after, seconds):
def utcnow_ts():
def utcnow():
def iso8601_from_timestamp(timestamp):
def set_time_override(override_time=None):
def advance_time_delta(timedelta):
def advance_time_seconds(seconds):
def clear_time_override():
def marshall_now(now=None):
def unmarshall_time(tyme):
def delta_seconds(before, after):
def total_seconds(delta):
def is_soon(dt, window):
\OpenStack\glance-2014.1\glance\openstack\common\units.py
\OpenStack\glance-2014.1\glance\openstack\common\__init__.py
\OpenStack\glance-2014.1\glance\openstack\__init__.py
\OpenStack\glance-2014.1\glance\quota\__init__.py
def _enforce_image_tag_quota(tags):
def _calc_required_size(context, image, locations):
def _enforce_image_location_quota(image, locations, is_setter=False):
class ImageRepoProxy(glance.domain.proxy.Repo):
def __init__(self, image_repo, context, db_api):
def _enforce_image_property_quota(self, image):
def save(self, image):
def add(self, image):
class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, factory, context, db_api):
def new_image(self, **kwargs):
class QuotaImageTagsProxy(object):
def __init__(self, orig_set):
def add(self, item):
def __cast__(self, *args, **kwargs):
def __contains__(self, *args, **kwargs):
def __eq__(self, other):
def __iter__(self, *args, **kwargs):
def __len__(self, *args, **kwargs):
def __getattr__(self, name):
class ImageMemberFactoryProxy(glance.domain.proxy.ImageMembershipFactory):
def __init__(self, member_factory, context, db_api):
def _enforce_image_member_quota(self, image):
def new_image_member(self, image, member_id):
class QuotaImageLocationsProxy(object):
def __init__(self, image, context, db_api):
def __cast__(self, *args, **kwargs):
def __contains__(self, *args, **kwargs):
def __delitem__(self, *args, **kwargs):
def __delslice__(self, *args, **kwargs):
def __eq__(self, other):
def __getitem__(self, *args, **kwargs):
def __iadd__(self, other):
def __iter__(self, *args, **kwargs):
def __len__(self, *args, **kwargs):
def __setitem__(self, key, value):
def count(self, *args, **kwargs):
def index(self, *args, **kwargs):
def pop(self, *args, **kwargs):
def remove(self, *args, **kwargs):
def reverse(self, *args, **kwargs):
def _check_user_storage_quota(self, locations):
def __copy__(self):
def __deepcopy__(self, memo):
def append(self, object):
def insert(self, index, object):
def extend(self, iter):
class ImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, db_api):
def set_data(self, data, size=None):
def tags(self):
def tags(self, value):
def locations(self):
def locations(self, value):
\OpenStack\glance-2014.1\glance\registry\api\v1\images.py
def _normalize_image_location_for_db(image_data):
class Controller(object):
def __init__(self):
def _get_images(self, context, filters, **params):
def index(self, req):
def detail(self, req):
def _get_query_params(self, req):
def _get_filters(self, req):
def _get_limit(self, req):
def _get_marker(self, req):
def _get_sort_key(self, req):
def _get_sort_dir(self, req):
def _get_bool(self, value):
def _get_is_public(self, req):
def _parse_deleted_filter(self, req):
def show(self, req, id):
def delete(self, req, id):
def create(self, req, body):
def update(self, req, id, body):
def _limit_locations(image):
def make_image_dict(image):
def _fetch_attrs(d, attrs):
def create_resource():
\OpenStack\glance-2014.1\glance\registry\api\v1\members.py
class Controller(object):
def _check_can_access_image_members(self, context):
def __init__(self):
def is_image_sharable(self, context, image):
def index(self, req, image_id):
def update_all(self, req, image_id, body):
def update(self, req, image_id, id, body=None):
def delete(self, req, image_id, id):
def index_shared_images(self, req, id):
def make_member_list(members, **attr_map):
def _fetch_memb(memb, attr_map):
def create_resource():
\OpenStack\glance-2014.1\glance\registry\api\v1\__init__.py
def init(mapper):
class API(wsgi.Router):
def __init__(self, mapper):
\OpenStack\glance-2014.1\glance\registry\api\v2\rpc.py
class Controller(rpc.Controller):
def __init__(self, raise_exc=False):
def create_resource():
\OpenStack\glance-2014.1\glance\registry\api\v2\__init__.py
def init(mapper):
class API(wsgi.Router):
def __init__(self, mapper):
\OpenStack\glance-2014.1\glance\registry\api\__init__.py
class API(wsgi.Router):
def __init__(self, mapper):
\OpenStack\glance-2014.1\glance\registry\client\v1\api.py
def configure_registry_client():
def configure_registry_admin_creds():
def get_registry_client(cxt):
def get_images_list(context, **kwargs):
def get_images_detail(context, **kwargs):
def get_image_metadata(context, image_id):
def add_image_metadata(context, image_meta):
def update_image_metadata(context, image_id, image_meta, purge_props=False, from_state=None):
def delete_image_metadata(context, image_id):
def get_image_members(context, image_id):
def get_member_images(context, member_id):
def replace_members(context, image_id, member_data):
def add_member(context, image_id, member_id, can_share=None):
def delete_member(context, image_id, member_id):
\OpenStack\glance-2014.1\glance\registry\client\v1\client.py
class RegistryClient(BaseClient):
def __init__(self, host=None, port=None, metadata_encryption_key=None, identity_headers=None, **kwargs):
def decrypt_metadata(self, image_metadata):
def encrypt_metadata(self, image_metadata):
def get_images(self, **kwargs):
def do_request(self, method, action, **kwargs):
def get_images_detailed(self, **kwargs):
def get_image(self, image_id):
def add_image(self, image_metadata):
def update_image(self, image_id, image_metadata, purge_props=False, from_state=None):
def delete_image(self, image_id):
def get_image_members(self, image_id):
def get_member_images(self, member_id):
def replace_members(self, image_id, member_data):
def add_member(self, image_id, member_id, can_share=None):
def delete_member(self, image_id, member_id):
\OpenStack\glance-2014.1\glance\registry\client\v1\__init__.py
\OpenStack\glance-2014.1\glance\registry\client\v2\api.py
def configure_registry_client():
def configure_registry_admin_creds():
def get_registry_client(cxt):
\OpenStack\glance-2014.1\glance\registry\client\v2\client.py
class RegistryClient(rpc.RPCClient):
\OpenStack\glance-2014.1\glance\registry\client\v2\__init__.py
\OpenStack\glance-2014.1\glance\registry\client\__init__.py
\OpenStack\glance-2014.1\glance\registry\__init__.py
\OpenStack\glance-2014.1\glance\schema.py
class Schema(object):
def __init__(self, name, properties=None, links=None):
def validate(self, obj):
def filter(self, obj):
def _filter_func(properties, key):
def merge_properties(self, properties):
def raw(self):
def minimal(self):
class PermissiveSchema(Schema):
def _filter_func(properties, key):
def raw(self):
def minimal(self):
class CollectionSchema(object):
def __init__(self, name, item_schema):
def raw(self):
def minimal(self):
\OpenStack\glance-2014.1\glance\scrubber.py
class ScrubQueue(object):
def __init__(self):
def add_location(self, image_id, uri, user_context=None):
def get_all_locations(self):
def pop_all_locations(self):
def has_image(self, image_id):
class ScrubFileQueue(ScrubQueue):
def __init__(self):
def _read_queue_file(self, file_path):
def _update_queue_file(self, file_path, remove_record_idxs):
def add_location(self, image_id, uri, user_context=None):
def _walk_all_locations(self, remove=False):
def get_all_locations(self):
def pop_all_locations(self):
def has_image(self, image_id):
class ScrubDBQueue(ScrubQueue):
def __init__(self):
def add_location(self, image_id, uri, user_context=None):
def _walk_all_locations(self, remove=False):
def get_all_locations(self):
def pop_all_locations(self):
def has_image(self, image_id):
def get_scrub_queues():
class Daemon(object):
def __init__(self, wakeup_time=300, threads=1000):
def start(self, application):
def wait(self):
def _run(self, application):
class Scrubber(object):
def __init__(self, store_api):
def _get_delete_jobs(self, queue, pop):
def run(self, pool, event=None):
def _scrub_image(self, pool, image_id, delete_jobs):
def _delete_image_from_backend(self, image_id, uri):
def _read_cleanup_file(self, file_path):
def _update_cleanup_file(self, file_path, cleanup_time):
def _cleanup(self, pool):
\OpenStack\glance-2014.1\glance\store\base.py
def _exception_to_unicode(exc):
class Store(object):
def __init__(self, context=None, location=None):
def configure(self):
def get_schemes(self):
def get_store_location_class(self):
def configure_add(self):
def get(self, location):
def get_size(self, location):
def add_disabled(self, *args, **kwargs):
def add(self, image_id, image_file, image_size):
def delete(self, location):
def set_acls(self, location, public=False, read_tenants=[], write_tenants=[]):
\OpenStack\glance-2014.1\glance\store\cinder.py
def get_cinderclient(context):
class StoreLocation(glance.store.location.StoreLocation):
def process_specs(self):
def get_uri(self):
def parse_uri(self, uri):
class Store(glance.store.base.Store):
def get_schemes(self):
def configure_add(self):
def get_size(self, location):
\OpenStack\glance-2014.1\glance\store\filesystem.py
class StoreLocation(glance.store.location.StoreLocation):
def process_specs(self):
def get_uri(self):
def parse_uri(self, uri):
class ChunkedFile(object):
def __init__(self, filepath):
def __iter__(self):
def close(self):
class Store(glance.store.base.Store):
def get_schemes(self):
def _check_write_permission(self, datadir):
def _create_image_directories(self, directory_paths):
def configure_add(self):
def _check_directory_paths(self, datadir_path, directory_paths):
def _get_datadir_path_and_priority(self, datadir):
def _resolve_location(location):
def _get_metadata(self):
def get(self, location):
def get_size(self, location):
def delete(self, location):
def _get_capacity_info(self, mount_point):
def _find_best_datadir(self, image_size):
def add(self, image_id, image_file, image_size):
def _delete_partial(filepath, id):
\OpenStack\glance-2014.1\glance\store\gridfs.py
class StoreLocation(glance.store.location.StoreLocation):
def get_uri(self):
def parse_uri(self, uri):
class Store(glance.store.base.Store):
def get_schemes(self):
def configure_add(self):
def _option_get(self, param):
def get(self, location):
def get_size(self, location):
def _get_file(self, location):
def add(self, image_id, image_file, image_size):
def delete(self, location):
\OpenStack\glance-2014.1\glance\store\http.py
class StoreLocation(glance.store.location.StoreLocation):
def process_specs(self):
def _get_credstring(self):
def get_uri(self):
def parse_uri(self, uri):
def http_response_iterator(conn, response, size):
class Store(glance.store.base.Store):
def get(self, location):
def get_schemes(self):
def get_size(self, location):
def _query(self, location, verb, depth=0):
def _get_conn_class(self, loc):
\OpenStack\glance-2014.1\glance\store\location.py
def get_location_from_uri(uri):
def register_scheme_map(scheme_map):
class Location(object):
def __init__(self, store_name, store_location_class, uri=None, image_id=None, store_specs=None):
def get_store_uri(self):
def get_uri(self):
class StoreLocation(object):
def __init__(self, store_specs):
def process_specs(self):
def get_uri(self):
def parse_uri(self, uri):
\OpenStack\glance-2014.1\glance\store\rbd.py
class StoreLocation(glance.store.location.StoreLocation):
def process_specs(self):
def get_uri(self):
def parse_uri(self, uri):
class ImageIterator(object):
def __init__(self, name, store):
def __iter__(self):
class Store(glance.store.base.Store):
def get_schemes(self):
def configure_add(self):
def get(self, location):
def get_size(self, location):
def _create_image(self, fsid, ioctx, image_name, size, order):
def _delete_image(self, image_name, snapshot_name=None):
def add(self, image_id, image_file, image_size):
def delete(self, location):
\OpenStack\glance-2014.1\glance\store\s3.py
class StoreLocation(glance.store.location.StoreLocation):
def process_specs(self):
def _get_credstring(self):
def get_uri(self):
def parse_uri(self, uri):
class ChunkedFile(object):
def __init__(self, fp):
def __iter__(self):
def getvalue(self):
def close(self):
class Store(glance.store.base.Store):
def get_schemes(self):
def configure_add(self):
def _option_get(self, param):
def get(self, location):
def get_size(self, location):
def _retrieve_key(self, location):
def add(self, image_id, image_file, image_size):
def _sanitize(uri):
def delete(self, location):
def get_bucket(conn, bucket_id):
def get_s3_location(s3_host):
def create_bucket_if_missing(bucket, s3_conn):
def get_key(bucket, obj):
def get_calling_format(bucket_format=None):
\OpenStack\glance-2014.1\glance\store\sheepdog.py
class SheepdogImage:
def __init__(self, addr, port, name, chunk_size):
def _run_command(self, command, data, *params):
def get_size(self):
def read(self, offset, count):
def write(self, data, offset, count):
def create(self, size):
def delete(self):
def exist(self):
class StoreLocation(glance.store.location.StoreLocation):
def process_specs(self):
def get_uri(self):
def parse_uri(self, uri):
class ImageIterator(object):
def __init__(self, image):
def __iter__(self):
class Store(glance.store.base.Store):
def get_schemes(self):
def configure_add(self):
def get(self, location):
def get_size(self, location):
def add(self, image_id, image_file, image_size):
def delete(self, location):
\OpenStack\glance-2014.1\glance\store\swift.py
def swift_retry_iter(resp_iter, length, store, location):
class StoreLocation(glance.store.location.StoreLocation):
def process_specs(self):
def _get_credstring(self):
def get_uri(self):
def parse_uri(self, uri):
def swift_url(self):
def Store(context=None, loc=None):
class BaseStore(glance.store.base.Store):
def get_schemes(self):
def configure(self):
def _get_object(self, location, connection=None, start=None):
def get(self, location, connection=None):
def get_size(self, location, connection=None):
def _option_get(self, param):
def _delete_stale_chunks(self, connection, container, chunk_list):
def add(self, image_id, image_file, image_size, connection=None):
def delete(self, location, connection=None):
def _create_container_if_missing(self, container, connection):
def get_connection(self):
def create_location(self):
class SingleTenantStore(BaseStore):
def configure(self):
def configure_add(self):
def create_location(self, image_id):
def get_connection(self, location):
class MultiTenantStore(BaseStore):
def configure_add(self):
def delete(self, location, connection=None):
def set_acls(self, location, public=False, read_tenants=None, write_tenants=None, connection=None):
def create_location(self, image_id):
def get_connection(self, location):
class ChunkReader(object):
def __init__(self, fd, checksum, total):
def read(self, i):
\OpenStack\glance-2014.1\glance\store\vmware_datastore.py
def is_valid_ipv6(address):
def http_response_iterator(conn, response, size):
class _Reader(object):
def __init__(self, data, checksum):
def read(self, length):
def size(self):
class StoreLocation(glance.store.location.StoreLocation):
def process_specs(self):
def get_uri(self):
def _is_valid_path(self, path):
def parse_uri(self, uri):
class Store(glance.store.base.Store):
def get_schemes(self):
def configure(self):
def configure_add(self):
def _option_get(self, param):
def _build_vim_cookie_header(self, vim_cookies):
def add(self, image_id, image_file, image_size):
def get(self, location):
def get_size(self, location):
def delete(self, location):
def _query(self, location, method, headers, depth=0):
def _get_http_conn(self, method, loc, headers, content=None):
def _get_http_conn_class(self):
\OpenStack\glance-2014.1\glance\store\__init__.py
class BackendException(Exception):
class UnsupportedBackend(BackendException):
class Indexable(object):
def __init__(self, wrapped, size):
def __iter__(self):
def __getitem__(self, i):
def another(self):
def getvalue(self):
def __len__(self):
def _register_stores(store_classes):
def _get_store_class(store_entry):
def create_stores():
def verify_default_store():
def get_known_schemes():
def get_known_stores():
def get_store_from_scheme(context, scheme, loc=None):
def get_store_from_uri(context, uri, loc=None):
def get_from_backend(context, uri, **kwargs):
def get_size_from_backend(context, uri):
def delete_from_backend(context, uri, **kwargs):
def get_store_from_location(uri):
def safe_delete_from_backend(context, uri, image_id, **kwargs):
def schedule_delayed_delete_from_backend(context, uri, image_id, **kwargs):
def delete_image_from_backend(context, store_api, image_id, uri):
def check_location_metadata(val, key=''):
def store_add_to_backend(image_id, data, size, store):
def add_to_backend(context, scheme, image_id, data, size):
def set_acls(context, location_uri, public=False, read_tenants=[], write_tenants=[]):
class ImageRepoProxy(glance.domain.proxy.Repo):
def __init__(self, image_repo, context, store_api):
def _set_acls(self, image):
def add(self, image):
def save(self, image):
def _check_location_uri(context, store_api, uri):
def _check_image_location(context, store_api, location):
def _set_image_size(context, image, locations):
class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, factory, context, store_api):
def new_image(self, **kwargs):
class StoreLocations(collections.MutableSequence):
def __init__(self, image_proxy, value):
def append(self, location):
def extend(self, other):
def insert(self, i, location):
def pop(self, i=-1):
def count(self, location):
def index(self, location, *args):
def remove(self, location):
def reverse(self):
def __getitem__(self, i):
def __setitem__(self, i, location):
def __delitem__(self, i):
def __delslice__(self, i, j):
def __iadd__(self, other):
def __contains__(self, location):
def __len__(self):
def __cast(self, other):
def __cmp__(self, other):
def __iter__(self):
def __copy__(self):
def __deepcopy__(self, memo):
def _locations_proxy(target, attr):
def get_attr(self):
def set_attr(self, value):
def del_attr(self):
class ImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, store_api):
def delete(self):
def set_data(self, data, size=None):
def get_data(self):
class ImageMemberRepoProxy(glance.domain.proxy.Repo):
def __init__(self, repo, image, context, store_api):
def _set_acls(self):
def add(self, member):
def remove(self, member):
\OpenStack\glance-2014.1\glance\tests\functional\db\base.py
def build_image_fixture(**kwargs):
def build_task_fixture(**kwargs):
class TestDriver(test_utils.BaseTestCase):
def setUp(self):
def build_image_fixtures(self):
def create_images(self, images):
class DriverTests(object):
def test_image_create_requires_status(self):
def test_image_create_defaults(self, mock_utcnow):
def test_image_create_duplicate_id(self):
def test_image_create_with_locations(self):
def test_image_create_with_location_data(self):
def test_image_create_properties(self):
def test_image_create_unknown_attribtues(self):
def test_image_update_core_attribute(self):
def test_image_update_with_locations(self):
def test_image_update_with_location_data(self):
def test_image_update(self):
def test_image_update_properties(self):
def test_image_update_purge_properties(self):
def test_image_property_delete(self):
def test_image_get(self):
def test_image_get_disallow_deleted(self):
def test_image_get_allow_deleted(self):
def test_image_get_force_allow_deleted(self):
def test_image_get_not_owned(self):
def test_image_get_not_found(self):
def test_image_get_all(self):
def test_image_get_all_with_filter(self):
def test_image_get_all_with_filter_user_defined_property(self):
def test_image_get_all_with_filter_nonexistent_userdef_property(self):
def test_image_get_all_with_filter_userdef_prop_nonexistent_value(self):
def test_image_get_all_with_filter_multiple_user_defined_properties(self):
def test_image_get_all_with_filter_nonexistent_user_defined_property(self):
def test_image_get_all_with_filter_user_deleted_property(self):
def test_image_get_all_with_filter_undefined_property(self):
def test_image_get_all_size_min_max(self):
def test_image_get_all_size_min(self):
def test_image_get_all_size_range(self):
def test_image_get_all_size_max(self):
def test_image_get_all_with_filter_min_range_bad_value(self):
def test_image_get_all_with_filter_max_range_bad_value(self):
def test_image_get_all_marker(self):
def test_image_get_all_marker_deleted(self):
def test_image_get_all_marker_deleted_showing_deleted_as_admin(self):
def test_image_get_all_marker_deleted_showing_deleted(self):
def test_image_get_all_marker_null_name_desc(self):
def test_image_get_all_marker_null_disk_format_desc(self):
def test_image_get_all_marker_null_container_format_desc(self):
def test_image_get_all_marker_null_name_asc(self):
def test_image_get_all_marker_null_disk_format_asc(self):
def test_image_get_all_marker_null_container_format_asc(self):
def test_image_get_all_limit(self):
def test_image_get_all_owned(self):
def test_image_get_all_owned_checksum(self):
def test_image_get_all_with_filter_tags(self):
def test_image_get_all_with_filter_multi_tags(self):
def test_image_get_all_with_filter_tags_and_nonexistent(self):
def test_image_get_all_with_filter_deleted_tags(self):
def test_image_get_all_with_filter_undefined_tags(self):
def test_image_paginate(self):
def test_image_get_all_invalid_sort_key(self):
def test_image_get_all_limit_marker(self):
def test_image_destroy(self):
def test_image_destroy_with_delete_all(self):
def _create_image_with_child_entries():
def test_image_get_multiple_members(self):
def test_is_image_visible(self):
def test_image_tag_create(self):
def test_image_tag_set_all(self):
def test_image_tag_get_all(self):
def test_image_tag_get_all_no_tags(self):
def test_image_tag_get_all_non_existent_image(self):
def test_image_tag_delete(self):
def test_image_member_create(self, mock_utcnow):
def test_image_member_update(self):
def test_image_member_update_status(self):
def test_image_member_find(self):
def _simplify(output):
def _assertMemberListMatch(list1, list2):
def test_image_member_count(self):
def test_image_member_count_invalid_image_id(self):
def test_image_member_count_empty_image_id(self):
def test_image_member_delete(self):
class DriverQuotaTests(test_utils.BaseTestCase):
def setUp(self):
def test_storage_quota(self):
def test_storage_quota_without_image_id(self):
def test_storage_quota_multiple_locations(self):
def test_storage_quota_deleted_image(self):
class TaskTests(test_utils.BaseTestCase):
def setUp(self):
def build_task_fixtures(self):
def test_task_get_all_with_filter(self):
def test_task_get_all_as_admin(self):
def test_task_get_all_marker(self):
def test_task_get_all_limit(self):
def test_task_get_all_owned(self):
def test_task_get(self):
def test_task_get_all(self):
def test_task_create(self):
def test_task_create_with_all_task_info_null(self):
def test_task_update(self):
def test_task_update_with_all_task_info_null(self):
def test_task_delete(self):
def test_task_delete_as_admin(self):
class TestVisibility(test_utils.BaseTestCase):
def setUp(self):
def setup_tenants(self):
def setup_contexts(self):
def build_image_fixtures(self):
def create_images(self, images):
class VisibilityTests(object):
def test_unknown_admin_sees_all(self):
def test_unknown_admin_is_public_true(self):
def test_unknown_admin_is_public_false(self):
def test_unknown_admin_is_public_none(self):
def test_unknown_admin_visibility_public(self):
def test_unknown_admin_visibility_private(self):
def test_known_admin_sees_all(self):
def test_known_admin_is_public_true(self):
def test_known_admin_is_public_false(self):
def test_known_admin_is_public_none(self):
def test_admin_as_user_true(self):
def test_known_admin_visibility_public(self):
def test_known_admin_visibility_private(self):
def test_what_unknown_user_sees(self):
def test_unknown_user_is_public_true(self):
def test_unknown_user_is_public_false(self):
def test_unknown_user_is_public_none(self):
def test_unknown_user_visibility_public(self):
def test_unknown_user_visibility_private(self):
def test_what_tenant1_sees(self):
def test_tenant1_is_public_true(self):
def test_tenant1_is_public_false(self):
def test_tenant1_is_public_none(self):
def test_tenant1_visibility_public(self):
def test_tenant1_visibility_private(self):
def _setup_is_public_red_herring(self):
def test_is_public_is_a_normal_filter_for_admin(self):
def test_is_public_is_a_normal_filter_for_user(self):
def test_admin_is_public_true_and_visibility_public(self):
def test_admin_is_public_false_and_visibility_public(self):
def test_admin_is_public_true_and_visibility_private(self):
def test_admin_is_public_false_and_visibility_private(self):
def test_tenant1_is_public_true_and_visibility_public(self):
def test_tenant1_is_public_false_and_visibility_public(self):
def test_tenant1_is_public_true_and_visibility_private(self):
def test_tenant1_is_public_false_and_visibility_private(self):
class TestMembershipVisibility(test_utils.BaseTestCase):
def setUp(self):
def _create_contexts(self):
def _user_fixture(self, admin=False):
def _create_images(self):
def _create_image(self, name, owner, members=None):
class MembershipVisibilityTests(object):
def _check_by_member(self, ctx, member_id, expected):
def test_owner1_finding_user1_memberships(self):
def test_user1_finding_user1_memberships(self):
def test_user2_finding_user1_memberships(self):
def test_admin_finding_user1_memberships(self):
def _check_by_image(self, context, image_id, expected):
def test_owner1_finding_owner1s_image_members(self):
def test_admin_finding_owner1s_image_members(self):
def test_user1_finding_owner1s_image_members(self):
def test_user2_finding_owner1s_image_members(self):
def test_user3_finding_owner1s_image_members(self):
\OpenStack\glance-2014.1\glance\tests\functional\db\test_registry.py
def get_db(config):
def reset_db(db_api):
class FunctionalInitWrapper(functional.FunctionalTest):
def setUp(self):
class TestRegistryDriver(base.TestDriver,
base.DriverTests,
FunctionalInitWrapper):
def setUp(self):
def tearDown(self):
class TestRegistryQuota(base.DriverQuotaTests, FunctionalInitWrapper):
def setUp(self):
def tearDown(self):
\OpenStack\glance-2014.1\glance\tests\functional\db\test_rpc_endpoint.py
class TestRegistryURLVisibility(functional.FunctionalTest):
def setUp(self):
def _url(self, path):
def _headers(self, custom_headers=None):
def test_v2_not_enabled(self):
def test_v2_enabled(self):
\OpenStack\glance-2014.1\glance\tests\functional\db\test_simple.py
def get_db(config):
def reset_db(db_api):
class TestSimpleDriver(base.TestDriver, base.DriverTests):
def setUp(self):
class TestSimpleQuota(base.DriverQuotaTests):
def setUp(self):
class TestSimpleVisibility(base.TestVisibility, base.VisibilityTests):
def setUp(self):
class TestSimpleMembershipVisibility(base.TestMembershipVisibility,
base.MembershipVisibilityTests):
def setUp(self):
class TestSimpleTask(base.TaskTests):
def setUp(self):
\OpenStack\glance-2014.1\glance\tests\functional\db\test_sqlalchemy.py
def get_db(config):
def reset_db(db_api):
class TestSqlAlchemyDriver(base.TestDriver, base.DriverTests):
def setUp(self):
def test_get_image_with_invalid_long_image_id(self):
def test_image_tag_delete_with_invalid_long_image_id(self):
def test_image_tag_get_all_with_invalid_long_image_id(self):
def test_user_get_storage_usage_with_invalid_long_image_id(self):
class TestSqlAlchemyVisibility(base.TestVisibility, base.VisibilityTests):
def setUp(self):
class TestSqlAlchemyMembershipVisibility(base.TestMembershipVisibility,
base.MembershipVisibilityTests):
def setUp(self):
class TestSqlAlchemyDBDataIntegrity(base.TestDriver):
def setUp(self):
def test_paginate_redundant_sort_keys(self):
def fake_paginate_query(query, model, limit, sort_keys, marker, sort_dir):
def test_paginate_non_redundant_sort_keys(self):
def fake_paginate_query(query, model, limit, sort_keys, marker, sort_dir):
class TestSqlAlchemyTask(base.TaskTests):
def setUp(self):
class TestSqlAlchemyQuota(base.DriverQuotaTests):
def setUp(self):
\OpenStack\glance-2014.1\glance\tests\functional\db\__init__.py
def load(get_db_fn, reset_db_fn):
def reset():
\OpenStack\glance-2014.1\glance\tests\functional\store\test_cinder.py
def parse_config(config):
class TestCinderStore(store_tests.BaseTestCase, testtools.TestCase):
def setUp(self):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
\OpenStack\glance-2014.1\glance\tests\functional\store\test_filesystem.py
class TestFilesystemStore(store_tests.BaseTestCase, testtools.TestCase):
def setUp(self):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
\OpenStack\glance-2014.1\glance\tests\functional\store\test_gridfs.py
def read_config(path):
def parse_config(config):
class TestGridfsStore(store_tests.BaseTestCase, testtools.TestCase):
def setUp(self):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
\OpenStack\glance-2014.1\glance\tests\functional\store\test_http.py
def get_handler_class(fixture):
def http_server(image_id, image_data):
class TestHTTPStore(store_tests.BaseTestCase, testtools.TestCase):
def setUp(self):
def tearDown(self):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
\OpenStack\glance-2014.1\glance\tests\functional\store\test_rbd.py
def read_config(path):
def parse_config(config):
class TestRBDStore(store_tests.BaseTestCase, testtools.TestCase):
def setUp(self):
def tearDown(self):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
def test_unicode(self):
\OpenStack\glance-2014.1\glance\tests\functional\store\test_s3.py
def read_config(path):
def parse_config(config):
def s3_connect(s3_host, access_key, secret_key, calling_format):
def s3_put_object(s3_client, bucket_name, object_name, contents):
class TestS3Store(store_tests.BaseTestCase, testtools.TestCase):
def setUp(self):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
\OpenStack\glance-2014.1\glance\tests\functional\store\test_sheepdog.py
class TestSheepdogStore(store_tests.BaseTestCase, testtools.TestCase):
def setUp(self):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
\OpenStack\glance-2014.1\glance\tests\functional\store\test_swift.py
class SwiftStoreError(RuntimeError):
def _uniq(value):
def read_config(path):
def parse_config(config):
def swift_connect(auth_url, auth_version, user, key):
def swift_list_containers(swift_conn):
def swift_create_container(swift_conn, container_name):
def swift_get_container(swift_conn, container_name, **kwargs):
def swift_delete_container(swift_conn, container_name):
def swift_put_object(swift_conn, container_name, object_name, contents):
def swift_head_object(swift_conn, container_name, obj_name):
def keystone_authenticate(auth_url, auth_version, tenant_name, username, password):
class TestSwiftStore(store_tests.BaseTestCase, testtools.TestCase):
def setUp(self):
def get_store(self, **kwargs):
def test_object_chunking(self):
def test_retries_fail_start_of_download(self):
def iter_wrapper(iterable):
def test_retries_fail_partway_through_download(self):
def iter_wrapper(iterable):
def test_retries_fail_end_of_download(self):
def iter_wrapper(iterable):
def stash_image(self, image_id, image_data):
def test_multitenant(self):
def test_delayed_delete_with_auth(self):
\OpenStack\glance-2014.1\glance\tests\functional\store\test_vmware_datastore.py
def read_config(path):
def parse_config(config):
class VMwareDatastoreStoreError(RuntimeError):
def vsphere_connect(server_ip, server_username, server_password, api_retry_count, task_poll_interval, scheme='https', create_session=True, wsdl_loc=None):
class TestVMwareDatastoreStore(store_tests.BaseTestCase, testtools.TestCase):
def _build_vim_cookie_header(self, vim_cookies):
def setUp(self):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
\OpenStack\glance-2014.1\glance\tests\functional\store\__init__.py
class BaseTestCase(object):
def setUp(self):
def tearDown(self):
def config(self, **kw):
def get_store(self, **kwargs):
def stash_image(self, image_id, image_data):
def test_create_store(self):
def test_lifecycle(self):
def test_get_remote_image(self):
\OpenStack\glance-2014.1\glance\tests\functional\store_utils.py
class RemoteImageHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_HEAD(self):
def do_GET(self):
def log_message(self, format, *args):
def setup_http(test):
def serve_requests(httpd):
def get_http_uri(test, image_id):
\OpenStack\glance-2014.1\glance\tests\functional\test_api.py
class TestRootApi(functional.FunctionalTest):
def test_version_configurations(self):
def test_version_variations(self):
\OpenStack\glance-2014.1\glance\tests\functional\test_bin_glance_cache_manage.py
class TestBinGlanceCacheManage(functional.FunctionalTest):
def setUp(self):
def add_image(self, name):
def is_image_cached(self, image_id):
def iso_date(self, image_id):
def test_no_cache_enabled(self):
def test_cache_index(self):
def test_queue(self):
\OpenStack\glance-2014.1\glance\tests\functional\test_cache_middleware.py
OpenStack Index | Previous | Next
|