# OpenStack Study: backend.py

# Copyright (c) 2010-2012 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

"""

Pluggable Back-end for Account Server

"""

import os

from uuid import uuid4

import time

import cPickle as pickle

import errno

import sqlite3

from swift.common.utils import normalize_timestamp, lock_parent_directory

from swift.common.db import DatabaseBroker, DatabaseConnectionError, \

PENDING_CAP, PICKLE_PROTOCOL, utf8encode

# Name of the subdirectory used for account data.
DATADIR = 'accounts'

# ----------------------------------------------------------------------

class AccountBroker(DatabaseBroker):

    """Encapsulates working with an account database."""

    # Identifiers consumed by the DatabaseBroker base class
    # (swift.common.db): what kind of DB this is ...
    db_type = 'account'
    # ... what kind of records it holds ...
    db_contains_type = 'container'
    # ... and which timestamp column the base class's reclaim logic
    # presumably consults — defined in swift.common.db, confirm there.
    db_reclaim_timestamp = 'delete_timestamp'

# ----------------------------------------------------------------------

    def _initialize(self, conn, put_timestamp):

        """

        Create a brand new account database (tables, indices, triggers, etc.)

        :param conn: DB connection object

        :param put_timestamp: put timestamp

        """

        if not self.account:

            raise ValueError(

                'Attempting to create a new database with no account set')

        self.create_container_table(conn)

        self.create_account_stat_table(conn, put_timestamp)

# ----------------------------------------------------------------------

    def create_container_table(self, conn):
        """
        Create container table which is specific to the account DB.

        The AFTER INSERT / AFTER DELETE triggers keep the aggregate columns
        of account_stat (container_count, object_count, bytes_used and the
        chexor-maintained hash) in step with container rows, and the BEFORE
        UPDATE trigger forbids in-place updates, so rows must always be
        replaced via DELETE + INSERT (see merge_items).

        :param conn: DB connection object
        """
        conn.executescript("""

            CREATE TABLE container (

                ROWID INTEGER PRIMARY KEY AUTOINCREMENT,

                name TEXT,

                put_timestamp TEXT,

                delete_timestamp TEXT,

                object_count INTEGER,

                bytes_used INTEGER,

                deleted INTEGER DEFAULT 0

            );

            CREATE INDEX ix_container_deleted_name ON

                container (deleted, name);

            CREATE TRIGGER container_insert AFTER INSERT ON container

            BEGIN

                UPDATE account_stat

                SET container_count = container_count + (1 - new.deleted),

                    object_count = object_count + new.object_count,

                    bytes_used = bytes_used + new.bytes_used,

                    hash = chexor(hash, new.name,

                                  new.put_timestamp || '-' ||

                                    new.delete_timestamp || '-' ||

                                    new.object_count || '-' || new.bytes_used);

            END;

            CREATE TRIGGER container_update BEFORE UPDATE ON container

            BEGIN

                SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');

            END;

            CREATE TRIGGER container_delete AFTER DELETE ON container

            BEGIN

                UPDATE account_stat

                SET container_count = container_count - (1 - old.deleted),

                    object_count = object_count - old.object_count,

                    bytes_used = bytes_used - old.bytes_used,

                    hash = chexor(hash, old.name,

                                  old.put_timestamp || '-' ||

                                    old.delete_timestamp || '-' ||

                                    old.object_count || '-' || old.bytes_used);

            END;

        """)

