• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python api.port_find函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中quark.db.api.port_find函数的典型用法代码示例。如果您正苦于以下问题:Python port_find函数的具体用法?Python port_find怎么用?Python port_find使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了port_find函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: create_ip_address

    def create_ip_address(self, context, ip_address):
        LOG.info("create_ip_address for tenant %s" % context.tenant_id)

        port = None
        ip_dict = ip_address["ip_address"]
        port_id = ip_dict.get('port_id')
        network_id = ip_dict.get('network_id')
        device_id = ip_dict.get('device_id')
        ip_version = ip_dict.get('version')
        ip_address = ip_dict.get('ip_address')

        if network_id and device_id:
            port = db_api.port_find(
                context, network_id=network_id, device_id=device_id,
                tenant_id=context.tenant_id, scope=db_api.ONE)
        elif port_id:
            port = db_api.port_find(context, id=port_id, scope=db_api.ONE)

        if not port:
            raise exceptions.PortNotFound(port_id=port_id,
                                          net_id=network_id)
        address = self.ipam_driver.allocate_ip_address(
            context,
            port['network_id'],
            port['id'],
            self.ipam_reuse_after,
            ip_version,
            ip_address)
        port["ip_addresses"].append(address)
        return self._make_ip_dict(address)
开发者ID:ugoring,项目名称:quark,代码行数:30,代码来源:plugin.py


示例2: test_port_list_device_owner_found_returns_only_those

 def test_port_list_device_owner_found_returns_only_those(self):
     # create a network
     network = dict(name="public", tenant_id="fake", network_plugin="BASE")
     net_mod = db_api.network_create(self.context, **network)
     # create ports
     port1 = dict(network_id=net_mod["id"], backend_key="1", device_id="1",
                  device_owner="Doge")
     port2 = dict(network_id=net_mod["id"], backend_key="1", device_id="1",
                  device_owner=port1["device_owner"])
     port3 = dict(network_id=net_mod["id"], backend_key="1", device_id="1",
                  device_owner="network:dhcp")
     port_mod1 = db_api.port_create(self.context, **port1)
     port_mod2 = db_api.port_create(self.context, **port2)
     port_mod3 = db_api.port_create(self.context, **port3)
     res = db_api.port_find(self.context, scope=db_api.ALL,
                            device_owner=port3["device_owner"])
     self.assertTrue(len(res) == 1)
     self.assertTrue(res[0]["device_owner"] == port3["device_owner"])
     res = db_api.port_find(self.context, scope=db_api.ALL,
                            device_owner=port1["device_owner"])
     self.assertTrue(len(res) == 2)
     self.assertTrue(res[0]["device_owner"] == res[1]["device_owner"] ==
                     port1["device_owner"])
     db_api.network_delete(self.context, net_mod)
     db_api.port_delete(self.context, port_mod1)
     db_api.port_delete(self.context, port_mod2)
     db_api.port_delete(self.context, port_mod3)
开发者ID:anilkumarkodi,项目名称:quark,代码行数:27,代码来源:test_ports.py


示例3: diagnose_port

def diagnose_port(context, id, fields):
    if id == "*":
        return {"ports": [_diag_port(context, port, fields) for port in db_api.port_find(context).all()]}
    db_port = db_api.port_find(context, id=id, scope=db_api.ONE)
    if not db_port:
        raise exceptions.PortNotFound(port_id=id, net_id="")
    port = _diag_port(context, db_port, fields)
    return {"ports": port}
开发者ID:blamarvt,项目名称:quark,代码行数:8,代码来源:ports.py


示例4: diagnose_port

def diagnose_port(context, id, fields):
    if not context.is_admin:
        raise exceptions.NotAuthorized()

    if id == "*":
        return {'ports': [_diag_port(context, port, fields) for
                port in db_api.port_find(context).all()]}
    db_port = db_api.port_find(context, id=id, scope=db_api.ONE)
    if not db_port:
        raise exceptions.PortNotFound(port_id=id, net_id='')
    port = _diag_port(context, db_port, fields)
    return {'ports': port}
开发者ID:insequent,项目名称:quark,代码行数:12,代码来源:ports.py


示例5: update_ip_address

