
OpenStack Study: host_manager.py


**** CubicPower OpenStack Study ****

# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Manage hosts in the current zone.
"""

import UserDict

from oslo.config import cfg

from cinder import db
from cinder import exception
from cinder.openstack.common import log as logging
from cinder.openstack.common.scheduler import filters
from cinder.openstack.common.scheduler import weights
from cinder.openstack.common import timeutils
from cinder import utils

host_manager_opts = [
    cfg.ListOpt('scheduler_default_filters',
                default=[
                    'AvailabilityZoneFilter',
                    'CapacityFilter',
                    'CapabilitiesFilter'
                ],
                help='Which filter class names to use for filtering hosts '
                     'when not specified in the request.'),
    cfg.ListOpt('scheduler_default_weighers',
                default=[
                    'CapacityWeigher'
                ],
                help='Which weigher class names to use for weighing hosts.')
]

CONF = cfg.CONF
CONF.register_opts(host_manager_opts)
CONF.import_opt('scheduler_driver', 'cinder.scheduler.manager')

LOG = logging.getLogger(__name__)
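The two ListOpt options above become attributes on CONF once this module is imported, and HostManager.__init__ below overrides them for the legacy schedulers. A minimal sketch of inspecting and overriding them with oslo.config (assuming this module has already been imported so the options are registered):

from oslo.config import cfg

CONF = cfg.CONF

# Defaults defined above, exposed as list-valued attributes:
print(CONF.scheduler_default_filters)
# ['AvailabilityZoneFilter', 'CapacityFilter', 'CapabilitiesFilter']
print(CONF.scheduler_default_weighers)
# ['CapacityWeigher']

# Runtime override, the same mechanism HostManager.__init__ uses:
CONF.set_override('scheduler_default_weighers', ['AllocatedCapacityWeigher'])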

**** CubicPower OpenStack Study ****

class ReadOnlyDict(UserDict.IterableUserDict):
    """A read-only dict."""

**** CubicPower OpenStack Study ****

    def __init__(self, source=None):
        self.data = {}
        self.update(source)

**** CubicPower OpenStack Study ****

    def __setitem__(self, key, item):
        raise TypeError

**** CubicPower OpenStack Study ****

    def __delitem__(self, key):
        raise TypeError

**** CubicPower OpenStack Study ****

    def clear(self):
        raise TypeError

**** CubicPower OpenStack Study ****

    def pop(self, key, *args):
        raise TypeError

**** CubicPower OpenStack Study ****

    def popitem(self):
        raise TypeError

**** CubicPower OpenStack Study ****

    def update(self, source=None):
        if source is None:
            return
        elif isinstance(source, UserDict.UserDict):
            self.data = source.data
        elif isinstance(source, type({})):
            self.data = source
        else:
            raise TypeError
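A short usage sketch (hypothetical values, with the class above in scope): update() stores the given dict (or a UserDict's data) at construction time, after which every mutating method raises TypeError.

caps = ReadOnlyDict({'total_capacity_gb': 500, 'QoS_support': False})
print(caps['total_capacity_gb'])          # 500

try:
    caps['total_capacity_gb'] = 1000      # __setitem__ is blocked
except TypeError:
    print('capabilities are read-only')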

**** CubicPower OpenStack Study ****

class HostState(object):
    """Mutable and immutable information tracked for a host."""

**** CubicPower OpenStack Study ****

    def __init__(self, host, capabilities=None, service=None):
        self.host = host
        self.update_capabilities(capabilities, service)

        self.volume_backend_name = None
        self.vendor_name = None
        self.driver_version = 0
        self.storage_protocol = None
        self.QoS_support = False

        # Mutable available resources.
        # These will change as resources are virtually "consumed".
        self.total_capacity_gb = 0
        # Capacity already allocated from Cinder's point of view; this should
        # equal sum(vol['size'] for vol in vols_on_hosts).
        self.allocated_capacity_gb = 0
        self.free_capacity_gb = None
        self.reserved_percentage = 0

        self.updated = None

**** CubicPower OpenStack Study ****

    def update_capabilities(self, capabilities=None, service=None):
        # Read-only capability dicts
        if capabilities is None:
            capabilities = {}
        self.capabilities = ReadOnlyDict(capabilities)
        if service is None:
            service = {}
        self.service = ReadOnlyDict(service)

**** CubicPower OpenStack Study ****

    def update_from_volume_capability(self, capability):
        """Update information about a host from its volume_node info."""
        if capability:
            if self.updated and self.updated > capability['timestamp']:
                return

            self.volume_backend = capability.get('volume_backend_name', None)
            self.vendor_name = capability.get('vendor_name', None)
            self.driver_version = capability.get('driver_version', None)
            self.storage_protocol = capability.get('storage_protocol', None)
            self.QoS_support = capability.get('QoS_support', False)

            self.total_capacity_gb = capability['total_capacity_gb']
            self.free_capacity_gb = capability['free_capacity_gb']
            self.allocated_capacity_gb = capability.get(
                'allocated_capacity_gb', 0)
            self.reserved_percentage = capability['reserved_percentage']

            self.updated = capability['timestamp']
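For illustration, a capability report of the shape this method consumes (all values hypothetical, with the class above in scope). The timestamp guard at the top means reports older than the last one applied are ignored.

capability = {
    'volume_backend_name': 'lvm1',
    'vendor_name': 'Open Source',
    'driver_version': '1.0',
    'storage_protocol': 'iSCSI',
    'QoS_support': False,
    'total_capacity_gb': 500,
    'free_capacity_gb': 200,
    'allocated_capacity_gb': 300,
    'reserved_percentage': 5,
    'timestamp': timeutils.utcnow(),
}

host_state = HostState('host1@lvm1')
host_state.update_from_volume_capability(capability)
print(host_state.free_capacity_gb)        # 200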

**** CubicPower OpenStack Study ****

    def consume_from_volume(self, volume):
        """Incrementally update host state from a volume."""
        volume_gb = volume['size']
        self.allocated_capacity_gb += volume_gb
        if self.free_capacity_gb == 'infinite':
            # There's virtually infinite space on back-end
            pass
        elif self.free_capacity_gb == 'unknown':
            # Unable to determine the actual free space on back-end
            pass
        else:
            self.free_capacity_gb -= volume_gb
        self.updated = timeutils.utcnow()
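Continuing the sketch above: scheduling a 10 GB volume against that host state bumps allocated capacity and, because free_capacity_gb is a number rather than 'infinite' or 'unknown', decrements free capacity as well.

host_state.consume_from_volume({'size': 10})
print(host_state.allocated_capacity_gb)   # 310
print(host_state.free_capacity_gb)        # 190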

**** CubicPower OpenStack Study ****

    def __repr__(self):
        return ("host '%s': free_capacity_gb: %s" %
                (self.host, self.free_capacity_gb))

**** CubicPower OpenStack Study ****

class HostManager(object):
    """Base HostManager class."""

    host_state_cls = HostState

**** CubicPower OpenStack Study ****

    def __init__(self):
        self.service_states = {}  # { <host>: {<service>: {cap k : v}}}
        self.host_state_map = {}
        self.filter_handler = filters.HostFilterHandler('cinder.scheduler.'
                                                        'filters')
        self.filter_classes = self.filter_handler.get_all_classes()
        self.weight_handler = weights.HostWeightHandler('cinder.scheduler.'
                                                        'weights')
        self.weight_classes = self.weight_handler.get_all_classes()

        default_filters = ['AvailabilityZoneFilter',
                           'CapacityFilter',
                           'CapabilitiesFilter']
        chance = 'cinder.scheduler.chance.ChanceScheduler'
        simple = 'cinder.scheduler.simple.SimpleScheduler'
        if CONF.scheduler_driver == simple:
            CONF.set_override('scheduler_default_filters', default_filters)
            CONF.set_override('scheduler_default_weighers',
                              ['AllocatedCapacityWeigher'])
        elif CONF.scheduler_driver == chance:
            CONF.set_override('scheduler_default_filters', default_filters)
            CONF.set_override('scheduler_default_weighers',
                              ['ChanceWeigher'])
        else:
            # Do nothing when some other scheduler is configured
            pass
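The if/elif branch above lets the filter scheduler mimic the legacy ChanceScheduler and SimpleScheduler by swapping the default filter and weigher lists. A sketch (inside a configured Cinder environment, values taken from the code above):

CONF.set_override('scheduler_driver',
                  'cinder.scheduler.simple.SimpleScheduler')
hm = HostManager()

print(CONF.scheduler_default_filters)
# ['AvailabilityZoneFilter', 'CapacityFilter', 'CapabilitiesFilter']
print(CONF.scheduler_default_weighers)
# ['AllocatedCapacityWeigher']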

**** CubicPower OpenStack Study ****

    def _choose_host_filters(self, filter_cls_names):
        """Return a list of available filter names.

        This function checks input filter names against a predefined set
        of acceptable filters (all loaded filters).  If input is None,
        it uses CONF.scheduler_default_filters instead.
        """
        if filter_cls_names is None:
            filter_cls_names = CONF.scheduler_default_filters
        if not isinstance(filter_cls_names, (list, tuple)):
            filter_cls_names = [filter_cls_names]
        good_filters = []
        bad_filters = []
        for filter_name in filter_cls_names:
            found_class = False
            for cls in self.filter_classes:
                if cls.__name__ == filter_name:
                    found_class = True
                    good_filters.append(cls)
                    break
            if not found_class:
                bad_filters.append(filter_name)
        if bad_filters:
            msg = ", ".join(bad_filters)
            raise exception.SchedulerHostFilterNotFound(filter_name=msg)
        return good_filters
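A sketch of the failure path (hypothetical filter name, with a HostManager instance hm in scope): unknown filter names are collected and reported together in a single SchedulerHostFilterNotFound.

try:
    hm._choose_host_filters(['CapacityFilter', 'NoSuchFilter'])
except exception.SchedulerHostFilterNotFound as err:
    print(err)    # message names 'NoSuchFilter'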

**** CubicPower OpenStack Study ****

    def _choose_host_weighers(self, weight_cls_names):
        """Return a list of available weigher names.

        This function checks input weigher names against a predefined set
        of acceptable weighers (all loaded weighers).  If input is None,
        it uses CONF.scheduler_default_weighers instead.
        """
        if weight_cls_names is None:
            weight_cls_names = CONF.scheduler_default_weighers
        if not isinstance(weight_cls_names, (list, tuple)):
            weight_cls_names = [weight_cls_names]
        good_weighers = []
        bad_weighers = []
        for weigher_name in weight_cls_names:
            found_class = False
            for cls in self.weight_classes:
                if cls.__name__ == weigher_name:
                    good_weighers.append(cls)
                    found_class = True
                    break
            if not found_class:
                bad_weighers.append(weigher_name)
        if bad_weighers:
            msg = ", ".join(bad_weighers)
            raise exception.SchedulerHostWeigherNotFound(weigher_name=msg)
        return good_weighers

**** CubicPower OpenStack Study ****

    def get_filtered_hosts(self, hosts, filter_properties,
                           filter_class_names=None):
        """Filter hosts and return only ones passing all filters."""
        filter_classes = self._choose_host_filters(filter_class_names)
        return self.filter_handler.get_filtered_objects(filter_classes,
                                                        hosts,
                                                        filter_properties)

**** CubicPower OpenStack Study ****

    def get_weighed_hosts(self, hosts, weight_properties,
                          weigher_class_names=None):
        """Weigh the hosts."""
        weigher_classes = self._choose_host_weighers(weigher_class_names)
        return self.weight_handler.get_weighed_objects(weigher_classes,
                                                       hosts,
                                                       weight_properties)

**** CubicPower OpenStack Study ****

    def update_service_capabilities(self, service_name, host, capabilities):
        """Update the per-service capabilities based on this notification."""
        if service_name != 'volume':
            LOG.debug(_('Ignoring %(service_name)s service update '
                        'from %(host)s'),
                      {'service_name': service_name, 'host': host})
            return

        LOG.debug(_("Received %(service_name)s service update from "
                    "%(host)s.") %
                  {'service_name': service_name, 'host': host})

        # Copy the capabilities, so we don't modify the original dict
        capab_copy = dict(capabilities)
        capab_copy["timestamp"] = timeutils.utcnow()  # Reported time
        self.service_states[host] = capab_copy
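A sketch of how a capability report lands here (hypothetical host and values, with hm a HostManager instance): only the 'volume' service is tracked, and the stored copy is stamped with the report time.

hm.update_service_capabilities('volume', 'host1@lvm1',
                               {'total_capacity_gb': 500,
                                'free_capacity_gb': 200,
                                'reserved_percentage': 5})
print(hm.service_states['host1@lvm1']['timestamp'])

# Non-volume updates are logged and dropped:
hm.update_service_capabilities('compute', 'host1', {})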

**** CubicPower OpenStack Study ****

    def get_all_host_states(self, context):
        """Return an iterator over the HostState objects for all hosts
        the HostManager knows about.

        Each of the consumable resources in HostState is populated with
        the capabilities the scheduler received over RPC. The backing
        host_state_map looks like:

            {'192.168.1.100': HostState(), ...}
        """
        # Get resource usage across the available volume nodes:
        topic = CONF.volume_topic
        volume_services = db.service_get_all_by_topic(context, topic)
        active_hosts = set()
        for service in volume_services:
            host = service['host']
            if not utils.service_is_up(service) or service['disabled']:
                LOG.warn(_("volume service is down or disabled. "
                           "(host: %s)") % host)
                continue
            capabilities = self.service_states.get(host, None)
            host_state = self.host_state_map.get(host)
            if host_state:
                # Copy capabilities to host_state.capabilities.
                host_state.update_capabilities(capabilities,
                                               dict(service.iteritems()))
            else:
                host_state = self.host_state_cls(
                    host,
                    capabilities=capabilities,
                    service=dict(service.iteritems()))
                self.host_state_map[host] = host_state
            # Update the attributes in host_state that the scheduler is
            # interested in.
            host_state.update_from_volume_capability(capabilities)
            active_hosts.add(host)

        # Remove non-active hosts from host_state_map.
        nonactive_hosts = set(self.host_state_map.keys()) - active_hosts
        for host in nonactive_hosts:
            LOG.info(_("Removing non-active host: %(host)s from "
                       "scheduler cache.") % {'host': host})
            del self.host_state_map[host]

        return self.host_state_map.itervalues()
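Putting the pieces together, a sketch of how the filter scheduler typically drives a HostManager during volume scheduling (names and properties here are illustrative, not part of this file; context is assumed to be a Cinder request context in scope):

hm = HostManager()

# Refresh HostState objects from the service table and cached capabilities.
hosts = hm.get_all_host_states(context)

filter_properties = {'size': 10,
                     'availability_zone': 'nova',
                     'resource_type': {'extra_specs': {}}}

# Drop hosts that fail any filter, then rank the survivors by weight.
filtered = hm.get_filtered_hosts(hosts, filter_properties)
weighed = hm.get_weighed_hosts(filtered, filter_properties)

if weighed:
    best = weighed[0]                      # highest-weight host first
    best.obj.consume_from_volume({'size': 10})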