¡@

Home 

OpenStack Study: api.py

OpenStack Index

**** CubicPower OpenStack Study ****

def _get_resource_model(resource):
    """Look up the model registered for ``resource`` in ``resource_map``.

    :param resource: key into the module-level ``resource_map``.
    :returns: the value stored under ``resource``.

    A key that is missing from ``resource_map`` is not handled here; the
    lookup error (``KeyError`` if ``resource_map`` is a plain dict)
    propagates to the caller.
    """
    model = resource_map[resource]
    return model

**** CubicPower OpenStack Study ****

def clear_db(base=model_base.BASEV2):
    """Delegate to ``db.clear_db`` for the given model ``base``.

    :param base: passed through unchanged to ``db.clear_db``; defaults to
        ``model_base.BASEV2``.
    """

    db.clear_db(base)

**** CubicPower OpenStack Study ****

def get_ofc_item(session, resource, neutron_id):
    """Fetch the mapping row of ``resource`` for a given Neutron ID.

    :param session: DB session used to run the query.
    :param resource: resource name resolved via ``_get_resource_model``.
    :param neutron_id: value matched against the model's ``neutron_id``.
    :returns: the single matching row, or ``None`` when the resolved model
        is falsy or no row matches.
    """
    mapping_cls = _get_resource_model(resource)
    if mapping_cls:
        try:
            matches = session.query(mapping_cls)
            matches = matches.filter_by(neutron_id=neutron_id)
            return matches.one()
        except sa.orm.exc.NoResultFound:
            # No mapping recorded: report it as "nothing" rather than raise.
            return None
    return None

**** CubicPower OpenStack Study ****

def get_ofc_id(session, resource, neutron_id):
    """Return the ``ofc_id`` mapped to ``neutron_id`` for ``resource``.

    :param session: DB session forwarded to ``get_ofc_item``.
    :param resource: resource name forwarded to ``get_ofc_item``.
    :param neutron_id: Neutron-side ID to look up.
    :returns: the ``ofc_id`` attribute of the mapping row.
    :raises nexc.OFCMappingNotFound: when ``get_ofc_item`` returns a falsy
        result.
    """
    mapping = get_ofc_item(session, resource, neutron_id)
    if not mapping:
        raise nexc.OFCMappingNotFound(resource=resource,
                                      neutron_id=neutron_id)
    return mapping.ofc_id

**** CubicPower OpenStack Study ****

def exists_ofc_item(session, resource, neutron_id):
    """Tell whether ``get_ofc_item`` finds a mapping for ``neutron_id``.

    :param session: DB session forwarded to ``get_ofc_item``.
    :param resource: resource name forwarded to ``get_ofc_item``.
    :param neutron_id: Neutron-side ID to look up.
    :returns: ``True`` if the lookup result is truthy, ``False`` otherwise.
    """
    return bool(get_ofc_item(session, resource, neutron_id))

**** CubicPower OpenStack Study ****

def find_ofc_item(session, resource, ofc_id):
    """Fetch the mapping row of ``resource`` that carries ``ofc_id``.

    This is the reverse lookup of ``get_ofc_item``: rows are matched on the
    model's ``ofc_id`` column instead of ``neutron_id``.

    :param session: DB session used to run the query.
    :param resource: resource name resolved via ``_get_resource_model``.
    :param ofc_id: value matched against the model's ``ofc_id``.
    :returns: the single matching row, or ``None`` when no row matches.
    """
    try:
        mapping_cls = _get_resource_model(resource)
        found = session.query(mapping_cls).filter_by(ofc_id=ofc_id).one()
    except sa.orm.exc.NoResultFound:
        return None
    else:
        return found

**** CubicPower OpenStack Study ****

def add_ofc_item(session, resource, neutron_id, ofc_id):
    """Create and store a mapping row between a Neutron ID and an OFC ID.

    :param session: DB session the new row is added to.
    :param resource: resource name resolved via ``_get_resource_model``.
    :param neutron_id: value stored in the row's ``neutron_id``.
    :param ofc_id: value stored in the row's ``ofc_id``.
    :returns: the newly created model instance.
    :raises nexc.NECDBException: when anything in the model lookup, row
        construction, add or flush raises; the original exception is
        logged first.
    """
    try:
        model = _get_resource_model(resource)
        item = model(neutron_id=neutron_id, ofc_id=ofc_id)
        with session.begin(subtransactions=True):
            session.add(item)
            # Flush inside the try block so that a failure raised by the
            # flush is caught and wrapped by the handler below.
            session.flush()
    except Exception as exc:
        LOG.exception(exc)
        # ``BaseException.message`` was deprecated by PEP 352 and is absent
        # on Python 3, so reading it unconditionally could raise
        # AttributeError from inside this handler. Prefer it when present
        # and non-empty, otherwise fall back to the string form.
        reason = getattr(exc, 'message', None) or str(exc)
        raise nexc.NECDBException(reason=reason)
    return item

**** CubicPower OpenStack Study ****

def del_ofc_item(session, resource, neutron_id):
    """Delete the mapping row of ``resource`` for a given Neutron ID.

    :param session: DB session used for the lookup and the delete.
    :param resource: resource name resolved via ``_get_resource_model``.
    :param neutron_id: value matched against the model's ``neutron_id``.
    :returns: ``True`` when a row was found and deleted; ``False`` (after
        logging a warning) when no row matches.
    """
    try:
        mapping_cls = _get_resource_model(resource)
        with session.begin(subtransactions=True):
            query = session.query(mapping_cls)
            query = query.filter_by(neutron_id=neutron_id)
            session.delete(query.one())
    except sa.orm.exc.NoResultFound:
        LOG.warning(_("del_ofc_item(): NotFound item "
                      "(resource=%(resource)s, id=%(id)s) "),
                    {'resource': resource, 'id': neutron_id})
        return False
    return True