def update_ip_address(context, id, ip_address):
    LOG.info("update_ip_address %s for tenant %s" %
            (id, context.tenant_id))

    address = db_api.ip_address_find(
        context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE)

    if not address:
        raise exceptions.NotFound(
            message="No IP address found with id=%s" % id)

    old_ports = address['ports']
    port_ids = ip_address['ip_address'].get('port_ids')
    if port_ids is None:
        return v._make_ip_dict(address)

    for port in old_ports:
        port['ip_addresses'].remove(address)

    if port_ids:
        ports = db_api.port_find(
            context, tenant_id=context.tenant_id, id=port_ids,
            scope=db_api.ALL)

        # NOTE: could be considered inefficient because we're converting
        #       to a list to check length. Maybe revisit
        if len(ports) != len(port_ids):
            raise exceptions.NotFound(
                message="No ports not found with ids=%s" % port_ids)
        for port in ports:
            port['ip_addresses'].extend([address])
    else:
        address["deallocated"] = 1

    return v._make_ip_dict(address)
开发者ID:kilogram,项目名称:quark,代码行数:35,代码来源:ip_addresses.py


示例6: get_ports

def get_ports(context, filters=None, fields=None):
    """Retrieve a list of ports.

    The contents of the list depends on the identity of the user
    making the request (as indicated by the context) as well as any
    filters.
    : param context: neutron api request context
    : param filters: a dictionary with keys that are valid keys for
        a port as listed in the RESOURCE_ATTRIBUTE_MAP object
        in neutron/api/v2/attributes.py.  Values in this dictiontary
        are an iterable containing values that will be used for an exact
        match comparison for that value.  Each result returned by this
        function will have matched one of the values for each key in
        filters.
    : param fields: a list of strings that are valid keys in a
        port dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
        object in neutron/api/v2/attributes.py. Only these fields
        will be returned.
    """
    LOG.info("get_ports for tenant %s filters %s fields %s" %
            (context.tenant_id, filters, fields))
    if filters is None:
        filters = {}
    query = db_api.port_find(context, fields=fields, **filters)
    return v._make_ports_list(query, fields)
开发者ID:jkoelker,项目名称:quark,代码行数:25,代码来源:ports.py


示例7: update_port_for_ip_address

def update_port_for_ip_address(context, ip_id, id, port):
    """Update values of a port.

    : param context: neutron api request context
    : param ip_id: UUID representing the ip associated with port to update
    : param id: UUID representing the port to update.
    : param port: dictionary with keys indicating fields to update.
        valid keys are those that have a value of True for 'allow_put'
        as listed in the RESOURCE_ATTRIBUTE_MAP object in
        neutron/api/v2/attributes.py.
    """
    LOG.info("update_port %s for tenant %s" % (id, context.tenant_id))
    sanitize_list = ['service']
    with context.session.begin():
        addr = db_api.ip_address_find(context, id=ip_id, scope=db_api.ONE)
        if not addr:
            raise q_exc.IpAddressNotFound(addr_id=ip_id)
        port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
        if not port_db:
            raise q_exc.PortNotFound(port_id=id)
        port_dict = {k: port['port'][k] for k in sanitize_list}

        require_da = False
        service = port_dict.get('service')

        if require_da and _shared_ip_and_active(addr, except_port=id):
            raise q_exc.PortRequiresDisassociation()
        addr.set_service_for_port(port_db, service)
        context.session.add(addr)
    return v._make_port_for_ip_dict(addr, port_db)
开发者ID:roaet,项目名称:quark,代码行数:30,代码来源:ip_addresses.py


示例8: update_ports_for_sg

    def update_ports_for_sg(self, context, portid, jobid):
        """Updates the ports through redis."""
        port = db_api.port_find(context, id=portid, scope=db_api.ONE)
        if not port:
            LOG.warning("Port not found")
            return
        net_driver = port_api._get_net_driver(port.network, port=port)
        base_net_driver = port_api._get_net_driver(port.network)
        sg_list = [sg for sg in port.security_groups]

        success = False
        error = None
        retries = 3
        retry_delay = 2
        for retry in xrange(retries):
            try:
                net_driver.update_port(context, port_id=port["backend_key"],
                                       mac_address=port["mac_address"],
                                       device_id=port["device_id"],
                                       base_net_driver=base_net_driver,
                                       security_groups=sg_list)
                success = True
                error = None
                break
            except Exception as error:
                LOG.warning("Could not connect to redis, but retrying soon")
                time.sleep(retry_delay)
        status_str = ""
        if not success:
            status_str = "Port %s update failed after %d tries. Error: %s" % (
                portid, retries, error)
        update_body = dict(completed=True, status=status_str)
        update_body = dict(job=update_body)
        job_api.update_job(context.elevated(), jobid, update_body)
