**** CubicPower OpenStack Study ****
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import uuid
from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as q_rpc
from neutron.common import topics
from neutron.db import agents_db
from neutron.db.loadbalancer import loadbalancer_db
from neutron.extensions import lbaas_agentscheduler
from neutron.extensions import portbindings
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers import abstract_driver
LOG = logging.getLogger(__name__)
AGENT_SCHEDULER_OPTS = [
cfg.StrOpt('loadbalancer_pool_scheduler_driver',
default='neutron.services.loadbalancer.agent_scheduler'
'.ChanceScheduler',
help=_('Driver to use for scheduling '
'pool to a default loadbalancer agent')),
]
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
**** CubicPower OpenStack Study ****
class DriverNotSpecified(n_exc.NeutronException):
message = _("Device driver for agent should be specified "
"in plugin driver.")
**** CubicPower OpenStack Study ****
class LoadBalancerCallbacks(object):
RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 2.0 Generic API for agent based drivers
# - get_logical_device() handling changed;
# - pool_deployed() and update_status() methods added;
**** CubicPower OpenStack Study ****
def __init__(self, plugin):
self.plugin = plugin
**** CubicPower OpenStack Study ****
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher(
[self, agents_db.AgentExtRpcCallback(self.plugin)])
**** CubicPower OpenStack Study ****
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
agents = self.plugin.get_lbaas_agents(context,
filters={'host': [host]})
if not agents:
return []
elif len(agents) > 1:
LOG.warning(_('Multiple lbaas agents found on host %s'), host)
pools = self.plugin.list_pools_on_lbaas_agent(context,
agents[0].id)
pool_ids = [pool['id'] for pool in pools['pools']]
qry = context.session.query(loadbalancer_db.Pool.id)
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
qry = qry.filter(
loadbalancer_db.Pool.status.in_(
constants.ACTIVE_PENDING_STATUSES))
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
return [id for id, in qry]
**** CubicPower OpenStack Study ****
def get_logical_device(self, context, pool_id=None):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
retval = {}
retval['pool'] = self.plugin._make_pool_dict(pool)
if pool.vip:
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
retval['vip']['port'] = (
self.plugin._core_plugin._make_port_dict(pool.vip.port)
)
for fixed_ip in retval['vip']['port']['fixed_ips']:
fixed_ip['subnet'] = (
self.plugin._core_plugin.get_subnet(
context,
fixed_ip['subnet_id']
)
)
retval['members'] = [
self.plugin._make_member_dict(m)
for m in pool.members if (
m.status in constants.ACTIVE_PENDING_STATUSES or
m.status == constants.INACTIVE)
]
retval['healthmonitors'] = [
self.plugin._make_health_monitor_dict(hm.healthmonitor)
for hm in pool.monitors
if hm.status in constants.ACTIVE_PENDING_STATUSES
]
retval['driver'] = (
self.plugin.drivers[pool.provider.provider_name].device_driver)
return retval
**** CubicPower OpenStack Study ****
def pool_deployed(self, context, pool_id):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
# set all resources to active
if pool.status in constants.ACTIVE_PENDING_STATUSES:
pool.status = constants.ACTIVE
if (pool.vip and pool.vip.status in
constants.ACTIVE_PENDING_STATUSES):
pool.vip.status = constants.ACTIVE
for m in pool.members:
if m.status in constants.ACTIVE_PENDING_STATUSES:
m.status = constants.ACTIVE
for hm in pool.monitors:
if hm.status in constants.ACTIVE_PENDING_STATUSES:
hm.status = constants.ACTIVE
**** CubicPower OpenStack Study ****
def update_status(self, context, obj_type, obj_id, status):
model_mapping = {
'pool': loadbalancer_db.Pool,
'vip': loadbalancer_db.Vip,
'member': loadbalancer_db.Member,
'health_monitor': loadbalancer_db.PoolMonitorAssociation
}
if obj_type not in model_mapping:
raise n_exc.Invalid(_('Unknown object type: %s') % obj_type)
try:
if obj_type == 'health_monitor':
self.plugin.update_pool_health_monitor(
context, obj_id['monitor_id'], obj_id['pool_id'], status)
else:
self.plugin.update_status(
context, model_mapping[obj_type], obj_id, status)
except n_exc.NotFound:
# update_status may come from agent on an object which was
# already deleted from db with other request
LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s '
'not found in the DB, it was probably deleted '
'concurrently'),
{'obj_type': obj_type, 'obj_id': obj_id})
**** CubicPower OpenStack Study ****
def pool_destroyed(self, context, pool_id=None):
"""Agent confirmation hook that a pool has been destroyed.
This method exists for subclasses to change the deletion
behavior.
"""
pass
**** CubicPower OpenStack Study ****
def plug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
try:
port = self.plugin._core_plugin.get_port(
context,
port_id
)
except n_exc.PortNotFound:
msg = _('Unable to find port %s to plug.')
LOG.debug(msg, port_id)
return
port['admin_state_up'] = True
port['device_owner'] = 'neutron:' + constants.LOADBALANCER
port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
port[portbindings.HOST_ID] = host
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
**** CubicPower OpenStack Study ****
def unplug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
try:
port = self.plugin._core_plugin.get_port(
context,
port_id
)
except n_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
return
port['admin_state_up'] = False
port['device_owner'] = ''
port['device_id'] = ''
try:
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
except n_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
**** CubicPower OpenStack Study ****
def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
self.plugin.update_pool_stats(context, pool_id, data=stats)
**** CubicPower OpenStack Study ****
class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API."""
BASE_RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
# 2.0 Generic API for agent based drivers
# - modify/reload/destroy_pool methods were removed;
# - added methods to handle create/update/delete for every lbaas
# object individually;
**** CubicPower OpenStack Study ****
def __init__(self, topic):
super(LoadBalancerAgentApi, self).__init__(
topic, default_version=self.BASE_RPC_API_VERSION)
**** CubicPower OpenStack Study ****
def _cast(self, context, method_name, method_args, host, version=None):
return self.cast(
context,
self.make_msg(method_name, **method_args),
topic='%s.%s' % (self.topic, host),
version=version
)
**** CubicPower OpenStack Study ****
def create_vip(self, context, vip, host):
return self._cast(context, 'create_vip', {'vip': vip}, host)
**** CubicPower OpenStack Study ****
def update_vip(self, context, old_vip, vip, host):
return self._cast(context, 'update_vip',
{'old_vip': old_vip, 'vip': vip}, host)
**** CubicPower OpenStack Study ****
def delete_vip(self, context, vip, host):
return self._cast(context, 'delete_vip', {'vip': vip}, host)
**** CubicPower OpenStack Study ****
def create_pool(self, context, pool, host, driver_name):
return self._cast(context, 'create_pool',
{'pool': pool, 'driver_name': driver_name}, host)
**** CubicPower OpenStack Study ****
def update_pool(self, context, old_pool, pool, host):
return self._cast(context, 'update_pool',
{'old_pool': old_pool, 'pool': pool}, host)
**** CubicPower OpenStack Study ****
def delete_pool(self, context, pool, host):
return self._cast(context, 'delete_pool', {'pool': pool}, host)
**** CubicPower OpenStack Study ****
def create_member(self, context, member, host):
return self._cast(context, 'create_member', {'member': member}, host)
**** CubicPower OpenStack Study ****
def update_member(self, context, old_member, member, host):
return self._cast(context, 'update_member',
{'old_member': old_member, 'member': member}, host)
**** CubicPower OpenStack Study ****
def delete_member(self, context, member, host):
return self._cast(context, 'delete_member', {'member': member}, host)
**** CubicPower OpenStack Study ****
def create_pool_health_monitor(self, context, health_monitor, pool_id,
host):
return self._cast(context, 'create_pool_health_monitor',
{'health_monitor': health_monitor,
'pool_id': pool_id}, host)
**** CubicPower OpenStack Study ****
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id, host):
return self._cast(context, 'update_pool_health_monitor',
{'old_health_monitor': old_health_monitor,
'health_monitor': health_monitor,
'pool_id': pool_id}, host)
**** CubicPower OpenStack Study ****
def delete_pool_health_monitor(self, context, health_monitor, pool_id,
host):
return self._cast(context, 'delete_pool_health_monitor',
{'health_monitor': health_monitor,
'pool_id': pool_id}, host)
**** CubicPower OpenStack Study ****
def agent_updated(self, context, admin_state_up, host):
return self._cast(context, 'agent_updated',
{'payload': {'admin_state_up': admin_state_up}},
host)
**** CubicPower OpenStack Study ****
class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
# name of device driver that should be used by the agent;
# vendor specific plugin drivers must override it;
device_driver = None
**** CubicPower OpenStack Study ****
def __init__(self, plugin):
if not self.device_driver:
raise DriverNotSpecified()
self.agent_rpc = LoadBalancerAgentApi(topics.LOADBALANCER_AGENT)
self.plugin = plugin
self._set_callbacks_on_plugin()
self.plugin.agent_notifiers.update(
{q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
self.pool_scheduler = importutils.import_object(
cfg.CONF.loadbalancer_pool_scheduler_driver)
**** CubicPower OpenStack Study ****
def _set_callbacks_on_plugin(self):
# other agent based plugin driver might already set callbacks on plugin
if hasattr(self.plugin, 'agent_callbacks'):
return
self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
self.plugin.conn = rpc.create_connection(new=True)
self.plugin.conn.create_consumer(
topics.LOADBALANCER_PLUGIN,
self.plugin.agent_callbacks.create_rpc_dispatcher(),
fanout=False)
self.plugin.conn.consume_in_thread()
**** CubicPower OpenStack Study ****
def get_pool_agent(self, context, pool_id):
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
if not agent:
raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
return agent['agent']
**** CubicPower OpenStack Study ****
def create_vip(self, context, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.create_vip(context, vip, agent['host'])
**** CubicPower OpenStack Study ****
def update_vip(self, context, old_vip, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
if vip['status'] in constants.ACTIVE_PENDING_STATUSES:
self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
else:
self.agent_rpc.delete_vip(context, vip, agent['host'])
**** CubicPower OpenStack Study ****
def delete_vip(self, context, vip):
self.plugin._delete_db_vip(context, vip['id'])
agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.delete_vip(context, vip, agent['host'])
**** CubicPower OpenStack Study ****
def create_pool(self, context, pool):
agent = self.pool_scheduler.schedule(self.plugin, context, pool,
self.device_driver)
if not agent:
raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
self.agent_rpc.create_pool(context, pool, agent['host'],
self.device_driver)
**** CubicPower OpenStack Study ****
def update_pool(self, context, old_pool, pool):
agent = self.get_pool_agent(context, pool['id'])
if pool['status'] in constants.ACTIVE_PENDING_STATUSES:
self.agent_rpc.update_pool(context, old_pool, pool,
agent['host'])
else:
self.agent_rpc.delete_pool(context, pool, agent['host'])
**** CubicPower OpenStack Study ****
def delete_pool(self, context, pool):
# get agent first to know host as binding will be deleted
# after pool is deleted from db
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
self.plugin._delete_db_pool(context, pool['id'])
if agent:
self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])
**** CubicPower OpenStack Study ****
def create_member(self, context, member):
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.create_member(context, member, agent['host'])
**** CubicPower OpenStack Study ****
def update_member(self, context, old_member, member):
agent = self.get_pool_agent(context, member['pool_id'])
# member may change pool id
if member['pool_id'] != old_member['pool_id']:
old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
context, old_member['pool_id'])
if old_pool_agent:
self.agent_rpc.delete_member(context, old_member,
old_pool_agent['agent']['host'])
self.agent_rpc.create_member(context, member, agent['host'])
else:
self.agent_rpc.update_member(context, old_member, member,
agent['host'])
**** CubicPower OpenStack Study ****
def delete_member(self, context, member):
self.plugin._delete_db_member(context, member['id'])
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.delete_member(context, member, agent['host'])
**** CubicPower OpenStack Study ****
def create_pool_health_monitor(self, context, healthmon, pool_id):
# healthmon is not used here
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.create_pool_health_monitor(context, healthmon,
pool_id, agent['host'])
**** CubicPower OpenStack Study ****
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
health_monitor, pool_id,
agent['host'])
**** CubicPower OpenStack Study ****
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
self.plugin._delete_db_pool_health_monitor(
context, health_monitor['id'], pool_id
)
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
pool_id, agent['host'])
**** CubicPower OpenStack Study ****
def stats(self, context, pool_id):
pass