**** CubicPower OpenStack Study ****

def get_portinfo(session, id):
    """Fetch the ``nmodels.PortInfo`` row with the given ``id``.

    :param session: DB session used to run the query.
    :param id: value matched against ``PortInfo.id``.
    :returns: the single matching row, or ``None`` when no row matches.
    """
    try:
        matches = session.query(nmodels.PortInfo).filter_by(id=id)
        return matches.one()
    except sa.orm.exc.NoResultFound:
        return None

**** CubicPower OpenStack Study ****

def add_portinfo(session, id, datapath_id='', port_no=0,
                 vlan_id=OFP_VLAN_NONE, mac=''):
    """Create and store an ``nmodels.PortInfo`` row.

    All arguments other than ``session`` are passed as keyword arguments of
    the same name to the ``PortInfo`` constructor.

    :param session: DB session the new row is added to.
    :param id: value for ``PortInfo.id``.
    :param datapath_id: value for ``PortInfo.datapath_id``.
    :param port_no: value for ``PortInfo.port_no``.
    :param vlan_id: value for ``PortInfo.vlan_id``; defaults to
        ``OFP_VLAN_NONE``.
    :param mac: value for ``PortInfo.mac``.
    :returns: the newly created ``PortInfo`` instance.
    :raises nexc.NECDBException: when constructing or adding the row
        raises; the original exception is logged first.
    """
    try:
        portinfo = nmodels.PortInfo(id=id, datapath_id=datapath_id,
                                    port_no=port_no, vlan_id=vlan_id, mac=mac)
        with session.begin(subtransactions=True):
            session.add(portinfo)
    except Exception as exc:
        LOG.exception(exc)
        # ``BaseException.message`` was deprecated by PEP 352 and is absent
        # on Python 3, so reading it unconditionally could raise
        # AttributeError from inside this handler. Prefer it when present
        # and non-empty, otherwise fall back to the string form.
        reason = getattr(exc, 'message', None) or str(exc)
        raise nexc.NECDBException(reason=reason)
    return portinfo

**** CubicPower OpenStack Study ****

def del_portinfo(session, id):
    """Delete the ``nmodels.PortInfo`` row with the given ``id``.

    A missing row is not an error: it is reported with a warning log and
    the function returns normally.

    :param session: DB session used for the lookup and the delete.
    :param id: value matched against ``PortInfo.id``.
    """
    try:
        with session.begin(subtransactions=True):
            matches = session.query(nmodels.PortInfo).filter_by(id=id)
            session.delete(matches.one())
    except sa.orm.exc.NoResultFound:
        LOG.warning(_("del_portinfo(): NotFound portinfo for "
                      "port_id: %s"), id)

**** CubicPower OpenStack Study ****

def get_active_ports_on_ofc(context, network_id, port_id=None):
    """Retrieve ports on OFC on a given network.

    OFC port mappings are joined to Neutron ports on the port ID and
    restricted to ports whose ``network_id`` matches.

    :param context: object whose ``session`` attribute runs the query.
    :param network_id: network the ports must belong to.
    :param port_id: when truthy, additionally restrict the result to the
        mapping with this Neutron port ID.
    :returns: a list of tuples ``(neutron port_id, OFC id)``.
    """
    mapping = nmodels.OFCPortMapping
    port = models_v2.Port
    query = (context.session.query(mapping)
             .join(port, mapping.neutron_id == port.id)
             .filter(port.network_id == network_id))
    if port_id:
        query = query.filter(mapping.neutron_id == port_id)
    return [(row['neutron_id'], row['ofc_id']) for row in query]

**** CubicPower OpenStack Study ****

def get_port_from_device(port_id):
    """Build a port dict, including its security group IDs, for a port.

    The port is read through a session from ``db.get_session()`` and
    converted with ``_make_port_dict`` of the plugin returned by
    ``manager.NeutronManager.get_plugin()``.

    :param port_id: value matched against ``models_v2.Port.id``.
    :returns: ``None`` when the query yields no row. Otherwise the port
        dict with these keys overwritten: ``ext_sg.SECURITYGROUPS`` (the
        truthy security group IDs from the joined rows),
        ``security_group_rules`` and ``security_group_source_groups``
        (both empty lists), and ``fixed_ips`` (a flat list of the
        ``ip_address`` values of the port's fixed IPs).
    """
    # The message previously named get_port_with_securitygroups(), which is
    # not this function; log the real name so log lines can be traced back.
    LOG.debug(_("get_port_from_device() called:port_id=%s"), port_id)
    session = db.get_session()
    sg_binding_port = sg_db.SecurityGroupPortBinding.port_id

    # Each result row is a (Port, security_group_id) pair. The outer join
    # keeps the port even when it has no binding row; such rows are
    # filtered out by the ``if sg_id`` test below.
    query = session.query(models_v2.Port,
                          sg_db.SecurityGroupPortBinding.security_group_id)
    query = query.outerjoin(sg_db.SecurityGroupPortBinding,
                            models_v2.Port.id == sg_binding_port)
    query = query.filter(models_v2.Port.id == port_id)
    port_and_sgs = query.all()
    if not port_and_sgs:
        return None

    # The Port object is taken from the first row only.
    port = port_and_sgs[0][0]
    plugin = manager.NeutronManager.get_plugin()
    port_dict = plugin._make_port_dict(port)
    port_dict[ext_sg.SECURITYGROUPS] = [
        sg_id for _port, sg_id in port_and_sgs if sg_id]
    port_dict['security_group_rules'] = []
    port_dict['security_group_source_groups'] = []
    port_dict['fixed_ips'] = [ip['ip_address']
                              for ip in port['fixed_ips']]
    return port_dict