开发者ID:openstack,项目名称:quark,代码行数:34,代码来源:sg_update_worker.py


示例9: delete_port

def delete_port(context, id):
    """Delete a port.

    : param context: neutron api request context
    : param id: UUID representing the port to delete.
    """
    LOG.info("delete_port %s for tenant %s" % (id, context.tenant_id))

    port = db_api.port_find(context, id=id, scope=db_api.ONE)
    if not port:
        raise n_exc.PortNotFound(port_id=id)

    if 'device_id' in port:  # false is weird, but ignore that
        LOG.info("delete_port %s for tenant %s has device %s" %
                 (id, context.tenant_id, port['device_id']))

    backend_key = port["backend_key"]
    mac_address = netaddr.EUI(port["mac_address"]).value
    ipam_driver = _get_ipam_driver(port["network"], port=port)
    ipam_driver.deallocate_mac_address(context, mac_address)
    ipam_driver.deallocate_ips_by_port(
        context, port, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)

    net_driver = _get_net_driver(port["network"], port=port)
    base_net_driver = _get_net_driver(port["network"])
    net_driver.delete_port(context, backend_key, device_id=port["device_id"],
                           mac_address=port["mac_address"],
                           base_net_driver=base_net_driver)

    with context.session.begin():
        db_api.port_delete(context, port)
开发者ID:Anonymike,项目名称:quark,代码行数:31,代码来源:ports.py


示例10: test_create_shared_ips_with_port_ids

    def test_create_shared_ips_with_port_ids(self):

        def _make_body(ip):
            fix_ip = dict(ip_address=ip, subnet_id=sub['id'])
            port_info = {"port": dict(fixed_ips=[fix_ip])}
            return port_info

        with self._stubs(self.network, self.subnet, self.ports_info2) as (
                net, sub, ports):
            for p in ports:
                port_db = db_api.port_find(self.context, id=p['id'],
                                           scope=db_api.ONE)
                assocs = db_api.ip_port_association_find(self.context,
                                                         scope=db_api.ALL,
                                                         port_id=p['id'])
                self.assertEqual(1, len(p.get('fixed_ips')))
                self.assertEqual(1, len(port_db.ip_addresses))
                ip_db = port_db.ip_addresses[0]
                self.assertEqual('none', ip_db.get_service_for_port(port_db))
                self.assertEqual(1, len(assocs))

            port_ids = [ports[0]['id'], ports[1]['id']]
            shared_ip = {'ip_address': dict(port_ids=port_ids,
                                            network_id=net['id'],
                                            version=4)}
            ip = ip_api.create_ip_address(self.context, shared_ip)
            self.assertEqual(ip_types.SHARED, ip['type'])

            ports_ip = ip_api.get_ports_for_ip_address(self.context, ip['id'])
            self.assertEqual(2, len(ports_ip))
开发者ID:Anonymike,项目名称:quark,代码行数:30,代码来源:test_ip_addresses.py


示例11: delete_port

def delete_port(context, id):
    """Delete a port.

    : param context: neutron api request context
    : param id: UUID representing the port to delete.
    """
    LOG.info("delete_port %s for tenant %s" %
             (id, context.tenant_id))

    port = db_api.port_find(context, id=id, scope=db_api.ONE)
    if not port:
        raise exceptions.PortNotFound(net_id=id)

    backend_key = port["backend_key"]
    mac_address = netaddr.EUI(port["mac_address"]).value
    ipam_driver = ipam.IPAM_REGISTRY.get_strategy(
        port["network"]["ipam_strategy"])
    ipam_driver.deallocate_mac_address(context, mac_address)
    ipam_driver.deallocate_ips_by_port(
        context, port, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)

    net_driver = registry.DRIVER_REGISTRY.get_driver(
        port.network["network_plugin"])
    net_driver.delete_port(context, backend_key, device_id=port["device_id"],
                           mac_address=port["mac_address"])

    with context.session.begin():
        db_api.port_delete(context, port)
开发者ID:insequent,项目名称:quark,代码行数:28,代码来源:ports.py


示例12: disassociate_port

