**** CubicPower OpenStack Study ****
def get_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Lists a nexusport binding."""
LOG.debug(_("get_nexusport_binding() called"))
return _lookup_all_nexus_bindings(port_id=port_id,
vlan_id=vlan_id,
switch_ip=switch_ip,
instance_id=instance_id)
**** CubicPower OpenStack Study ****
def get_nexusvlan_binding(vlan_id, switch_ip):
"""Lists a vlan and switch binding."""
LOG.debug(_("get_nexusvlan_binding() called"))
return _lookup_all_nexus_bindings(vlan_id=vlan_id, switch_ip=switch_ip)
**** CubicPower OpenStack Study ****
def add_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Adds a nexusport binding."""
LOG.debug(_("add_nexusport_binding() called"))
session = db.get_session()
binding = nexus_models_v2.NexusPortBinding(port_id=port_id,
vlan_id=vlan_id,
switch_ip=switch_ip,
instance_id=instance_id)
session.add(binding)
session.flush()
return binding
**** CubicPower OpenStack Study ****
def remove_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Removes a nexusport binding."""
LOG.debug(_("remove_nexusport_binding() called"))
session = db.get_session()
binding = _lookup_all_nexus_bindings(session=session,
vlan_id=vlan_id,
switch_ip=switch_ip,
port_id=port_id,
instance_id=instance_id)
for bind in binding:
session.delete(bind)
session.flush()
return binding
**** CubicPower OpenStack Study ****
def update_nexusport_binding(port_id, new_vlan_id):
"""Updates nexusport binding."""
if not new_vlan_id:
LOG.warning(_("update_nexusport_binding called with no vlan"))
return
LOG.debug(_("update_nexusport_binding called"))
session = db.get_session()
binding = _lookup_one_nexus_binding(session=session, port_id=port_id)
binding.vlan_id = new_vlan_id
session.merge(binding)
session.flush()
return binding
**** CubicPower OpenStack Study ****
def get_nexusvm_binding(vlan_id, instance_id):
"""Lists nexusvm bindings."""
LOG.debug(_("get_nexusvm_binding() called"))
return _lookup_first_nexus_binding(instance_id=instance_id,
vlan_id=vlan_id)
**** CubicPower OpenStack Study ****
def get_port_vlan_switch_binding(port_id, vlan_id, switch_ip):
"""Lists nexusvm bindings."""
LOG.debug(_("get_port_vlan_switch_binding() called"))
return _lookup_all_nexus_bindings(port_id=port_id,
switch_ip=switch_ip,
vlan_id=vlan_id)
**** CubicPower OpenStack Study ****
def get_port_switch_bindings(port_id, switch_ip):
"""List all vm/vlan bindings on a Nexus switch port."""
LOG.debug(_("get_port_switch_bindings() called, "
"port:'%(port_id)s', switch:'%(switch_ip)s'"),
{'port_id': port_id, 'switch_ip': switch_ip})
try:
return _lookup_all_nexus_bindings(port_id=port_id,
switch_ip=switch_ip)
except c_exc.NexusPortBindingNotFound:
pass
**** CubicPower OpenStack Study ****
def get_nexussvi_bindings():
"""Lists nexus svi bindings."""
LOG.debug(_("get_nexussvi_bindings() called"))
return _lookup_all_nexus_bindings(port_id='router')
**** CubicPower OpenStack Study ****
def _lookup_nexus_bindings(query_type, session=None, **bfilter):
"""Look up 'query_type' Nexus bindings matching the filter.
:param query_type: 'all', 'one' or 'first'
:param session: db session
:param bfilter: filter for bindings query
:return: bindings if query gave a result, else
raise NexusPortBindingNotFound.
"""
if session is None:
session = db.get_session()
query_method = getattr(session.query(
nexus_models_v2.NexusPortBinding).filter_by(**bfilter), query_type)
try:
bindings = query_method()
if bindings:
return bindings
except sa_exc.NoResultFound:
pass
raise c_exc.NexusPortBindingNotFound(**bfilter)
**** CubicPower OpenStack Study ****
def _lookup_all_nexus_bindings(session=None, **bfilter):
return _lookup_nexus_bindings('all', session, **bfilter)
**** CubicPower OpenStack Study ****
def _lookup_one_nexus_binding(session=None, **bfilter):
return _lookup_nexus_bindings('one', session, **bfilter)
**** CubicPower OpenStack Study ****
def _lookup_first_nexus_binding(session=None, **bfilter):
return _lookup_nexus_bindings('first', session, **bfilter)