**** CubicPower OpenStack Study ****
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 Radware LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Avishay Balderman, Radware
import base64
import copy
import httplib
import Queue
import socket
import threading
import time
import eventlet
from oslo.config import cfg
from neutron.common import log as call_log
from neutron import context
from neutron.db.loadbalancer import loadbalancer_db as lb_db
from neutron.extensions import loadbalancer
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers import abstract_driver
from neutron.services.loadbalancer.drivers.radware import exceptions as r_exc
eventlet.monkey_patch(thread=True)
LOG = logging.getLogger(__name__)
RESP_STATUS = 0
RESP_REASON = 1
RESP_STR = 2
RESP_DATA = 3
TEMPLATE_HEADER = {'Content-Type':
'application/vnd.com.radware.vdirect.'
'template-parameters+json'}
PROVISION_HEADER = {'Content-Type':
'application/vnd.com.radware.'
'vdirect.status+json'}
CREATE_SERVICE_HEADER = {'Content-Type':
'application/vnd.com.radware.'
'vdirect.adc-service-specification+json'}
driver_opts = [
cfg.StrOpt('vdirect_address',
help=_('vdirect server IP address')),
cfg.StrOpt('vdirect_user',
default='vDirect',
help=_('vdirect user name')),
cfg.StrOpt('vdirect_password',
default='radware',
help=_('vdirect user password')),
cfg.StrOpt('service_adc_type',
default="VA",
help=_('Service ADC type')),
cfg.StrOpt('service_adc_version',
default="",
help=_('Service ADC version')),
cfg.BoolOpt('service_ha_pair',
default=False,
help=_('service HA pair')),
cfg.IntOpt('service_throughput',
default=1000,
help=_('service throughtput')),
cfg.IntOpt('service_ssl_throughput',
default=100,
help=_('service ssl throughtput')),
cfg.IntOpt('service_compression_throughput',
default=100,
help=_('service compression throughtput')),
cfg.IntOpt('service_cache',
default=20,
help=_('service cache')),
cfg.StrOpt('l2_l3_workflow_name',
default='openstack_l2_l3',
help=_('l2_l3 workflow name')),
cfg.StrOpt('l4_workflow_name',
default='openstack_l4',
help=_('l4 workflow name')),
cfg.DictOpt('l2_l3_ctor_params',
default={"service": "_REPLACE_",
"ha_network_name": "HA-Network",
"ha_ip_pool_name": "default",
"allocate_ha_vrrp": True,
"allocate_ha_ips": True},
help=_('l2_l3 workflow constructor params')),
cfg.DictOpt('l2_l3_setup_params',
default={"data_port": 1,
"data_ip_address": "192.168.200.99",
"data_ip_mask": "255.255.255.0",
"gateway": "192.168.200.1",
"ha_port": 2},
help=_('l2_l3 workflow setup params')),
cfg.ListOpt('actions_to_skip',
default=['setup_l2_l3'],
help=_('List of actions that we dont want to push to '
'the completion queue')),
cfg.StrOpt('l4_action_name',
default='BaseCreate',
help=_('l4 workflow action name')),
cfg.ListOpt('service_resource_pool_ids',
default=[],
help=_('Resource pool ids')),
cfg.IntOpt('service_isl_vlan',
default=-1,
help=_('A required VLAN for the interswitch link to use')),
cfg.BoolOpt('service_session_mirroring_enabled',
default=False,
help=_('Support an Alteon interswitch '
'link for stateful session failover'))
]
cfg.CONF.register_opts(driver_opts, "radware")
**** CubicPower OpenStack Study ****
class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
"""Radware lbaas driver."""
**** CubicPower OpenStack Study ****
def __init__(self, plugin):
rad = cfg.CONF.radware
self.plugin = plugin
self.service = {
"haPair": rad.service_ha_pair,
"sessionMirroringEnabled": rad.service_session_mirroring_enabled,
"primary": {
"capacity": {
"throughput": rad.service_throughput,
"sslThroughput": rad.service_ssl_throughput,
"compressionThroughput":
rad.service_compression_throughput,
"cache": rad.service_cache
},
"network": {
"type": "portgroup",
"portgroups": ['DATA_NETWORK']
},
"adcType": rad.service_adc_type,
"acceptableAdc": "Exact"
}
}
if rad.service_resource_pool_ids:
ids = rad.service_resource_pool_ids
self.service['resourcePoolIds'] = [
{'name': id} for id in ids
]
if rad.service_isl_vlan:
self.service['islVlan'] = rad.service_isl_vlan
self.l2_l3_wf_name = rad.l2_l3_workflow_name
self.l4_wf_name = rad.l4_workflow_name
self.l2_l3_ctor_params = rad.l2_l3_ctor_params
self.l2_l3_setup_params = rad.l2_l3_setup_params
self.l4_action_name = rad.l4_action_name
self.actions_to_skip = rad.actions_to_skip
vdirect_address = cfg.CONF.radware.vdirect_address
self.rest_client = vDirectRESTClient(server=vdirect_address,
user=rad.vdirect_user,
password=rad.vdirect_password)
self.queue = Queue.Queue()
self.completion_handler = OperationCompletionHandler(self.queue,
self.rest_client,
plugin)
self.workflow_templates_exists = False
self.completion_handler.setDaemon(True)
self.completion_handler_started = False
**** CubicPower OpenStack Study ****
def create_vip(self, context, vip):
LOG.debug(_('create_vip. vip: %s'), str(vip))
extended_vip = self.plugin.populate_vip_graph(context, vip)
LOG.debug(_('create_vip. extended_vip: %s'), str(extended_vip))
network_id = self._get_vip_network_id(context, extended_vip)
LOG.debug(_('create_vip. network_id: %s '), str(network_id))
service_name = self._get_service(extended_vip['pool_id'], network_id)
LOG.debug(_('create_vip. service_name: %s '), service_name)
self._create_workflow(
vip['pool_id'], self.l4_wf_name,
{"service": service_name})
self._update_workflow(
vip['pool_id'],
self.l4_action_name, extended_vip, context)
**** CubicPower OpenStack Study ****
def update_vip(self, context, old_vip, vip):
extended_vip = self.plugin.populate_vip_graph(context, vip)
self._update_workflow(
vip['pool_id'], self.l4_action_name,
extended_vip, context, False, lb_db.Vip, vip['id'])
**** CubicPower OpenStack Study ****
def delete_vip(self, context, vip):
"""Delete a Vip
First delete it from the device. If deletion ended OK
- remove data from DB as well.
If the deletion failed - mark vip with error status in DB
"""
extended_vip = self.plugin.populate_vip_graph(context, vip)
params = _translate_vip_object_graph(extended_vip,
self.plugin, context)
ids = params.pop('__ids__')
try:
# removing the WF will cause deletion of the configuration from the
# device
self._remove_workflow(ids, context)
except r_exc.RESTRequestFailure:
pool_id = extended_vip['pool_id']
LOG.exception(_('Failed to remove workflow %s. '
'Going to set vip to ERROR status'),
pool_id)
self.plugin.update_status(context, lb_db.Vip, ids['vip'],
constants.ERROR)
**** CubicPower OpenStack Study ****
def create_pool(self, context, pool):
# nothing to do
pass
**** CubicPower OpenStack Study ****
def update_pool(self, context, old_pool, pool):
self._handle_pool(context, pool)
**** CubicPower OpenStack Study ****
def delete_pool(self, context, pool,):
self._handle_pool(context, pool, delete=True)
**** CubicPower OpenStack Study ****
def _handle_pool(self, context, pool, delete=False):
vip_id = self.plugin.get_pool(context, pool['id']).get('vip_id', None)
if vip_id:
if delete:
raise loadbalancer.PoolInUse(pool_id=pool['id'])
else:
vip = self.plugin.get_vip(context, vip_id)
extended_vip = self.plugin.populate_vip_graph(context, vip)
self._update_workflow(
pool['id'], self.l4_action_name,
extended_vip, context, delete, lb_db.Pool, pool['id'])
else:
if delete:
self.plugin._delete_db_pool(context, pool['id'])
else:
# we keep the pool in PENDING_UPDATE
# no point to modify it since it is not connected to vip yet
pass
**** CubicPower OpenStack Study ****
def create_member(self, context, member):
self._handle_member(context, member)
**** CubicPower OpenStack Study ****
def update_member(self, context, old_member, member):
self._handle_member(context, member)
**** CubicPower OpenStack Study ****
def delete_member(self, context, member):
self._handle_member(context, member, delete=True)
**** CubicPower OpenStack Study ****
def _handle_member(self, context, member, delete=False):
"""Navigate the model. If a Vip is found - activate a bulk WF action.
"""
vip_id = self.plugin.get_pool(
context, member['pool_id']).get('vip_id')
if vip_id:
vip = self.plugin.get_vip(context, vip_id)
extended_vip = self.plugin.populate_vip_graph(context, vip)
self._update_workflow(
member['pool_id'], self.l4_action_name,
extended_vip, context,
delete, lb_db.Member, member['id'])
# We have to delete this member but it is not connected to a vip yet
elif delete:
self.plugin._delete_db_member(context, member['id'])
**** CubicPower OpenStack Study ****
def create_health_monitor(self, context, health_monitor):
# Anything to do here? the hm is not connected to the graph yet
pass
**** CubicPower OpenStack Study ****
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor,
pool_id):
self._handle_pool_health_monitor(context, health_monitor, pool_id)
**** CubicPower OpenStack Study ****
def create_pool_health_monitor(self, context,
health_monitor, pool_id):
self._handle_pool_health_monitor(context, health_monitor, pool_id)
**** CubicPower OpenStack Study ****
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
self._handle_pool_health_monitor(context, health_monitor, pool_id,
True)
**** CubicPower OpenStack Study ****
def _handle_pool_health_monitor(self, context, health_monitor,
pool_id, delete=False):
"""Push a graph to vDirect
Navigate the model. Check if a pool is associated to the vip
and push the graph to vDirect
"""
vip_id = self.plugin.get_pool(context, pool_id).get('vip_id', None)
debug_params = {"hm_id": health_monitor['id'], "pool_id": pool_id,
"delete": delete, "vip_id": vip_id}
LOG.debug(_('_handle_pool_health_monitor. health_monitor = %(hm_id)s '
'pool_id = %(pool_id)s delete = %(delete)s '
'vip_id = %(vip_id)s'),
debug_params)
if vip_id:
vip = self.plugin.get_vip(context, vip_id)
extended_vip = self.plugin.populate_vip_graph(context, vip)
self._update_workflow(pool_id, self.l4_action_name,
extended_vip, context,
delete, lb_db.PoolMonitorAssociation,
health_monitor['id'])
elif delete:
self.plugin._delete_db_pool_health_monitor(context,
health_monitor['id'],
pool_id)
**** CubicPower OpenStack Study ****
def stats(self, context, pool_id):
# TODO(avishayb) implement
return {"bytes_in": 0,
"bytes_out": 0,
"active_connections": 0,
"total_connections": 0}
**** CubicPower OpenStack Study ****
def _get_vip_network_id(self, context, extended_vip):
subnet = self.plugin._core_plugin.get_subnet(
context, extended_vip['subnet_id'])
return subnet['network_id']
**** CubicPower OpenStack Study ****
def _start_completion_handling_thread(self):
if not self.completion_handler_started:
LOG.info(_('Starting operation completion handling thread'))
self.completion_handler.start()
self.completion_handler_started = True
@call_log.log
**** CubicPower OpenStack Study ****
def _update_workflow(self, wf_name, action,
wf_params, context,
delete=False,
lbaas_entity=None, entity_id=None):
"""Update the WF state. Push the result to a queue for processing."""
if not self.workflow_templates_exists:
self._verify_workflow_templates()
if action not in self.actions_to_skip:
params = _translate_vip_object_graph(wf_params,
self.plugin,
context)
else:
params = wf_params
resource = '/api/workflow/%s/action/%s' % (wf_name, action)
response = _rest_wrapper(self.rest_client.call('POST', resource,
{'parameters': params},
TEMPLATE_HEADER))
LOG.debug(_('_update_workflow response: %s '), response)
if action not in self.actions_to_skip:
ids = params.pop('__ids__', None)
oper = OperationAttributes(response['uri'],
ids,
lbaas_entity,
entity_id,
delete=delete)
LOG.debug(_('Pushing operation %s to the queue'), oper)
self._start_completion_handling_thread()
self.queue.put_nowait(oper)
**** CubicPower OpenStack Study ****
def _remove_workflow(self, ids, context):
wf_name = ids['pool']
LOG.debug(_('Remove the workflow %s') % wf_name)
resource = '/api/workflow/%s' % (wf_name)
response = _rest_wrapper(self.rest_client.call('DELETE', resource,
None, None),
[204, 202, 404])
msg = response.get('message', None)
if msg == "Not Found":
self.plugin._delete_db_vip(context, ids['vip'])
else:
oper = OperationAttributes(response['uri'],
ids,
lb_db.Vip,
ids['vip'],
delete=True)
LOG.debug(_('Pushing operation %s to the queue'), oper)
self._start_completion_handling_thread()
self.queue.put_nowait(oper)
**** CubicPower OpenStack Study ****
def _remove_service(self, service_name):
resource = '/api/service/%s' % (service_name)
_rest_wrapper(self.rest_client.call('DELETE',
resource, None, None),
[202])
**** CubicPower OpenStack Study ****
def _get_service(self, pool_id, network_id):
"""Get a service name.
if you can't find one,
create a service and create l2_l2 WF.
"""
if not self.workflow_templates_exists:
self._verify_workflow_templates()
incoming_service_name = 'srv_' + network_id
service_name = self._get_available_service(incoming_service_name)
if not service_name:
LOG.debug(
'Could not find a service named ' + incoming_service_name)
service_name = self._create_service(pool_id, network_id)
self.l2_l3_ctor_params["service"] = incoming_service_name
wf_name = 'l2_l3_' + network_id
self._create_workflow(
wf_name, self.l2_l3_wf_name, self.l2_l3_ctor_params)
self._update_workflow(
wf_name, "setup_l2_l3", self.l2_l3_setup_params, None)
else:
LOG.debug('A service named ' + service_name + ' was found.')
return service_name
**** CubicPower OpenStack Study ****
def _create_service(self, pool_id, network_id):
"""create the service and provision it (async)."""
# 1) create the service
service_name = 'srv_' + network_id
resource = '/api/service?name=%s' % service_name
service = copy.deepcopy(self.service)
service['primary']['network']['portgroups'] = [network_id]
response = _rest_wrapper(self.rest_client.call('POST', resource,
service,
CREATE_SERVICE_HEADER), [201])
# 2) provision the service
provision_uri = response['links']['actions']['provision']
_rest_wrapper(self.rest_client.call('POST', provision_uri,
None, PROVISION_HEADER))
return service_name
**** CubicPower OpenStack Study ****
def _get_available_service(self, service_name):
"""Check if service exsists and return its name if it does."""
resource = '/api/service/' + service_name
try:
_rest_wrapper(self.rest_client.call('GET',
resource,
None, None), [200])
except Exception:
return
return service_name
**** CubicPower OpenStack Study ****
def _workflow_exists(self, pool_id):
"""Check if a WF having the name of the pool_id exists."""
resource = '/api/workflow/' + pool_id
try:
_rest_wrapper(self.rest_client.call('GET',
resource,
None,
None), [200])
except Exception:
return False
return True
**** CubicPower OpenStack Study ****
def _create_workflow(self, wf_name, wf_template_name,
create_workflow_params=None):
"""Create a WF if it doesn't exists yet."""
if not self.workflow_templates_exists:
self._verify_workflow_templates()
if not self._workflow_exists(wf_name):
if not create_workflow_params:
create_workflow_params = {}
resource = '/api/workflowTemplate/%s?name=%s' % (
wf_template_name, wf_name)
params = {'parameters': create_workflow_params}
response = _rest_wrapper(self.rest_client.call('POST',
resource,
params,
TEMPLATE_HEADER))
LOG.debug(_('create_workflow response: %s'), str(response))
**** CubicPower OpenStack Study ****
def _verify_workflow_templates(self):
"""Verify the existence of workflows on vDirect server."""
workflows = {self.l2_l3_wf_name:
False, self.l4_wf_name: False}
resource = '/api/workflowTemplate'
response = _rest_wrapper(self.rest_client.call('GET',
resource,
None,
None), [200])
for wf in workflows.keys():
for wf_template in response:
if wf == wf_template['name']:
workflows[wf] = True
break
for wf, found in workflows.items():
if not found:
raise r_exc.WorkflowMissing(workflow=wf)
self.workflow_templates_exists = True
**** CubicPower OpenStack Study ****
class vDirectRESTClient:
"""REST server proxy to Radware vDirect."""
**** CubicPower OpenStack Study ****
def __init__(self,
server='localhost',
user=None,
password=None,
port=2189,
ssl=True,
timeout=5000,
base_uri=''):
self.server = server
self.port = port
self.ssl = ssl
self.base_uri = base_uri
self.timeout = timeout
if user and password:
self.auth = base64.encodestring('%s:%s' % (user, password))
self.auth = self.auth.replace('\n', '')
else:
raise r_exc.AuthenticationMissing()
debug_params = {'server': self.server,
'port': self.port,
'ssl': self.ssl}
LOG.debug(_('vDirectRESTClient:init server=%(server)s, '
'port=%(port)d, '
'ssl=%(ssl)r'), debug_params)
@call_log.log
**** CubicPower OpenStack Study ****
def call(self, action, resource, data, headers, binary=False):
if resource.startswith('http'):
uri = resource
else:
uri = self.base_uri + resource
if binary:
body = data
else:
body = json.dumps(data)
debug_data = 'binary' if binary else body
debug_data = debug_data if debug_data else 'EMPTY'
if not headers:
headers = {'Authorization': 'Basic %s' % self.auth}
else:
headers['Authorization'] = 'Basic %s' % self.auth
conn = None
if self.ssl:
conn = httplib.HTTPSConnection(
self.server, self.port, timeout=self.timeout)
if conn is None:
LOG.error(_('vdirectRESTClient: Could not establish HTTPS '
'connection'))
return 0, None, None, None
else:
conn = httplib.HTTPConnection(
self.server, self.port, timeout=self.timeout)
if conn is None:
LOG.error(_('vdirectRESTClient: Could not establish HTTP '
'connection'))
return 0, None, None, None
try:
conn.request(action, uri, body, headers)
response = conn.getresponse()
respstr = response.read()
respdata = respstr
try:
respdata = json.loads(respstr)
except ValueError:
# response was not JSON, ignore the exception
pass
ret = (response.status, response.reason, respstr, respdata)
except (socket.timeout, socket.error) as e:
log_dict = {'action': action, 'e': e}
LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'),
log_dict)
ret = 0, None, None, None
conn.close()
return ret
**** CubicPower OpenStack Study ****
class OperationAttributes:
"""Holds operation attributes."""
**** CubicPower OpenStack Study ****
def __init__(self,
operation_url,
object_graph,
lbaas_entity=None,
entity_id=None,
delete=False):
self.operation_url = operation_url
self.object_graph = object_graph
self.delete = delete
self.lbaas_entity = lbaas_entity
self.entity_id = entity_id
self.creation_time = time.time()
**** CubicPower OpenStack Study ****
def __repr__(self):
items = ("%s = %r" % (k, v) for k, v in self.__dict__.items())
return "<%s: {%s}>" % (self.__class__.__name__, ', '.join(items))
**** CubicPower OpenStack Study ****
class OperationCompletionHandler(threading.Thread):
"""Update DB with operation status or delete the entity from DB."""
**** CubicPower OpenStack Study ****
def __init__(self, queue, rest_client, plugin):
threading.Thread.__init__(self)
self.queue = queue
self.rest_client = rest_client
self.plugin = plugin
self.stoprequest = threading.Event()
self.opers_to_handle_before_rest = 0
**** CubicPower OpenStack Study ****
def join(self, timeout=None):
self.stoprequest.set()
super(OperationCompletionHandler, self).join(timeout)
**** CubicPower OpenStack Study ****
def handle_operation_completion(self, oper):
result = self.rest_client.call('GET',
oper.operation_url,
None,
None)
completed = result[RESP_DATA]['complete']
reason = result[RESP_REASON],
description = result[RESP_STR]
if completed:
# operation is done - update the DB with the status
# or delete the entire graph from DB
success = result[RESP_DATA]['success']
sec_to_completion = time.time() - oper.creation_time
debug_data = {'oper': oper,
'sec_to_completion': sec_to_completion,
'success': success}
LOG.debug(_('Operation %(oper)s is completed after '
'%(sec_to_completion)d sec '
'with success status: %(success)s :'),
debug_data)
db_status = None
if not success:
# failure - log it and set the return ERROR as DB state
if reason or description:
msg = 'Reason:%s. Description:%s' % (reason, description)
else:
msg = "unknown"
error_params = {"operation": oper, "msg": msg}
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
error_params)
db_status = constants.ERROR
else:
if oper.delete:
_remove_object_from_db(self.plugin, oper)
else:
db_status = constants.ACTIVE
if db_status:
_update_vip_graph_status(self.plugin, oper, db_status)
return completed
**** CubicPower OpenStack Study ****
def run(self):
oper = None
while not self.stoprequest.isSet():
try:
oper = self.queue.get(timeout=1)
# Get the current queue size (N) and set the counter with it.
# Handle N operations with no intermission.
# Once N operations handles, get the size again and repeat.
if self.opers_to_handle_before_rest <= 0:
self.opers_to_handle_before_rest = self.queue.qsize() + 1
LOG.debug('Operation consumed from the queue: ' +
str(oper))
# check the status - if oper is done: update the db ,
# else push the oper again to the queue
if not self.handle_operation_completion(oper):
LOG.debug(_('Operation %s is not completed yet..') % oper)
# Not completed - push to the queue again
self.queue.put_nowait(oper)
self.queue.task_done()
self.opers_to_handle_before_rest -= 1
# Take one second rest before start handling
# new operations or operations handled before
if self.opers_to_handle_before_rest <= 0:
time.sleep(1)
except Queue.Empty:
continue
except Exception:
m = _("Exception was thrown inside OperationCompletionHandler")
LOG.exception(m)
def _rest_wrapper(response, success_codes=[202]):
"""Wrap a REST call and make sure a valid status is returned."""
if response[RESP_STATUS] not in success_codes:
raise r_exc.RESTRequestFailure(
status=response[RESP_STATUS],
reason=response[RESP_REASON],
description=response[RESP_STR],
success_codes=success_codes
)
else:
return response[RESP_DATA]
def _update_vip_graph_status(plugin, oper, status):
"""Update the status
Of all the Vip object graph
or a specific entity in the graph.
"""
ctx = context.get_admin_context(load_admin_roles=False)
LOG.debug(_('_update: %s '), oper)
if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
plugin.update_pool_health_monitor(ctx,
oper.entity_id,
oper.object_graph['pool'],
status)
elif oper.entity_id:
plugin.update_status(ctx,
oper.lbaas_entity,
oper.entity_id,
status)
else:
_update_vip_graph_status_cascade(plugin,
oper.object_graph,
ctx, status)
def _update_vip_graph_status_cascade(plugin, ids, ctx, status):
plugin.update_status(ctx,
lb_db.Vip,
ids['vip'],
status)
plugin.update_status(ctx,
lb_db.Pool,
ids['pool'],
status)
for member_id in ids['members']:
plugin.update_status(ctx,
lb_db.Member,
member_id,
status)
for hm_id in ids['health_monitors']:
plugin.update_pool_health_monitor(ctx,
hm_id,
ids['pool'],
status)
def _remove_object_from_db(plugin, oper):
"""Remove a specific entity from db."""
LOG.debug(_('_remove_object_from_db %s'), str(oper))
ctx = context.get_admin_context(load_admin_roles=False)
if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
plugin._delete_db_pool_health_monitor(ctx,
oper.entity_id,
oper.object_graph['pool'])
elif oper.lbaas_entity == lb_db.Member:
plugin._delete_db_member(ctx, oper.entity_id)
elif oper.lbaas_entity == lb_db.Vip:
plugin._delete_db_vip(ctx, oper.entity_id)
elif oper.lbaas_entity == lb_db.Pool:
plugin._delete_db_pool(ctx, oper.entity_id)
else:
raise r_exc.UnsupportedEntityOperation(
operation='Remove from DB', entity=oper.lbaas_entity
)
TRANSLATION_DEFAULTS = {'session_persistence_type': 'SOURCE_IP',
'session_persistence_cookie_name': 'none',
'url_path': '/',
'http_method': 'GET',
'expected_codes': '200'
}
VIP_PROPERTIES = ['address', 'protocol_port', 'protocol', 'connection_limit',
'admin_state_up', 'session_persistence_type',
'session_persistence_cookie_name']
POOL_PROPERTIES = ['protocol', 'lb_method', 'admin_state_up']
MEMBER_PROPERTIES = ['address', 'protocol_port', 'weight', 'admin_state_up']
HEALTH_MONITOR_PROPERTIES = ['type', 'delay', 'timeout', 'max_retries',
'admin_state_up', 'url_path', 'http_method',
'expected_codes', 'id']
def _translate_vip_object_graph(extended_vip, plugin, context):
"""Translate the extended vip
translate to a structure that can be
understood by the workflow.
"""
**** CubicPower OpenStack Study ****
def _rest_wrapper(response, success_codes=[202]):
"""Wrap a REST call and make sure a valid status is returned."""
if response[RESP_STATUS] not in success_codes:
raise r_exc.RESTRequestFailure(
status=response[RESP_STATUS],
reason=response[RESP_REASON],
description=response[RESP_STR],
success_codes=success_codes
)
else:
return response[RESP_DATA]
**** CubicPower OpenStack Study ****
def _update_vip_graph_status(plugin, oper, status):
"""Update the status
Of all the Vip object graph
or a specific entity in the graph.
"""
ctx = context.get_admin_context(load_admin_roles=False)
LOG.debug(_('_update: %s '), oper)
if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
plugin.update_pool_health_monitor(ctx,
oper.entity_id,
oper.object_graph['pool'],
status)
elif oper.entity_id:
plugin.update_status(ctx,
oper.lbaas_entity,
oper.entity_id,
status)
else:
_update_vip_graph_status_cascade(plugin,
oper.object_graph,
ctx, status)
**** CubicPower OpenStack Study ****
def _update_vip_graph_status_cascade(plugin, ids, ctx, status):
plugin.update_status(ctx,
lb_db.Vip,
ids['vip'],
status)
plugin.update_status(ctx,
lb_db.Pool,
ids['pool'],
status)
for member_id in ids['members']:
plugin.update_status(ctx,
lb_db.Member,
member_id,
status)
for hm_id in ids['health_monitors']:
plugin.update_pool_health_monitor(ctx,
hm_id,
ids['pool'],
status)
**** CubicPower OpenStack Study ****
def _remove_object_from_db(plugin, oper):
"""Remove a specific entity from db."""
LOG.debug(_('_remove_object_from_db %s'), str(oper))
ctx = context.get_admin_context(load_admin_roles=False)
if oper.lbaas_entity == lb_db.PoolMonitorAssociation:
plugin._delete_db_pool_health_monitor(ctx,
oper.entity_id,
oper.object_graph['pool'])
elif oper.lbaas_entity == lb_db.Member:
plugin._delete_db_member(ctx, oper.entity_id)
elif oper.lbaas_entity == lb_db.Vip:
plugin._delete_db_vip(ctx, oper.entity_id)
elif oper.lbaas_entity == lb_db.Pool:
plugin._delete_db_pool(ctx, oper.entity_id)
else:
raise r_exc.UnsupportedEntityOperation(
operation='Remove from DB', entity=oper.lbaas_entity
)
TRANSLATION_DEFAULTS = {'session_persistence_type': 'SOURCE_IP',
'session_persistence_cookie_name': 'none',
'url_path': '/',
'http_method': 'GET',
'expected_codes': '200'
}
VIP_PROPERTIES = ['address', 'protocol_port', 'protocol', 'connection_limit',
'admin_state_up', 'session_persistence_type',
'session_persistence_cookie_name']
POOL_PROPERTIES = ['protocol', 'lb_method', 'admin_state_up']
MEMBER_PROPERTIES = ['address', 'protocol_port', 'weight', 'admin_state_up']
HEALTH_MONITOR_PROPERTIES = ['type', 'delay', 'timeout', 'max_retries',
'admin_state_up', 'url_path', 'http_method',
'expected_codes', 'id']
**** CubicPower OpenStack Study ****
def _translate_vip_object_graph(extended_vip, plugin, context):
"""Translate the extended vip
translate to a structure that can be
understood by the workflow.
"""
**** CubicPower OpenStack Study ****
def _create_key(prefix, property_name):
return prefix + '_' + property_name + '_array'
**** CubicPower OpenStack Study ****
def _trans_prop_name(prop_name):
if prop_name == 'id':
return 'uuid'
else:
return prop_name
**** CubicPower OpenStack Study ****
def get_ids(extended_vip):
ids = {}
ids['vip'] = extended_vip['id']
ids['pool'] = extended_vip['pool']['id']
ids['members'] = [m['id'] for m in extended_vip['members']]
ids['health_monitors'] = [
hm['id'] for hm in extended_vip['health_monitors']
]
return ids
trans_vip = {}
LOG.debug('Vip graph to be translated: ' + str(extended_vip))
for vip_property in VIP_PROPERTIES:
trans_vip['vip_' + vip_property] = extended_vip.get(
vip_property, TRANSLATION_DEFAULTS.get(vip_property))
for pool_property in POOL_PROPERTIES:
trans_vip['pool_' + pool_property] = extended_vip[
'pool'][pool_property]
for member_property in MEMBER_PROPERTIES:
trans_vip[_create_key('member', member_property)] = []
for member in extended_vip['members']:
if member['status'] != constants.PENDING_DELETE:
for member_property in MEMBER_PROPERTIES:
trans_vip[_create_key('member', member_property)].append(
member.get(member_property,
TRANSLATION_DEFAULTS.get(member_property)))
for hm_property in HEALTH_MONITOR_PROPERTIES:
trans_vip[
_create_key('hm', _trans_prop_name(hm_property))] = []
for hm in extended_vip['health_monitors']:
hm_pool = plugin.get_pool_health_monitor(context,
hm['id'],
extended_vip['pool']['id'])
if hm_pool['status'] != constants.PENDING_DELETE:
for hm_property in HEALTH_MONITOR_PROPERTIES:
value = hm.get(hm_property,
TRANSLATION_DEFAULTS.get(hm_property))
trans_vip[_create_key('hm',
_trans_prop_name(hm_property))].append(value)
ids = get_ids(extended_vip)
trans_vip['__ids__'] = ids
LOG.debug('Translated Vip graph: ' + str(trans_vip))
return trans_vip