def disassociate_port(context, id, ip_address_id):
    """Disassociates a port from an IP address.

    : param context: neutron api request context
    : param id: UUID representing the port to disassociate.
    : param ip_address_id: UUID representing the IP address to
    disassociate.
    """
    LOG.info("disassociate_port %s for tenant %s ip_address_id %s" %
             (id, context.tenant_id, ip_address_id))
    with context.session.begin():
        port = db_api.port_find(context, id=id, ip_address_id=[ip_address_id],
                                scope=db_api.ONE)

        if not port:
            raise exceptions.PortNotFound(port_id=id, net_id='')

        the_address = [address for address in port["ip_addresses"]
                       if address["id"] == ip_address_id][0]
        port["ip_addresses"] = [address for address in port["ip_addresses"]
                                if address.id != ip_address_id]

        if len(the_address["ports"]) == 0:
            the_address["deallocated"] = 1
    return v._make_port_dict(port)
开发者ID:mohanraj1311,项目名称:quark,代码行数:25,代码来源:ports.py


示例13: create_ip_address

def create_ip_address(context, ip_address):
    LOG.info("create_ip_address for tenant %s" % context.tenant_id)

    port = None
    ip_dict = ip_address["ip_address"]
    port_ids = ip_dict.get('port_ids')
    network_id = ip_dict.get('network_id')
    device_ids = ip_dict.get('device_ids')
    ip_version = ip_dict.get('version')
    ip_address = ip_dict.get('ip_address')

    ports = []
    if device_ids and not network_id:
        raise exceptions.BadRequest(
            resource="ip_addresses",
            msg="network_id is required if device_ids are supplied.")
    with context.session.begin():
        if network_id and device_ids:
            for device_id in device_ids:
                port = db_api.port_find(
                    context, network_id=network_id, device_id=device_id,
                    tenant_id=context.tenant_id, scope=db_api.ONE)
                ports.append(port)
        elif port_ids:
            for port_id in port_ids:
                port = db_api.port_find(context, id=port_id,
                                        tenant_id=context.tenant_id,
                                        scope=db_api.ONE)
                ports.append(port)

        if not ports:
            raise exceptions.PortNotFound(port_id=port_ids,
                                          net_id=network_id)

        address = ipam_driver.allocate_ip_address(
            context,
            port['network_id'],
            port['id'],
            CONF.QUARK.ipam_reuse_after,
            ip_version,
            ip_addresses=[ip_address])

        for port in ports:
            port["ip_addresses"].append(address)

    return v._make_ip_dict(address)
开发者ID:sumanthns,项目名称:quark,代码行数:46,代码来源:ip_addresses.py


示例14: _get_port

def _get_port(context, port_id):
    port = db_api.port_find(context, id=port_id, scope=db_api.ONE)
    if not port:
        raise n_exc.PortNotFound(port_id=port_id)

    if not port.ip_addresses or len(port.ip_addresses) == 0:
        raise q_exc.NoAvailableFixedIpsForPort(port_id=port_id)
    return port
开发者ID:roaet,项目名称:quark,代码行数:8,代码来源:floating_ips.py


示例15: update_port

def update_port(context, id, port):
    """Update values of a port.

    : param context: neutron api request context
    : param id: UUID representing the port to update.
    : param port: dictionary with keys indicating fields to update.
        valid keys are those that have a value of True for 'allow_put'
        as listed in the RESOURCE_ATTRIBUTE_MAP object in
        neutron/api/v2/attributes.py.
    """
    LOG.info("update_port %s for tenant %s" % (id, context.tenant_id))
    with context.session.begin():
        port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
        if not port_db:
            raise exceptions.PortNotFound(port_id=id)

        address_pairs = []
        fixed_ips = port["port"].pop("fixed_ips", None)
        if fixed_ips:
            ipam_driver = ipam.IPAM_REGISTRY.get_strategy(
                port_db["network"]["ipam_strategy"])
            ipam_driver.deallocate_ip_address(
                context, port_db, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)
            addresses = []
            for fixed_ip in fixed_ips:
                subnet_id = fixed_ip.get("subnet_id")
                ip_address = fixed_ip.get("ip_address")
                if not (subnet_id and ip_address):
                    raise exceptions.BadRequest(
                        resource="fixed_ips",
                        msg="subnet_id and ip_address required")
                # Note: we don't allow overlapping subnets, thus subnet_id is
                #       ignored.
                addresses.append(ipam_driver.allocate_ip_address(
                    context, port_db["network_id"], id,
                    CONF.QUARK.ipam_reuse_after, ip_address=ip_address))
            port["port"]["addresses"] = addresses
            mac_address_string = str(netaddr.EUI(port_db.mac_address,
                                                 dialect=netaddr.mac_unix))
            address_pairs = [{'mac_address': mac_address_string,
                              'ip_address':
                              address.get('address_readable', '')}
                             for address in addresses]

        group_ids, security_groups = v.make_security_group_list(
            context, port["port"].pop("security_groups", None))
        net_driver = registry.DRIVER_REGISTRY.get_driver(
            port_db.network["network_plugin"])
        net_driver.update_port(context, port_id=port_db.backend_key,
                               security_groups=group_ids,
                               allowed_pairs=address_pairs)

        port["port"]["security_groups"] = security_groups
        port = db_api.port_update(context, port_db, **port["port"])
    return v._make_port_dict(port)
