

# OpenStack Study: ssc_utils.py

# OpenStack Index

# Copyright (c) 2012 NetApp, Inc.

# Copyright (c) 2012 OpenStack Foundation

# All Rights Reserved.


# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at


# http://www.apache.org/licenses/LICENSE-2.0


# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.


"""
Storage service catalog utility functions and classes for NetApp systems.
"""


import copy

from threading import Timer

from cinder import exception

from cinder.openstack.common import log as logging

from cinder.openstack.common import timeutils

from cinder import utils

from cinder.volume import driver

from cinder.volume.drivers.netapp import api

from cinder.volume.drivers.netapp import utils as na_utils

LOG = logging.getLogger(__name__)

class NetAppVolume(object):
    """Represents a NetApp volume.

    Attributes are grouped into per-category dictionaries:

    id - name, vserver, junction_path, type
    aggr - name, raid_type, ha_policy, disk_type
    sis - dedup, compression
    state - status, vserver_root, cluster_volume,
            inconsistent, invalid, junction_active
    qos - qos_policy_group
    space - space-guarantee-enabled, space-guarantee,
            thin_provisioned, size_avl_bytes, size_total_bytes
    mirror - mirrored i.e. dp mirror
    export - path

    Two volumes are equal when both name and vserver match; ordering is
    by available size (``space['size_avl_bytes']``).
    """

    def __init__(self, name, vserver=None):
        """Creates a volume with empty attribute groups.

        :param name: volume name, stored as ``id['name']``
        :param vserver: owning vserver, stored as ``id['vserver']``
        """
        self.id = {'name': name, 'vserver': vserver}
        self.aggr = {}
        self.sis = {}
        self.state = {}
        self.qos = {}
        self.space = {}
        self.mirror = {}
        self.export = {}

    def __eq__(self, other):
        """Checks for equality on name and vserver.

        Always returns a real boolean for volume-like objects; returns
        NotImplemented for objects that do not look like a volume so
        that comparing against unrelated types does not raise.
        """
        try:
            other_id = other.id
            return (self.id['name'] == other_id['name'] and
                    self.id['vserver'] == other_id['vserver'])
        except (AttributeError, KeyError, TypeError):
            return NotImplemented

    def __ne__(self, other):
        """Inverse of __eq__ (Python 2 does not derive it automatically)."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        """Computes hash for the object.

        Only the name is hashed; equal volumes share a name, so this is
        consistent with __eq__.
        """
        return hash(self.id['name'])

    def __cmp__(self, other):
        """Implements comparison logic for volumes.

        Volumes are ordered by available size.  A volume whose size is
        unknown (None) sorts before one whose size is known.

        :returns: -1, 0 or 1
        """
        self_size_avl = self.space.get('size_avl_bytes')
        other_size_avl = other.space.get('size_avl_bytes')
        if self_size_avl is None or other_size_avl is None:
            return ((self_size_avl is not None) -
                    (other_size_avl is not None))
        self_size_avl = int(self_size_avl)
        other_size_avl = int(other_size_avl)
        return ((self_size_avl > other_size_avl) -
                (self_size_avl < other_size_avl))

    def __lt__(self, other):
        """Orders by available size, consistent with __cmp__.

        Python 3 ignores __cmp__, so this keeps sorted()/min()/max()
        working there.
        """
        return self.__cmp__(other) < 0

    def __str__(self):
        """Returns human readable form for object."""
        return ("NetApp Volume id: %s, aggr: %s,"
                " space: %s, sis: %s, state: %s, qos: %s"
                % (self.id, self.aggr, self.space,
                   self.sis, self.state, self.qos))

def get_cluster_vols_with_ssc(na_server, vserver, volume=None):
    """Gets ssc vols for cluster vserver.

    Queries the volumes of ``vserver`` and enriches each one with
    aggregate (raid_type, ha_policy, disk_type), sis (dedup,
    compression), thin-provisioning and mirror attributes.

    :param na_server: server handle passed through to the query helpers
    :param vserver: vserver whose volumes are queried
    :param volume: optional volume name to restrict the queries to
    :returns: the volumes returned by query_cluster_vols_for_ssc, updated
              in place
    """
    volumes = query_cluster_vols_for_ssc(na_server, vserver, volume)
    sis_vols = get_sis_vol_dict(na_server, vserver, volume)
    mirrored_vols = get_snapmirror_vol_dict(na_server, vserver, volume)
    # Cache of aggregate attributes so each aggregate is queried once.
    aggrs = {}
    for vol in volumes:
        vol_name = vol.id['name']
        aggr_name = vol.aggr['name']
        if aggr_name:
            if aggr_name in aggrs:
                aggr_attrs = aggrs[aggr_name]
            else:
                aggr_attrs = query_aggr_options(na_server, aggr_name)
                if aggr_attrs:
                    # Disk type is only looked up when the aggregate
                    # options query returned something.
                    eff_disk_type = query_aggr_storage_disk(na_server,
                                                            aggr_name)
                    aggr_attrs['disk_type'] = eff_disk_type
                aggrs[aggr_name] = aggr_attrs
            vol.aggr['raid_type'] = aggr_attrs.get('raid_type')
            vol.aggr['ha_policy'] = aggr_attrs.get('ha_policy')
            vol.aggr['disk_type'] = aggr_attrs.get('disk_type')
        if sis_vols:
            if vol_name in sis_vols:
                vol.sis['dedup'] = sis_vols[vol_name]['dedup']
                vol.sis['compression'] = sis_vols[vol_name]['compression']
            else:
                vol.sis['dedup'] = False
                vol.sis['compression'] = False
        # A volume is thick only when a file/volume space guarantee is
        # actually enabled; everything else counts as thin provisioned.
        if (vol.space['space-guarantee-enabled'] and
                vol.space['space-guarantee'] in ('file', 'volume')):
            vol.space['thin_provisioned'] = False
        else:
            vol.space['thin_provisioned'] = True
        if mirrored_vols:
            vol.mirror['mirrored'] = False
            for mirr_attrs in mirrored_vols.get(vol_name, ()):
                if (mirr_attrs['rel_type'] == 'data_protection' and
                        mirr_attrs['mirr_state'] == 'snapmirrored'):
                    vol.mirror['mirrored'] = True
                    # One healthy data-protection mirror is enough.
                    break
    return volumes

def query_cluster_vols_for_ssc(na_server, vserver, volume=None):

    """Queries cluster volumes for ssc.

    Builds a 'volume-get-iter' query filtered on the owning vserver
    (and on the volume name when ``volume`` is given), walks the result
    elements and returns the ``vols`` set.
    """

    # NOTE(review): this block is incomplete as extracted.  The value of
    # des_attr, the trailing arguments of the invoke_api call and the
    # statement that consumes vols_found are missing; restore them from
    # the upstream file before relying on this function.
    query = {'volume-attributes': None}

    volume_id = {'volume-id-attributes': {'owning-vserver-name': vserver}}

    if volume:

        volume_id['volume-id-attributes']['name'] = volume

    query['volume-attributes'] = volume_id

    des_attr = {'volume-attributes':





    result = na_utils.invoke_api(na_server, api_name='volume-get-iter',

                                 api_family='cm', query=query,




    vols = set()

    for res in result:

        # NOTE(review): if get_child_content returns text, this value
        # needs int() before the numeric comparison below -- confirm.
        records = res.get_child_content('num-records')

        if records > 0:

            attr_list = res.get_child_by_name('attributes-list')

            if attr_list:

                vol_attrs = attr_list.get_children()

                vols_found = create_vol_list(vol_attrs)

                # NOTE(review): vols_found is never merged into vols in
                # the visible code; presumably a line doing so was lost.

    return vols

def create_vol_list(vol_attrs):

    """Creates vol list with features from attr list.

    Iterates over the volume attribute elements in ``vol_attrs``,
    creates a NetAppVolume from each element's name and owning vserver
    and fills in its id, state, aggr, space and qos groups.  A KeyError
    raised while reading an element is logged at debug level.  Returns
    the ``vols`` set.
    """

    # NOTE(review): this block is incomplete as extracted.  The `try:`
    # line, the right-hand side of every `=\` assignment, the bodies of
    # several `if` statements and the statement that adds vol to vols
    # are missing; restore them from the upstream file.
    vols = set()

    for v in vol_attrs:

        # NOTE(review): a `try:` presumably opened here; it would pair
        # with the `except KeyError` below.

            # name and vserver are mandatory

            # Absence will skip by giving KeyError.

            name = v['volume-id-attributes']['name']

            vserver = v['volume-id-attributes']['owning-vserver-name']

            vol = NetAppVolume(name, vserver)

            vol.id['type'] =\


            # NOTE(review): body missing; presumably "tmp" volumes are
            # skipped -- confirm.
            if vol.id['type'] == "tmp":


            vol.id['junction_path'] =\


            # state attributes mandatory.

            vol.state['vserver_root'] =\




            # NOTE(review): body missing; presumably vserver root
            # volumes are skipped -- confirm.
            if vol.state['vserver_root']:


            vol.state['status'] =\


            vol.state['inconsistent'] =\




            vol.state['invalid'] =\




            vol.state['junction_active'] =\




            vol.state['cluster_volume'] =\




            if (vol.state['status'] != 'online' or

                    vol.state['inconsistent'] or vol.state['invalid']):

                # offline, invalid and inconsistent volumes are not usable


            # aggr attributes mandatory.

            vol.aggr['name'] =\


            # space attributes mandatory.

            vol.space['size_avl_bytes'] =\


            vol.space['size_total_bytes'] =\


            vol.space['space-guarantee-enabled'] =\




            vol.space['space-guarantee'] =\



            # qos attributes optional.

            if v.get_child_by_name('volume-qos-attributes'):

                vol.qos['qos_policy_group'] =\



                # NOTE(review): an `else:` presumably preceded the
                # fallback assignment below.

                vol.qos['qos_policy_group'] = None


        except KeyError as e:

            LOG.debug(_('Unexpected error while creating'

                        ' ssc vol list. Message - %s') % (e.message))


    return vols

def query_aggr_options(na_server, aggr_name):

    """Queries cluster aggr for attributes.

        Currently queries for raid and ha-policy.

        Returns a dict that may contain 'ha_policy' and 'raid_type',
        taken from the options named 'ha_policy' and 'raidtype'.  An
        exception during the query is logged at debug level and the
        attributes collected so far are returned.
    """

    # NOTE(review): this block is incomplete as extracted.  The `try:`
    # line, the api_name argument and the trailing arguments of the
    # invoke_api call are missing.  add_elems is not used in the visible
    # code; presumably it was one of the lost call arguments.
    add_elems = {'aggregate': aggr_name}

    attrs = {}


        result = na_utils.invoke_api(na_server,


                                     api_family='cm', query=None,




        for res in result:

            options = res.get_child_by_name('options')

            if options:

                op_list = options.get_children()

                for op in op_list:

                    if op.get_child_content('name') == 'ha_policy':

                        attrs['ha_policy'] = op.get_child_content('value')

                    if op.get_child_content('name') == 'raidtype':

                        attrs['raid_type'] = op.get_child_content('value')

    except Exception as e:

        LOG.debug(_("Exception querying aggr options. %s"), e)

    return attrs

def get_sis_vol_dict(na_server, vserver, volume=None):

    """Queries sis for volumes.

        If volume is present sis is queried for it.

        Records dedup and compression enabled.

        Returns a dict keyed by volume name (the last component of the
        reported path) whose values are dicts with 'compression' and
        'dedup' entries.  An exception during the query is logged at
        debug level and the entries collected so far are returned.
    """

    # NOTE(review): this block is incomplete as extracted.  The `try:`
    # line, most arguments of the invoke_api call, the bodies of the two
    # `if not ...:` guards and the arguments of both to_bool calls are
    # missing; restore them from the upstream file.
    sis_vols = {}

    query_attr = {'vserver': vserver}

    if volume:

        vol_path = '/vol/%s' % (volume)

        query_attr['path'] = vol_path

    query = {'sis-status-info': query_attr}


        result = na_utils.invoke_api(na_server,





        for res in result:

            attr_list = res.get_child_by_name('attributes-list')

            if attr_list:

                sis_status = attr_list.get_children()

                for sis in sis_status:

                    path = sis.get_child_content('path')

                    # NOTE(review): body missing; presumably entries
                    # without a path are skipped -- confirm.
                    if not path:


                    # Keep only the text after the last '/' of the path.
                    (___, __, vol) = path.rpartition('/')

                    # NOTE(review): body missing; presumably entries
                    # with an empty volume name are skipped -- confirm.
                    if not vol:


                    v_sis = {}

                    v_sis['compression'] = na_utils.to_bool(


                    v_sis['dedup'] = na_utils.to_bool(


                    sis_vols[vol] = v_sis

    except Exception as e:

        LOG.debug(_("Exception querying sis information. %s"), e)

    return sis_vols

def get_snapmirror_vol_dict(na_server, vserver, volume=None):

    """Queries snapmirror volumes.

    Filters on the source vserver (and on the source volume when
    ``volume`` is given).  Returns a dict keyed by source volume name
    whose values are lists of dicts with 'dest_loc', 'rel_type' and
    'mirr_state' entries.  An exception during the query is logged at
    debug level and the entries collected so far are returned.
    """

    # NOTE(review): this block is incomplete as extracted.  The `try:`
    # line, the api_name and trailing arguments of the invoke_api call,
    # the right-hand side of every `=\` assignment and part of the final
    # if/else are missing; restore them from the upstream file.
    mirrored_vols = {}

    query_attr = {'source-vserver': vserver}

    if volume:

        query_attr['source-volume'] = volume

    query = {'snapmirror-info': query_attr}


        result = na_utils.invoke_api(na_server,


                                     api_family='cm', query=query,


        for res in result:

            attr_list = res.get_child_by_name('attributes-list')

            if attr_list:

                snap_info = attr_list.get_children()

                for snap in snap_info:

                    src_volume = snap.get_child_content('source-volume')

                    v_snap = {}

                    v_snap['dest_loc'] =\


                    v_snap['rel_type'] =\


                    v_snap['mirr_state'] =\


                    # NOTE(review): the body of this `if` and the
                    # `else:` before the assignment below are missing;
                    # presumably an existing list is appended to and a
                    # new list is created otherwise -- confirm.
                    if mirrored_vols.get(src_volume):



                        mirrored_vols[src_volume] = [v_snap]

    except Exception as e:

        LOG.debug(_("Exception querying mirror information. %s"), e)

    return mirrored_vols

def query_aggr_storage_disk(na_server, aggr):

    """Queries for storage disks associated to an aggregate.

    Returns the first non-empty effective disk type found in the
    'disk-raid-info' of the returned disks.  Returns 'unknown' when
    none is found; an exception during the query is logged at debug
    level and also results in 'unknown'.
    """

    # NOTE(review): this block is incomplete as extracted.  The last
    # line of the query literal closes two more braces than the first
    # line opens, so an intermediate line of the nested dict was lost.
    # The `try:` line, the api_name and trailing arguments of the
    # invoke_api call and the right-hand side of `eff_disk_type =\` are
    # missing as well; restore them from the upstream file.
    query = {'storage-disk-info': {'disk-raid-info':


                                       {'aggregate-name': aggr}}}}

    des_attr = {'storage-disk-info':

                {'disk-raid-info': ['effective-disk-type']}}


        result = na_utils.invoke_api(na_server,


                                     api_family='cm', query=query,




        for res in result:

            attr_list = res.get_child_by_name('attributes-list')

            if attr_list:

                storage_disks = attr_list.get_children()

                for disk in storage_disks:

                    raid_info = disk.get_child_by_name('disk-raid-info')

                    if raid_info:

                        eff_disk_type =\


                        # The first disk reporting a type decides the
                        # result for the whole aggregate.
                        if eff_disk_type:

                            return eff_disk_type



    except Exception as e:

        LOG.debug(_("Exception querying storage disk. %s"), e)

    return 'unknown'

def get_cluster_ssc(na_server, vserver):
    """Provides cluster volumes with ssc.

    :param na_server: server handle passed to get_cluster_vols_with_ssc
    :param vserver: vserver whose volumes are catalogued
    :returns: dict with the keys 'mirrored', 'dedup', 'compression',
              'thin' (sets of volumes having that feature) and 'all'
              (every volume returned for the vserver)
    """
    netapp_volumes = get_cluster_vols_with_ssc(na_server, vserver)
    mirror_vols = set()
    dedup_vols = set()
    compress_vols = set()
    thin_prov_vols = set()
    ssc_map = {'mirrored': mirror_vols, 'dedup': dedup_vols,
               'compression': compress_vols,
               'thin': thin_prov_vols, 'all': netapp_volumes}
    for vol in netapp_volumes:
        # .get() is used because these attributes are only set when the
        # corresponding backend query returned data.
        if vol.sis.get('dedup'):
            dedup_vols.add(vol)
        if vol.sis.get('compression'):
            compress_vols.add(vol)
        if vol.mirror.get('mirrored'):
            mirror_vols.add(vol)
        if vol.space.get('thin_provisioned'):
            thin_prov_vols.add(vol)
    return ssc_map

def refresh_cluster_stale_ssc(*args, **kwargs):

    """Refreshes stale ssc volumes with latest.

    Expects the backend, the NetApp server and the vserver as the
    first three positional arguments.  Marks the job as running through
    the backend's 'refresh_stale_running' attribute, re-queries each
    stale volume by name, works on a deep copy of ``backend.ssc_vols``
    and finally resets 'refresh_stale_running' to False.
    """

    # NOTE(review): this block is incomplete as extracted.  The
    # try/finally scaffolding, the early exit after `if not job_set:`,
    # whatever decorated or guarded the inner function (lock_pr is
    # computed but never used in the visible code), the bodies of most
    # `if`/`for` statements and the call that runs the inner function
    # are missing; restore them from the upstream file.
    backend = args[0]

    na_server = args[1]

    vserver = args[2]

    identity = str(id(backend))

    lock_pr = '%s_%s' % ('refresh_ssc', identity)


        job_set = na_utils.set_safe_attr(

            backend, 'refresh_stale_running', True)

        # NOTE(review): body missing; presumably returns when the flag
        # could not be set -- confirm.
        if not job_set:



        def refresh_stale_ssc():

                stale_vols = backend._update_stale_vols(reset=True)

                LOG.info(_('Running stale ssc refresh job for %(server)s'

                           ' and vserver %(vs)s')

                         % {'server': na_server, 'vs': vserver})

                # refreshing single volumes can create inconsistency

                # hence doing manipulations on copy

                ssc_vols_copy = copy.deepcopy(backend.ssc_vols)

                refresh_vols = set()

                expired_vols = set()

                for vol in stale_vols:

                    name = vol.id['name']

                    res = get_cluster_vols_with_ssc(na_server, vserver, name)

                    # NOTE(review): both branches are missing;
                    # presumably they fill refresh_vols and
                    # expired_vols -- confirm.
                    if res:




                for vol in refresh_vols:

                    for k in ssc_vols_copy:

                        vol_set = ssc_vols_copy[k]


                        # NOTE(review): the bodies of the five `if`
                        # statements below are missing.
                        if k == "mirrored" and vol.mirror.get('mirrored'):


                        if k == "dedup" and vol.sis.get('dedup'):


                        if k == "compression" and vol.sis.get('compression'):


                        if k == "thin" and vol.space.get('thin_provisioned'):


                        if k == "all":


                for vol in expired_vols:

                    for k in ssc_vols_copy:

                        vol_set = ssc_vols_copy[k]



                LOG.info(_('Successfully completed stale refresh job for'

                           ' %(server)s and vserver %(vs)s')

                         % {'server': na_server, 'vs': vserver})



        na_utils.set_safe_attr(backend, 'refresh_stale_running', False)

def get_cluster_latest_ssc(*args, **kwargs):

    """Updates volumes including ssc.

    Expects the backend, the NetApp server and the vserver as the
    first three positional arguments.  Marks the job as running through
    the backend's 'ssc_job_running' attribute, fetches the full ssc map
    with get_cluster_ssc, records the run time in
    ``backend.ssc_run_time`` and finally resets 'ssc_job_running' to
    False.
    """

    # NOTE(review): this block is incomplete as extracted.  The
    # try/finally scaffolding, the early exit after `if not job_set:`,
    # whatever decorated or guarded the inner function (lock_pr is
    # computed but never used in the visible code), the statement that
    # stores ssc_vols on the backend and the call that runs the inner
    # function are missing; restore them from the upstream file.
    backend = args[0]

    na_server = args[1]

    vserver = args[2]

    identity = str(id(backend))

    lock_pr = '%s_%s' % ('refresh_ssc', identity)

    # As this depends on stale job running state

    # set flag as soon as job starts to avoid

    # job accumulation.


        job_set = na_utils.set_safe_attr(backend, 'ssc_job_running', True)

        # NOTE(review): body missing; presumably returns when the flag
        # could not be set -- confirm.
        if not job_set:



        def get_latest_ssc():

            LOG.info(_('Running cluster latest ssc job for %(server)s'

                       ' and vserver %(vs)s')

                     % {'server': na_server, 'vs': vserver})

            ssc_vols = get_cluster_ssc(na_server, vserver)

            # NOTE(review): ssc_vols is not used in the visible code;
            # presumably it was assigned to the backend on a lost line.

            backend.ssc_run_time = timeutils.utcnow()

            LOG.info(_('Successfully completed ssc job for %(server)s'

                       ' and vserver %(vs)s')

                     % {'server': na_server, 'vs': vserver})



        na_utils.set_safe_attr(backend, 'ssc_job_running', False)

def refresh_cluster_ssc(backend, na_server, vserver, synchronous=False):

    """Refresh cluster ssc for backend.

    Validates that ``backend`` is a driver.VolumeDriver and
    ``na_server`` an api.NaServer (raising exception.InvalidInput
    otherwise), then chooses between the full ssc job
    (get_cluster_latest_ssc) and the stale-volume job
    (refresh_cluster_stale_ssc) based on the backend's job flags,
    last run time and stale volumes.
    """

    # NOTE(review): this definition is repeated verbatim further down in
    # the file; the later definition replaces this one at import time.
    # NOTE(review): this block is incomplete as extracted.  The
    # statements after both warnings, the `else:` lines and the
    # statements that start the Timer objects are missing.
    if not isinstance(backend, driver.VolumeDriver):

        raise exception.InvalidInput(reason=_("Backend not a VolumeDriver."))

    if not isinstance(na_server, api.NaServer):

        raise exception.InvalidInput(reason=_("Backend server not NaServer."))

    # Refresh interval in seconds; 1800 when the backend does not set it.
    delta_secs = getattr(backend, 'ssc_run_delta_secs', 1800)

    if getattr(backend, 'ssc_job_running', None):

            LOG.warn(_('ssc job in progress. Returning... '))


    elif (getattr(backend, 'ssc_run_time', None) is None or

          (backend.ssc_run_time and

           timeutils.is_newer_than(backend.ssc_run_time, delta_secs))):

        if synchronous:

            get_cluster_latest_ssc(backend, na_server, vserver)


            t = Timer(0, get_cluster_latest_ssc,

                      args=[backend, na_server, vserver])


    elif getattr(backend, 'refresh_stale_running', None):

            LOG.warn(_('refresh stale ssc job in progress. Returning... '))



        if backend.stale_vols:

            if synchronous:

                refresh_cluster_stale_ssc(backend, na_server, vserver)


                t = Timer(0, refresh_cluster_stale_ssc,

                          args=[backend, na_server, vserver])

def get_volumes_for_specs(ssc_vols, specs):

    """Shortlists volumes for extra specs provided.

    Returns ``ssc_vols['all']`` unchanged when ``specs`` is None or not
    a dict.  Otherwise it deep-copies ``ssc_vols['all']`` and
    normalises the boolean extra specs into ``b_specs``.
    """

    # NOTE(review): this copy stops after building b_specs, so it falls
    # off the end and returns None for dict specs.  A longer definition
    # with the same name appears later in the file and replaces this one
    # at import time; this fragment looks like an extraction duplicate.
    if specs is None or not isinstance(specs, dict):

        return ssc_vols['all']

    result = copy.deepcopy(ssc_vols['all'])

    raid_type = specs.get('netapp:raid_type')

    disk_type = specs.get('netapp:disk_type')

    bool_specs_list = ['netapp_mirrored', 'netapp_unmirrored',

                       'netapp_dedup', 'netapp_nodedup',

                       'netapp_compression', 'netapp_nocompression',

                       'netapp_thin_provisioned', 'netapp_thick_provisioned']

    b_specs = {}

    for spec in bool_specs_list:

        # A missing or falsy spec value is recorded as None (unset).
        b_specs[spec] = na_utils.to_bool(specs.get(spec))\

            if specs.get(spec) else None

def refresh_cluster_ssc(backend, na_server, vserver, synchronous=False):
    """Refresh cluster ssc for backend.

    Runs at most one job per call:

    * nothing, when a full ssc job is already flagged as running;
    * the full job (get_cluster_latest_ssc), when the backend has no
      recorded run time or the run-time check against
      ``ssc_run_delta_secs`` (default 1800) succeeds;
    * nothing, when a stale refresh job is already flagged as running;
    * the stale job (refresh_cluster_stale_ssc), when the backend has
      stale volumes.

    :param backend: the volume driver whose ssc is refreshed
    :param na_server: NetApp server handle
    :param vserver: vserver name passed on to the job
    :param synchronous: run the job in the calling thread when True,
                        otherwise start it on a Timer thread
    :raises exception.InvalidInput: if backend is not a
        driver.VolumeDriver or na_server is not an api.NaServer
    """
    if not isinstance(backend, driver.VolumeDriver):
        raise exception.InvalidInput(reason=_("Backend not a VolumeDriver."))
    if not isinstance(na_server, api.NaServer):
        raise exception.InvalidInput(reason=_("Backend server not NaServer."))

    def _run(job):
        """Runs job inline or on a background Timer thread."""
        if synchronous:
            job(backend, na_server, vserver)
        else:
            t = Timer(0, job, args=[backend, na_server, vserver])
            t.start()

    delta_secs = getattr(backend, 'ssc_run_delta_secs', 1800)
    if getattr(backend, 'ssc_job_running', None):
        LOG.warn(_('ssc job in progress. Returning... '))
        return
    # NOTE(review): the direction of this check depends on
    # timeutils.is_newer_than; confirm that it triggers a refresh once
    # the last run is older than delta_secs.
    elif (getattr(backend, 'ssc_run_time', None) is None or
          (backend.ssc_run_time and
           timeutils.is_newer_than(backend.ssc_run_time, delta_secs))):
        _run(get_cluster_latest_ssc)
    elif getattr(backend, 'refresh_stale_running', None):
        LOG.warn(_('refresh stale ssc job in progress. Returning... '))
        return
    else:
        if backend.stale_vols:
            _run(refresh_cluster_stale_ssc)

def get_volumes_for_specs(ssc_vols, specs):
    """Shortlists volumes for extra specs provided.

    :param ssc_vols: dict with the keys 'all', 'mirrored', 'dedup',
                     'compression' and 'thin', each mapping to a set of
                     volumes
    :param specs: extra specs dict; anything that is not a dict is
                  treated as "no specs"
    :returns: ``ssc_vols['all']`` itself when there are no specs,
              otherwise a filtered deep copy of it
    """
    if specs is None or not isinstance(specs, dict):
        return ssc_vols['all']
    result = copy.deepcopy(ssc_vols['all'])
    raid_type = specs.get('netapp:raid_type')
    disk_type = specs.get('netapp:disk_type')

    # (spec, opposite spec, key of the matching set in ssc_vols)
    bool_spec_pairs = (
        ('netapp_mirrored', 'netapp_unmirrored', 'mirrored'),
        ('netapp_dedup', 'netapp_nodedup', 'dedup'),
        ('netapp_compression', 'netapp_nocompression', 'compression'),
        ('netapp_thin_provisioned', 'netapp_thick_provisioned', 'thin'),
    )

    def _bool_spec(name):
        """Returns the spec as a bool, or None when unset or falsy."""
        raw = specs.get(name)
        return na_utils.to_bool(raw) if raw else None

    for spec, opp_spec, ssc_key in bool_spec_pairs:
        wanted = _bool_spec(spec)
        unwanted = _bool_spec(opp_spec)
        # Both unset, or both set to the same value: the pair cancels
        # out and places no constraint on the result.
        if wanted == unwanted:
            continue
        if wanted or unwanted is False:
            result = result & ssc_vols[ssc_key]
        else:
            result = result - ssc_vols[ssc_key]

    if raid_type or disk_type:
        # Iterate over a snapshot because result is mutated in the loop.
        for vol in list(result):
            if raid_type:
                vol_raid = vol.aggr['raid_type']
                vol_raid = vol_raid.lower() if vol_raid else None
                if raid_type.lower() != vol_raid:
                    result.discard(vol)
            if disk_type:
                vol_dtype = vol.aggr['disk_type']
                vol_dtype = vol_dtype.lower() if vol_dtype else None
                if disk_type.lower() != vol_dtype:
                    result.discard(vol)
    return result

def check_ssc_api_permissions(na_server):

    """Checks backend ssc api permissions for the user."""

    api_map = {'storage-disk-get-iter': ['netapp:disk_type'],

               'snapmirror-get-iter': ['netapp_mirrored',


               'sis-get-iter': ['netapp_dedup', 'netapp_nodedup',



               'aggr-options-list-info': ['netapp:raid_type'],

               'volume-get-iter': []}

    failed_apis = na_utils.check_apis_on_cluster(na_server, api_map.keys())

    if failed_apis:

        if 'volume-get-iter' in failed_apis:

            msg = _("Fatal error: User not permitted"

                    " to query NetApp volumes.")

            raise exception.VolumeBackendAPIException(data=msg)


            unsupp_ssc_features = []

            for fail in failed_apis:


            LOG.warn(_("The user does not have access or sufficient"

                       " privileges to use all netapp apis. The following"

                       " extra_specs will fail or be ignored: %s"),