# ----------------------------------------------------------------------

    def create_account_stat_table(self, conn, put_timestamp):
        """
        Create account_stat table which is specific to the account DB.
        Not a part of Pluggable Back-ends, internal to the baseline code.

        The table holds exactly one row: it is inserted here with a zeroed
        container_count and then filled in with the account's identity.

        :param conn: DB connection object
        :param put_timestamp: put timestamp
        """
        conn.executescript("""

            CREATE TABLE account_stat (

                account TEXT,

                created_at TEXT,

                put_timestamp TEXT DEFAULT '0',

                delete_timestamp TEXT DEFAULT '0',

                container_count INTEGER,

                object_count INTEGER DEFAULT 0,

                bytes_used INTEGER DEFAULT 0,

                hash TEXT default '00000000000000000000000000000000',

                id TEXT,

                status TEXT DEFAULT '',

                status_changed_at TEXT DEFAULT '0',

                metadata TEXT DEFAULT ''

            );

            INSERT INTO account_stat (container_count) VALUES (0);

        """)

        # Populate the single row: created_at is the normalized wall-clock
        # creation time and id is a fresh UUID identifying this DB replica.
        conn.execute('''

            UPDATE account_stat SET account = ?, created_at = ?, id = ?,

                   put_timestamp = ?

            ''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
                  put_timestamp))

# ----------------------------------------------------------------------

    def get_db_version(self, conn):

        if self._db_version == -1:

            self._db_version = 0

            for row in conn.execute('''

                    SELECT name FROM sqlite_master

                    WHERE name = 'ix_container_deleted_name' '''):

                self._db_version = 1

        return self._db_version

# ----------------------------------------------------------------------

    def _delete_db(self, conn, timestamp, force=False):
        """
        Mark the DB as deleted.

        The WHERE guard only lets delete_timestamp move forward, so replays
        with an older timestamp are ignored.

        :param conn: DB connection object
        :param timestamp: timestamp to mark as deleted
        :param force: unused by this implementation
        """
        conn.execute("""

            UPDATE account_stat

            SET delete_timestamp = ?,

                status = 'DELETED',

                status_changed_at = ?

            WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))

# ----------------------------------------------------------------------

    def _commit_puts_load(self, item_list, entry):

        """See :func:`swift.common.db.DatabaseBroker._commit_puts_load`"""

        (name, put_timestamp, delete_timestamp,

         object_count, bytes_used, deleted) = \

            pickle.loads(entry.decode('base64'))

        item_list.append(

            {'name': name,

             'put_timestamp': put_timestamp,

             'delete_timestamp': delete_timestamp,

             'object_count': object_count,

             'bytes_used': bytes_used,

             'deleted': deleted})

# ----------------------------------------------------------------------

    def empty(self):

        """

        Check if the account DB is empty.

        :returns: True if the database has no active containers.

        """

        self._commit_puts_stale_ok()

        with self.get() as conn:

            row = conn.execute(

                'SELECT container_count from account_stat').fetchone()

            return (row[0] == 0)

# ----------------------------------------------------------------------

    def put_container(self, name, put_timestamp, delete_timestamp,

                      object_count, bytes_used):

        """

        Create a container with the given attributes.

        :param name: name of the container to create

        :param put_timestamp: put_timestamp of the container to create

        :param delete_timestamp: delete_timestamp of the container to create

        :param object_count: number of objects in the container

        :param bytes_used: number of bytes used by the container

        """

        if delete_timestamp > put_timestamp and \

                object_count in (None, '', 0, '0'):

            deleted = 1

        else:

            deleted = 0

        record = {'name': name, 'put_timestamp': put_timestamp,

                  'delete_timestamp': delete_timestamp,

                  'object_count': object_count,

                  'bytes_used': bytes_used,

                  'deleted': deleted}

        if self.db_file == ':memory:':

            self.merge_items([record])

            return

        if not os.path.exists(self.db_file):

            raise DatabaseConnectionError(self.db_file, "DB doesn't exist")

        pending_size = 0

        try:

            pending_size = os.path.getsize(self.pending_file)

        except OSError as err:

            if err.errno != errno.ENOENT:

                raise

        if pending_size > PENDING_CAP:

            self._commit_puts([record])

        else:

            with lock_parent_directory(self.pending_file,

                                       self.pending_timeout):

                with open(self.pending_file, 'a+b') as fp:

                    # Colons aren't used in base64 encoding; so they are our

                    # delimiter

                    fp.write(':')

                    fp.write(pickle.dumps(

                        (name, put_timestamp, delete_timestamp, object_count,

                         bytes_used, deleted),

                        protocol=PICKLE_PROTOCOL).encode('base64'))

                    fp.flush()

# ----------------------------------------------------------------------

    def is_deleted(self):

        """

        Check if the account DB is considered to be deleted.

        :returns: True if the account DB is considered to be deleted, False

                  otherwise

        """

        if self.db_file != ':memory:' and not os.path.exists(self.db_file):

            return True

        self._commit_puts_stale_ok()

        with self.get() as conn:

            row = conn.execute('''

                SELECT put_timestamp, delete_timestamp, container_count, status

                FROM account_stat''').fetchone()

            return row['status'] == 'DELETED' or (

                row['container_count'] in (None, '', 0, '0') and

                row['delete_timestamp'] > row['put_timestamp'])

# ----------------------------------------------------------------------

    def is_status_deleted(self):

        """Only returns true if the status field is set to DELETED."""

        with self.get() as conn:

            row = conn.execute('''

                SELECT status

                FROM account_stat''').fetchone()

            return (row['status'] == "DELETED")

# ----------------------------------------------------------------------

    def get_info(self):

        """

        Get global data for the account.

        :returns: dict with keys: account, created_at, put_timestamp,

                  delete_timestamp, container_count, object_count,

                  bytes_used, hash, id

        """

        self._commit_puts_stale_ok()

        with self.get() as conn:

            return dict(conn.execute('''

                SELECT account, created_at,  put_timestamp, delete_timestamp,

                       container_count, object_count, bytes_used, hash, id

                FROM account_stat

            ''').fetchone())

# ----------------------------------------------------------------------

    def list_containers_iter(self, limit, marker, end_marker, prefix,
                             delimiter):
        """
        Get a list of containers sorted by name starting at marker onward, up
        to limit entries. Entries will begin with the prefix and will not have
        the delimiter after the prefix.

        :param limit: maximum number of entries to get
        :param marker: marker query
        :param end_marker: end marker query
        :param prefix: prefix query
        :param delimiter: delimiter for query
        :returns: list of tuples of (name, object_count, bytes_used, 0)
        """
        (marker, end_marker, prefix, delimiter) = utf8encode(
            marker, end_marker, prefix, delimiter)
        self._commit_puts_stale_ok()
        if delimiter and not prefix:
            prefix = ''
        # Remember the caller's marker so a synthesized "subdir" entry equal
        # to it is not emitted a second time.
        orig_marker = marker
        with self.get() as conn:
            results = []
            # Delimiter handling may roll many rows up into one entry, so
            # keep re-querying (advancing marker) until the limit is reached
            # or no rows come back.
            while len(results) < limit:
                query = """

                    SELECT name, object_count, bytes_used, 0

                    FROM container

                    WHERE deleted = 0 AND """
                query_args = []
                if end_marker:
                    query += ' name < ? AND'
                    query_args.append(end_marker)
                if marker and marker >= prefix:
                    query += ' name > ? AND'
                    query_args.append(marker)
                elif prefix:
                    query += ' name >= ? AND'
                    query_args.append(prefix)
                # NOTE(review): the WHERE clause above already includes
                # "deleted = 0", so for version-0 DBs the '+deleted' form
                # (unary + typically stops SQLite using an index on that
                # column) may be defeated by the first clause — confirm
                # against upstream before changing.
                if self.get_db_version(conn) < 1:
                    query += ' +deleted = 0'
                else:
                    query += ' deleted = 0'
                query += ' ORDER BY name LIMIT ?'
                query_args.append(limit - len(results))
                curs = conn.execute(query, query_args)
                # Yield plain tuples rather than sqlite3.Row objects.
                curs.row_factory = None
                if prefix is None:
                    # A delimiter without a specified prefix is ignored
                    return [r for r in curs]
                if not delimiter:
                    if not prefix:
                        # It is possible to have a delimiter but no prefix
                        # specified. As above, the prefix will be set to the
                        # empty string, so avoid performing the extra work to
                        # check against an empty prefix.
                        return [r for r in curs]
                    else:
                        return [r for r in curs if r[0].startswith(prefix)]
                # We have a delimiter and a prefix (possibly empty string) to
                # handle
                rowcount = 0
                for row in curs:
                    rowcount += 1
                    marker = name = row[0]
                    if len(results) >= limit or not name.startswith(prefix):
                        curs.close()
                        return results
                    end = name.find(delimiter, len(prefix))
                    if end > 0:
                        # Roll every name sharing this sub-prefix up into a
                        # single "directory" entry (flag column 1), and jump
                        # the marker past the whole sub-prefix range.
                        marker = name[:end] + chr(ord(delimiter) + 1)
                        dir_name = name[:end + 1]
                        if dir_name != orig_marker:
                            results.append([dir_name, 0, 0, 1])
                        curs.close()
                        break
                    results.append(row)
                # No rows at all: nothing further matches; stop looping.
                if not rowcount:
                    break
            return results

# ----------------------------------------------------------------------

    def merge_items(self, item_list, source=None):
        """
        Merge items into the container table.

        For each incoming record, any existing row with the same name is
        folded in (keeping the newest put/delete timestamps and recomputing
        the deleted flag), then the row is replaced via DELETE + INSERT —
        the container table's trigger forbids UPDATE.

        :param item_list: list of dictionaries of {'name', 'put_timestamp',
                          'delete_timestamp', 'object_count', 'bytes_used',
                          'deleted'}
        :param source: if defined, update incoming_sync with the source
        """
        with self.get() as conn:
            max_rowid = -1
            for rec in item_list:
                # record layout: [name, put_ts, delete_ts, object_count,
                #                 bytes_used, deleted]
                record = [rec['name'], rec['put_timestamp'],
                          rec['delete_timestamp'], rec['object_count'],
                          rec['bytes_used'], rec['deleted']]
                query = '''

                    SELECT name, put_timestamp, delete_timestamp,

                           object_count, bytes_used, deleted

                    FROM container WHERE name = ?

                '''
                if self.get_db_version(conn) >= 1:
                    query += ' AND deleted IN (0, 1)'
                curs = conn.execute(query, (rec['name'],))
                curs.row_factory = None
                row = curs.fetchone()
                if row:
                    row = list(row)
                    # Prefer existing non-NULL values over incoming NULLs.
                    for i in xrange(5):
                        if record[i] is None and row[i] is not None:
                            record[i] = row[i]
                    if row[1] > record[1]:  # Keep newest put_timestamp
                        record[1] = row[1]
                    if row[2] > record[2]:  # Keep newest delete_timestamp
                        record[2] = row[2]
                    # If deleted, mark as such
                    if record[2] > record[1] and \
                            record[3] in (None, '', 0, '0'):
                        record[5] = 1
                    else:
                        record[5] = 0
                # Replace in two steps: DELETE then INSERT, since the
                # container_update trigger raises on UPDATE.
                conn.execute('''

                    DELETE FROM container WHERE name = ? AND

                                                deleted IN (0, 1)

                ''', (record[0],))
                conn.execute('''

                    INSERT INTO container (name, put_timestamp,

                        delete_timestamp, object_count, bytes_used,

                        deleted)

                    VALUES (?, ?, ?, ?, ?, ?)

                ''', record)
                if source:
                    max_rowid = max(max_rowid, rec['ROWID'])
            if source:
                # Record how far we have synced from this remote: try to
                # INSERT a fresh sync row, fall back to advancing the
                # existing one when the remote_id already has a row.
                try:
                    conn.execute('''

                        INSERT INTO incoming_sync (sync_point, remote_id)

                        VALUES (?, ?)

                    ''', (max_rowid, source))
                except sqlite3.IntegrityError:
                    conn.execute('''

                        UPDATE incoming_sync SET sync_point=max(?, sync_point)

                        WHERE remote_id=?

                    ''', (max_rowid, source))
            conn.commit()