开发者ID:jkoelker,项目名称:quark,代码行数:55,代码来源:ports.py


示例16: post_update_port

    def post_update_port(self, context, id, port):
        LOG.info("post_update_port %s for tenant %s" % (id, context.tenant_id))
        port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
        if not port_db:
            raise exceptions.PortNotFound(port_id=id, net_id="")

        if "port" not in port or not port["port"]:
            raise exceptions.BadRequest()
        port = port["port"]

        if "fixed_ips" in port and port["fixed_ips"]:
            for ip in port["fixed_ips"]:
                address = None
                if ip:
                    if "ip_id" in ip:
                        ip_id = ip["ip_id"]
                        address = db_api.ip_address_find(
                            context,
                            id=ip_id,
                            tenant_id=context.tenant_id,
                            scope=db_api.ONE)
                    elif "ip_address" in ip:
                        ip_address = ip["ip_address"]
                        net_address = netaddr.IPAddress(ip_address)
                        address = db_api.ip_address_find(
                            context,
                            ip_address=net_address,
                            network_id=port_db["network_id"],
                            tenant_id=context.tenant_id,
                            scope=db_api.ONE)
                        if not address:
                            address = self.ipam_driver.allocate_ip_address(
                                context,
                                port_db["network_id"],
                                id,
                                self.ipam_reuse_after,
                                ip_address=ip_address)
                else:
                    address = self.ipam_driver.allocate_ip_address(
                        context,
                        port_db["network_id"],
                        id,
                        self.ipam_reuse_after)

            address["deallocated"] = 0

            already_contained = False
            for port_address in port_db["ip_addresses"]:
                if address["id"] == port_address["id"]:
                    already_contained = True
                    break

            if not already_contained:
                port_db["ip_addresses"].append(address)
        return self._make_port_dict(port_db)
开发者ID:ugoring,项目名称:quark,代码行数:55,代码来源:plugin.py


示例17: post_update_port

def post_update_port(context, id, port):
    LOG.info("post_update_port %s for tenant %s" % (id, context.tenant_id))
    if not port.get("port"):
        raise exceptions.BadRequest(resource="ports", msg="Port body required")

    with context.session.begin():
        port_db = db_api.port_find(context, id=id, scope=db_api.ONE)
        if not port_db:
            raise exceptions.PortNotFound(port_id=id, net_id="")

        port = port["port"]
        if "fixed_ips" in port and port["fixed_ips"]:
            for ip in port["fixed_ips"]:
                address = None
                ipam_driver = ipam.IPAM_REGISTRY.get_strategy(port_db["network"]["ipam_strategy"])
                if ip:
                    if "ip_id" in ip:
                        ip_id = ip["ip_id"]
                        address = db_api.ip_address_find(
                            context, id=ip_id, tenant_id=context.tenant_id, scope=db_api.ONE
                        )
                    elif "ip_address" in ip:
                        ip_address = ip["ip_address"]
                        net_address = netaddr.IPAddress(ip_address)
                        address = db_api.ip_address_find(
                            context,
                            ip_address=net_address,
                            network_id=port_db["network_id"],
                            tenant_id=context.tenant_id,
                            scope=db_api.ONE,
                        )
                        if not address:
                            address = ipam_driver.allocate_ip_address(
                                context, port_db["network_id"], id, CONF.QUARK.ipam_reuse_after, ip_address=ip_address
                            )
                else:
                    address = ipam_driver.allocate_ip_address(
                        context, port_db["network_id"], id, CONF.QUARK.ipam_reuse_after
                    )

            address["deallocated"] = 0

            already_contained = False
            for port_address in port_db["ip_addresses"]:
                if address["id"] == port_address["id"]:
                    already_contained = True
                    break

            if not already_contained:
                port_db["ip_addresses"].append(address)
    return v._make_port_dict(port_db)
开发者ID:blamarvt,项目名称:quark,代码行数:51,代码来源:ports.py


示例18: update_ip_address

def update_ip_address(context, id, ip_address):
    LOG.info("update_ip_address %s for tenant %s" %
             (id, context.tenant_id))
    ports = []
    with context.session.begin():
        address = db_api.ip_address_find(
            context, id=id, tenant_id=context.tenant_id, scope=db_api.ONE)
        if not address:
            raise exceptions.NotFound(
                message="No IP address found with id=%s" % id)

        reset = ip_address['ip_address'].get('reset_allocation_time', False)
        if reset and address['deallocated'] == 1:
            if context.is_admin:
                LOG.info("IP's deallocated time being manually reset")
                address['deallocated_at'] = _get_deallocated_override()
            else:
                msg = "Modification of reset_allocation_time requires admin"
                raise webob.exc.HTTPForbidden(detail=msg)

        port_ids = ip_address['ip_address'].get('port_ids')

        if port_ids:
            _raise_if_shared_and_enabled(ip_address, address)
            ports = db_api.port_find(context, tenant_id=context.tenant_id,
                                     id=port_ids, scope=db_api.ALL)
            # NOTE(name): could be considered inefficient because we're
            # converting to a list to check length. Maybe revisit
            if len(ports) != len(port_ids):
                raise exceptions.NotFound(
                    message="No ports not found with ids=%s" % port_ids)

            validate_ports_on_network_and_same_segment(ports,
                                                       address["network_id"])
            validate_port_ip_quotas(context, ports)

            LOG.info("Updating IP address, %s, to only be used by the"
                     "following ports:  %s" % (address.address_readable,
                                               [p.id for p in ports]))
            new_address = db_api.update_port_associations_for_ip(context,
                                                                 ports,
                                                                 address)
        else:
            if port_ids is not None:
                ipam_driver.deallocate_ip_address(
                    context, address)
            return v._make_ip_dict(address)
    return v._make_ip_dict(new_address)
开发者ID:evanscottgray,项目名称:quark,代码行数:48,代码来源:ip_addresses.py


示例19: test_ports_sorted_by_created_at

 def test_ports_sorted_by_created_at(self):
     # create a network
     network = dict(name="public", tenant_id="fake", network_plugin="BASE")
     net_mod = db_api.network_create(self.context, **network)
     # create ports
     port1 = dict(network_id=net_mod["id"], backend_key="1", device_id="1")
     port2 = dict(network_id=net_mod["id"], backend_key="1", device_id="1")
     port3 = dict(network_id=net_mod["id"], backend_key="1", device_id="1")
     port_mod1 = db_api.port_create(self.context, **port1)
     port_mod2 = db_api.port_create(self.context, **port2)
     port_mod3 = db_api.port_create(self.context, **port3)
     res = db_api.port_find(self.context, scope=db_api.ALL)
     self.assertTrue(res[0]["created_at"] < res[1]["created_at"] < res[2]["created_at"])
     db_api.network_delete(self.context, net_mod)
     db_api.port_delete(self.context, port_mod1)
     db_api.port_delete(self.context, port_mod2)
     db_api.port_delete(self.context, port_mod3)
开发者ID:blamarvt,项目名称:quark,代码行数:17,代码来源:test_ports.py


示例20: get_port

def get_port(context, id, fields=None):
    """Retrieve a port.

    : param context: neutron api request context
    : param id: UUID representing the port to fetch.
    : param fields: a list of strings that are valid keys in a
        port dictionary as listed in the RESOURCE_ATTRIBUTE_MAP
        object in neutron/api/v2/attributes.py. Only these fields
        will be returned.
    """
    LOG.info("get_port %s for tenant %s fields %s" % (id, context.tenant_id, fields))
    results = db_api.port_find(context, id=id, fields=fields, scope=db_api.ONE)

    if not results:
        raise exceptions.PortNotFound(port_id=id, net_id="")

    return v._make_port_dict(results)
开发者ID:blamarvt,项目名称:quark,代码行数:17,代码来源:ports.py



注:本文中的quark.db.api.port_find函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python api.security_group_find函数代码示例发布时间:2022-05-26
下一篇:
Python api.network_find函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap