• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python testutils.delete_all_flows函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testutils.delete_all_flows函数的典型用法代码示例。如果您正苦于以下问题:Python delete_all_flows函数的具体用法?Python delete_all_flows怎么用?Python delete_all_flows使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了delete_all_flows函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: runTest

    def runTest(self):
        """Check that a set-field action rewrites the destination MAC.

        Clears the switch, hands ``exact_table_output`` an exact match built
        from a TCP packet together with a set-field action for ``eth_dst``
        (table ``testutils.EX_ACL_TABLE``), sends the packet on the first
        port of ``exact_port_map`` and compares what is polled from the
        second port with the same packet carrying the rewritten MAC.
        """
        table_id = testutils.EX_ACL_TABLE
        port_in = exact_port_map.keys()[0]
        egr_port = exact_port_map.keys()[1]

        # Start from an empty switch.
        testutils.delete_all_flows(self.controller, self.logger)

        # Packet to inject, and the packet expected after the rewrite.
        pkt = testutils.simple_tcp_packet(dl_src='00:01:02:03:04:05', dl_dst='00:06:07:08:09:0a')
        exp_pkt = testutils.simple_tcp_packet(dl_src='00:01:02:03:04:05', dl_dst='aa:aa:aa:aa:aa:aa')

        # Exact-match field list derived from the injected packet.
        match_ls = testutils.packet_to_exact_flow_match(pkt = pkt, table_id = table_id, ing_port = port_in)

        # Set-field action carrying the new destination MAC.
        act = action.action_set_field()
        field = match.eth_dst(parse.parse_mac("aa:aa:aa:aa:aa:aa"))
        act.field.add(field)
        exact_table_output(self, table_id, match_ls, actions = [act], egr_port = egr_port)

        testutils.do_barrier(self.controller)

        # Send the packet in on port_in and poll the egress port.
        self.dataplane.send(port_in, str(pkt))
        (_, pkt_rec, _) = self.dataplane.poll(port_number=egr_port, timeout=1)

        # Check the *received* packet: the sent packet is never None, so
        # asserting on it could not detect a missing packet.
        self.assertTrue(pkt_rec is not None,"rec none packets")
        self.assertEqual(str(exp_pkt), str(pkt_rec), 'retruned pkt not equal to the original pkt')
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:26,代码来源:exact_match.py


示例2: runTest

    def runTest(self):
        """Forward an IPv6 packet using a flow in ``testutils.WC_L3_TABLE``.

        Ingress and egress are the first and third ports of the sorted
        ``ipv6_port_map`` keys. All flows are deleted before and after the
        test, and each deletion is asserted to return 0.
        """
        table_id = testutils.WC_L3_TABLE
        sorted_ports = sorted(ipv6_port_map.keys())
        ing_port = sorted_ports[0]
        egr_port = sorted_ports[2]

        # Empty the switch and configure the table under test.
        self.assertEqual(testutils.delete_all_flows(self.controller, self.logger),
                         0, "Failed to delete all flows")
        self.assertEqual(testutils.set_table_config(self, table_id),
                         0, "Failed to set table config")

        # Install a flow built from the IPv6 test packet.
        pkt = testutils.simple_ipv6_packet()
        flow_request = testutils.flow_msg_create(self, pkt,
                                                 ing_port = ing_port,
                                                 egr_port = egr_port,
                                                 table_id = table_id)
        testutils.flow_msg_install(self, flow_request)

        # Inject the packet on the ingress port.
        self.logger.info("Sending IPv6 packet to " + str(ing_port))
        self.logger.debug("Data: " + str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # The same packet must show up on the egress port.
        testutils.receive_pkt_verify(self, egr_port, pkt)

        # Leave the switch clean.
        self.assertEqual(testutils.delete_all_flows(self.controller, self.logger),
                         0, "Failed to delete all flows")
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:31,代码来源:ipv6.py


示例3: runTest

 def runTest(self):
     """Send a packet out of all ports and also back out of its ingress port.

     The flow is created with egress ``ofp.OFPP_ALL`` and then extended
     with an apply-actions instruction that outputs to
     ``ofp.OFPP_IN_PORT``. The test requires that no ``OFPT_ERROR`` is
     polled and then verifies the packet on every port of
     ``flow_mods_port_map``.
     """
     ing_port = flow_mods_port_map.keys()[0]
     pkt = testutils.simple_tcp_packet()
     testutils.delete_all_flows(self.controller, self.logger)

     # Base flow: output to OFPP_ALL.
     fm_orig = testutils.flow_msg_create(self, pkt,
                                         ing_port=ing_port,
                                         egr_port=ofp.OFPP_ALL)

     # Extra instruction: also forward to the ingress port.
     to_ingress = action.action_output()
     to_ingress.port = ofp.OFPP_IN_PORT
     inst = instruction.instruction_apply_actions()
     added = inst.actions.add(to_ingress)
     self.assertTrue(added, "Could not add action" + to_ingress.show())
     fm_orig.instructions.add(inst)

     testutils.ofmsg_send(self, fm_orig)
     (response, raw) = self.controller.poll(ofp.OFPT_ERROR, 2)
     self.assertTrue(response is None, 'Receive error message')

     # Inject the packet, then synchronise with the switch.
     self.dataplane.send(ing_port, str(pkt))
     testutils.do_barrier(self.controller)

     # Every mapped port is expected to carry the packet.
     for of_port in flow_mods_port_map.keys():
         testutils.receive_pkt_verify(self, of_port, pkt)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:25,代码来源:flow_mods.py


示例4: runTest

 def runTest(self):
     """Check that a MODIFY_STRICT flow_mod replaces an existing flow.

     Two flow_mods are built from the same packet and ingress port but
     with different egress ports; the second has its command set to
     ``ofp.OFPFC_MODIFY_STRICT``. After both are sent, flow stats must
     contain exactly one entry whose match and instructions equal those
     of the second message.
     """
     port_list = flow_mods_port_map.keys()
     ing_port = port_list[0]
     out_port1 = port_list[1]
     out_port2 = port_list[2]
     pkt = testutils.simple_tcp_packet()
     testutils.delete_all_flows(self.controller, self.logger)

     fm_orig = testutils.flow_msg_create(self, pkt,
                                         ing_port=ing_port,
                                         egr_port=out_port1)
     fm_new = testutils.flow_msg_create(self, pkt,
                                        ing_port=ing_port,
                                        egr_port=out_port2)
     fm_new.command = ofp.OFPFC_MODIFY_STRICT

     # Install the original flow.
     send_rc = self.controller.message_send(fm_orig)
     self.assertEqual(send_rc, 0, "Failed to insert 1st flow_mod")
     testutils.do_barrier(self.controller)

     # Send the strict modification; the barrier runs before the check.
     send_rc = self.controller.message_send(fm_new)
     testutils.do_barrier(self.controller)
     self.assertEqual(send_rc, 0, "Failed to insert 2nd flow_mod")

     flow_stats = testutils.flow_stats_get(self)
     self.assertEqual(len(flow_stats.stats),1,
                      "Expected only one flow_mod")
     only_entry = flow_stats.stats[0]
     self.assertEqual(only_entry.match, fm_new.match)
     self.assertEqual(only_entry.instructions, fm_new.instructions)
开发者ID:CPqD,项目名称:oftest12,代码行数:25,代码来源:flow_mods.py


示例5: runTest

    def runTest(self):
        """Match on IPv6 source, set the IPv6 destination and forward.

        Builds a flow_mod by hand: OXM match on in_port, eth_type
        (``IPV6_ETHERTYPE``) and ipv6_src, with an apply-actions
        instruction holding a set-field (ipv6_dst) and an output action.
        A packet is sent on the first sorted port of ``ipv6_port_map`` and
        a packet with the rewritten destination is expected on the fourth.
        """
        of_ports = sorted(ipv6_port_map.keys())
        ing_port = of_ports[0]
        egr_port = of_ports[3]

        # Start from an empty switch.
        rc = testutils.delete_all_flows(self.controller, ipv6_logger)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Match: ingress port, IPv6 ethertype and a fixed source address.
        request = message.flow_mod()
        request.match.type = ofp.OFPMT_OXM
        match_tlvs = (
            match.in_port(ing_port),
            match.eth_type(IPV6_ETHERTYPE),
            match.ipv6_src(ipaddr.IPv6Address('fe80::2420:52ff:fe8f:5189')),
        )
        for tlv in match_tlvs:
            request.match_fields.tlvs.append(tlv)

        # Actions: rewrite the destination address, then output.
        # TODO: insert action set field properly
        act_setfield = action.action_set_field()
        act_setfield.field = match.ipv6_dst(ipaddr.IPv6Address('fe80::2420:52ff:fe8f:DDDD'))
        act_out = action.action_output()
        act_out.port = egr_port

        inst = instruction.instruction_apply_actions()
        for act in (act_setfield, act_out):
            inst.actions.add(act)
        request.instructions.add(inst)
        request.buffer_id = 0xffffffff
        request.priority = 1000

        ipv6_logger.debug("Adding flow ")
        rv = self.controller.message_send(request)
        self.assertTrue(rv != -1, "Failed to insert test flow")

        # Inject a packet whose source matches the flow.
        pkt = testutils.simple_ipv6_packet(ip_src='fe80::2420:52ff:fe8f:5189',ip_dst='fe80::2420:52ff:fe8f:5190')
        ipv6_logger.info("Sending IPv6 packet to " + str(ing_port))
        ipv6_logger.debug("Data: " + str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Expect the packet with the rewritten destination on the egress port.
        exp_pkt = testutils.simple_ipv6_packet(ip_dst='fe80::2420:52ff:fe8f:DDDD')
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # Log the flow stats for inspection.
        response = testutils.flow_stats_get(self)
        ipv6_logger.debug("Response" + response.show())

        # Leave the switch clean.
        rc = testutils.delete_all_flows(self.controller, ipv6_logger)
        self.assertEqual(rc, 0, "Failed to delete all flows")
开发者ID:CPqD,项目名称:oftest12,代码行数:60,代码来源:ipv6.py


示例6: runTest

    def runTest(self):
        """Install a flow per ingress port and check a packet is forwarded.

        For every port in ``sdn_port_map`` the first other port found is
        used as egress. The flow gets a random cookie and 1000-unit hard
        and idle timeouts; the test only asserts that some packet is
        polled from the dataplane after the test packet is sent.
        """
        for ing_port in sdn_port_map.keys():
            # Pick the first port that differs from the ingress port. If no
            # such port exists, egr_port stays at the last key iterated.
            for egr_port in sdn_port_map.keys():
                if egr_port != ing_port:
                    break

            self.logger.info("\ncount: " + str(ing_port))
            self.logger.info("erg_port : " + str(egr_port) + " ing_port: " + str(ing_port))

            pkt = testutils.simple_tcp_packet()
            testutils.delete_all_flows(self.controller, self.logger)

            # Build the flow_mod and override cookie, buffer and timeouts.
            flow_request = testutils.flow_msg_create(self, pkt, ing_port = ing_port, egr_port = egr_port,check_expire=True)
            flow_request.cookie = random.randint(0,9007199254740992)
            flow_request.buffer_id = 0xffffffff
            flow_request.hard_timeout = 1000
            flow_request.idle_timeout = 1000

            send_rc = self.controller.message_send(flow_request)
            self.assertTrue(send_rc != -1, "Error installing flow mod")
            testutils.do_barrier(self.controller)

            # Inject the packet and poll any port for a result.
            self.dataplane.send(ing_port, str(pkt))
            (rcv_port, rcv_pkt, _) = self.dataplane.poll(timeout=1)
            self.logger.info("erg_port : " + str(rcv_port) + " pkt: %s" % str(rcv_pkt).encode("hex") )
            self.logger.info("\n")
            self.assertTrue(rcv_pkt is not None, "Did not receive packet")
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:31,代码来源:sdn.py


示例7: runTest

 def runTest(self):
     """Check the first table's active-entry count before and after adds.

     After deleting all flows the table stats reply must report 0 active
     entries (as extracted by ``self.get_first_table_active_entries``).
     Two flows matching ``dl_type`` 0x800 and 0x806 are then installed
     and the count is expected to be 2.
     """
     basic_logger.info("Running TableStats")
     testutils.delete_all_flows(self.controller, self.logger)
     basic_logger.info("Sending table stats request")
     stats_request = message.table_stats_request()
     reply, _ = self.controller.transact(stats_request, timeout=2)

     # Everything was deleted, so no entries should be active.
     self.assertEqual(self.get_first_table_active_entries(reply), 0)

     # Add one flow per ethertype, in the same order as before.
     for dl_type in (0x800, 0x806):
         flow_match = testutils.match_all_generate()
         flow_match.dl_type = dl_type
         flow_match.wildcards ^= ofp.OFPFW_DL_TYPE
         flow_msg = testutils.flow_msg_create(self, None, match=flow_match, egr_port=2)
         send_rc = self.controller.message_send(flow_msg)
         self.assertEqual(send_rc, 0)

     testutils.do_barrier(self.controller)
     reply, _ = self.controller.transact(stats_request, timeout=2)
     self.assertEqual(self.get_first_table_active_entries(reply), 2)
开发者ID:TrafficLab,项目名称:oftest11,代码行数:25,代码来源:stats.py


示例8: runTest

    def runTest(self):
        """Carry metadata across tables and expect a packet-in.

        Configures table-miss behaviour on three tables, installs (via
        ``testutils.write_goto``) an entry with a write-metadata instruction
        and (via ``testutils.write_output``) an entry in
        ``testutils.EX_L2_TABLE`` matching that metadata with output to
        ``ofp.OFPP_CONTROLLER``. A packet-in must then be polled; a payload
        mismatch is only logged at debug level, not asserted.
        """
        ing_port = pa_port_map.keys()[1]
        egr_port = ofp.OFPP_CONTROLLER

        # Clear the switch.
        testutils.delete_all_flows(self.controller, self.logger)

        # Table-miss configuration for the tables involved.
        testutils.set_table_config(self, testutils.WC_ACL_TABLE, ofp.OFPTC_TABLE_MISS_CONTINUE, True)
        testutils.set_table_config(self, testutils.WC_SERV_TABLE, ofp.OFPTC_TABLE_MISS_DROP, True)
        testutils.set_table_config(self, testutils.EX_L2_TABLE, ofp.OFPTC_TABLE_MISS_CONTROLLER, True)

        # Test packet.
        pkt = testutils.simple_icmp_packet()

        # Metadata written by the first entry.
        inst_write_metadata = instruction.instruction_write_metadata()
        inst_write_metadata.metadata = 0x20000000
        inst_write_metadata.metadata_mask = 0xf0000000

        testutils.write_goto(self, testutils.WC_SERV_TABLE, testutils.EX_L2_TABLE, ing_port, add_inst = inst_write_metadata)

        # Second entry: match on the metadata value/mask written above.
        pkt_metadata = {
            'metadata_val': inst_write_metadata.metadata,
            'metadata_msk': inst_write_metadata.metadata_mask,
        }
        match_fields = testutils.packet_to_exact_flow_match(pkt_metadata = pkt_metadata, table_id = testutils.EX_L2_TABLE)
        testutils.write_output(self, testutils.EX_L2_TABLE, egr_port, match_fields=match_fields)

        self.dataplane.send(ing_port, str(pkt))
        (response, _) = self.controller.poll(ofp.OFPT_PACKET_IN, 2)
        self.assertTrue(response is not None, 'Packet in message not received on port ' + str(egr_port))
        if str(pkt) != response.data:
            pa_logger.debug("pkt  len " + str(len(str(pkt))) +": " + str(pkt))
            pa_logger.debug("resp len " + str(len(str(response.data))) + ": " + str(response.data))
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:31,代码来源:multi-table.py


示例9: runTest

    def runTest(self):
        """Check aggregate stats stay at zero when flow flags are 0x18.

        Installs a flow (match derived from a TCP packet, output to the
        second sorted port) with ``flags = 0x18``, sends one packet on the
        first port, then requests aggregate stats for the same match and
        asserts a single stats entry with packet and byte counts of 0.
        """
        testutils.delete_all_flows(self.controller, pa_logger)
        of_ports = sorted(pa_port_map.keys())
        self.assertTrue(len(of_ports) > 1, "Not enough ports for test")

        pkt  = testutils.simple_tcp_packet()
        match_ls = parse.packet_to_flow_match(pkt)

        request = message.flow_mod()
        request.match_fields = match_ls
        # Flags OFPFF_NO_PKT_COUNTS and OFPFF_NO_BYT_COUNTS.
        request.flags = 0x18

        # Single output action wrapped in one apply-actions instruction.
        out_action = action.action_output()
        out_action.port = of_ports[1]
        inst = instruction.instruction_apply_actions()
        self.assertTrue(inst.actions.add(out_action),
                        "Could not add action" + out_action.show())
        self.assertTrue(request.instructions.add(inst),
                        "Could not add instruction " + inst.show())

        flow_rc = self.controller.message_send(request)
        self.assertTrue( flow_rc != -1, "Error sending flow mod")
        testutils.do_barrier(self.controller)

        # One packet through the flow.
        self.dataplane.send(of_ports[0],str(pkt))

        # Ask for aggregate stats on the same match.
        aggregate_stats_req = message.aggregate_stats_request()
        aggregate_stats_req.match_fields = match_ls
        stats_rc = self.controller.message_send(aggregate_stats_req)
        self.assertTrue( stats_rc != -1, "Error sending flow stat req")

        (response, _) = self.controller.poll(ofp.OFPT_MULTIPART_REPLY, 2)
        self.assertTrue(response is not None, "No aggregate_stats reply")
        self.assertEqual(len(response.stats), 1, "Did not receive aggregate stats reply")
        # Counters are expected to stay at zero with the flags set above.
        aggregate = response.stats[0]
        self.assertEqual(aggregate.packet_count,0)
        self.assertEqual(aggregate.byte_count,0)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:56,代码来源:flow_stats.py


示例10: runTest

    def runTest(self):
        """Run a fully wildcarded flow-match test between flow clean-ups.

        Both ``testutils.delete_all_flows`` and ``nxm_delete_all_flows``
        must return 0 before the test; ``testutils.flow_match_test`` is
        then run with ``ofp.OFPFW_ALL`` and ``max_test=1``, and flows are
        deleted again afterwards.
        """
        failure_msg = "Failed to delete all flows"

        self.assertEqual(testutils.delete_all_flows(self.controller, pa_logger),
                         0, failure_msg)
        self.assertEqual(nxm_delete_all_flows(), 0, failure_msg)

        pkt = testutils.simple_tcp_packet()
        testutils.flow_match_test(self, pa_port_map, pkt=pkt,  wildcards=ofp.OFPFW_ALL, max_test =1)

        self.assertEqual(testutils.delete_all_flows(self.controller, pa_logger),
                         0, failure_msg)
开发者ID:rrdenicol,项目名称:oftest11,代码行数:11,代码来源:ipv6.py


示例11: runTest

    def runTest(self):
        """Install a match-all flow through the ``dpctl`` tool and test it.

        Paths to the ``ofdatapath`` and ``dpctl`` binaries are derived from
        ``../../of11softswitch`` and stored on ``self``. The flow is added
        by calling ``dpctl`` as a subprocess against ``unix:/tmp/ofd``; a
        TCP packet sent on the first sorted port must then be received on
        the third. Flows are deleted through the controller before and
        after.
        """
        of_ports = sorted(pa_port_map.keys())
        ing_port = of_ports[0]
        egr_port = of_ports[2]

        # Locations of the soft-switch binaries.
        self.of_dir = os.path.normpath("../../of11softswitch")
        self.ofd = os.path.normpath(self.of_dir + "/udatapath/ofdatapath")
        self.dpctl = os.path.normpath(self.of_dir + "/utilities/dpctl")
        dpctl_switch = "unix:/tmp/ofd"

        # Remove all entries. The dpctl equivalent would be:
        #   sudo ./dpctl unix:/tmp/ofd flow-mod cmd=del,table=0
        rc = testutils.delete_all_flows(self.controller, pa_logger)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        # Add a match-all entry that outputs to the egress port.
        flow_match = "wildcards=+all,dl_src_mask=FF:FF:FF:FF:FF:FF,dl_dst_mask=FF:FF:FF:FF:FF:FF,nw_src_mask=255.255.255.255,nw_dst_mask=255.255.255.255"
        flow_acts = "apply:output=" + str(egr_port)
        pcall = [
            self.dpctl,
            dpctl_switch,
            "flow-mod",
            "cmd=add,table=0,idle=100",
            flow_match,
            flow_acts,
        ]
        print(pcall)
        subprocess.call(pcall)

        # Send the packet.
        pkt = testutils.simple_tcp_packet()
        pa_logger.info("Sending IPv4 packet to " + str(ing_port))
        pa_logger.debug("Data: " + str(pkt).encode('hex'))
        self.dataplane.send(ing_port, str(pkt))

        # Receive and verify an identical packet on the egress port.
        exp_pkt = testutils.simple_tcp_packet()
        testutils.receive_pkt_verify(self, egr_port, exp_pkt)

        # Dump flow stats via dpctl.
        pa_logger.debug("Request stats-flow")
        subprocess.call([self.dpctl, dpctl_switch, "stats-flow"])

        # Remove flows.
        rc = testutils.delete_all_flows(self.controller, pa_logger)
        self.assertEqual(rc, 0, "Failed to delete all flows")
开发者ID:chesteve,项目名称:oftest11,代码行数:50,代码来源:ipv6.py


示例12: handleFlow

    def handleFlow(self, pkttype='TCP'):
        """Forward a packet from each port to the next one in sorted order.

        Args:
            pkttype: ``'ICMP'`` selects an ICMP packet and
                ``testutils.EX_ICMP_TABLE``; any other value selects a TCP
                packet and ``testutils.WC_ACL_TABLE``.

        For every port of ``pa_port_map`` the switch is cleared, a flow from
        that port to the following port (wrapping around) is installed, and
        the packet is sent and verified on the egress port.
        """
        of_ports = sorted(pa_port_map.keys())
        port_count = len(of_ports)
        self.assertTrue(port_count > 1, "Not enough ports for test")

        # Packet type decides both the packet and the table under test.
        if pkttype == 'ICMP':
            pkt = testutils.simple_icmp_packet()
            table_id = testutils.EX_ICMP_TABLE
        else:
            pkt = testutils.simple_tcp_packet()
            table_id = testutils.WC_ACL_TABLE

        for idx, ingress_port in enumerate(of_ports):
            rv = testutils.delete_all_flows(self.controller, pa_logger)
            self.assertEqual(rv, 0, "Failed to delete all flows")
            testutils.set_table_config(self, table_id, ofp.OFPTC_TABLE_MISS_CONTINUE)

            # Egress is the next port, wrapping to the first at the end.
            egress_port = of_ports[(idx + 1) % port_count]
            pa_logger.info("Ingress " + str(ingress_port) +
                             " to egress " + str(egress_port))

            # Controller sends the flow mod to the switch.
            flow_request = testutils.flow_msg_create(self, pkt,
                                                     ing_port=ingress_port,
                                                     egr_port=egress_port,
                                                     table_id=table_id)
            testutils.flow_msg_install(self, flow_request)

            # Inject the packet and verify it on the egress port.
            pa_logger.info("Sending packet to dp port " + str(ingress_port))
            self.dataplane.send(ingress_port, str(pkt))
            testutils.receive_pkt_verify(self, egress_port, pkt)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:31,代码来源:pktact.py


示例13: runTest

    def runTest(self):
        """Output to ``OFPP_ALL`` plus ``OFPP_IN_PORT`` from each ingress port.

        For every port of ``pa_port_map`` a flow with both output actions
        is installed (wildcards ``~ofp.OFPFW_IN_PORT``), the packet is sent
        on that port, and ``testutils.receive_pkt_check`` is called with all
        ports as the expected set and an empty excluded list.
        """
        of_ports = sorted(pa_port_map.keys())
        self.assertTrue(len(of_ports) > 1, "Not enough ports for test")

        pkt = testutils.simple_tcp_packet()

        # The same two output actions are reused for every ingress port.
        actions = []
        for target in (ofp.OFPP_ALL, ofp.OFPP_IN_PORT):
            out_act = action.action_output()
            out_act.port = target
            actions.append(out_act)

        for ingress_port in of_ports:
            rv = testutils.delete_all_flows(self.controller, pa_logger)
            self.assertEqual(rv, 0, "Failed to delete all flows")

            pa_logger.info("Ingress " + str(ingress_port) + " to all ports")

            flow_mod = testutils.flow_msg_create(self, pkt,
                                                 ing_port=ingress_port,
                                                 action_list=actions,
                                                 wildcards=~ofp.OFPFW_IN_PORT)
            flow_mod.buffer_id = 0xffffffff
            pa_logger.info(flow_mod.show())

            pa_logger.info("Inserting flow")
            send_rc = self.controller.message_send(flow_mod)
            self.assertTrue(send_rc != -1, "Error installing flow mod")
            testutils.do_barrier(self.controller)

            pa_logger.info("Sending packet to dp port " + str(ingress_port))
            self.dataplane.send(ingress_port, str(pkt))
            testutils.receive_pkt_check(self.dataplane, pkt, of_ports, [], self,
                              pa_logger)
开发者ID:TrafficLab,项目名称:oftest11,代码行数:35,代码来源:pktact.py


示例14: runTest

    def runTest(self):
        """Check that ``OFPPC_NO_PACKET_IN`` suppresses packet-in messages.

        With no flows installed, the flag is set on the first port of
        ``config_port_map`` through ``testutils.port_config_set``. After a
        barrier, a TCP packet is sent on that port and the test asserts
        that polling the controller for ``OFPT_PACKET_IN`` returns nothing.
        """
        rc = testutils.delete_all_flows(self.controller, config_logger)
        self.assertEqual(rc, 0, "Failed to delete all flows")

        of_port = config_port_map.keys()[0]
        mod_rc = testutils.port_config_set(self.controller, of_port,
                                           ofp.OFPPC_NO_PACKET_IN,
                                           ofp.OFPPC_NO_PACKET_IN,
                                           config_logger)
        self.assertTrue(mod_rc != -1, "Error sending port mod")
        config_logger.info("NO PKT IN test, port " + str(of_port))

        # Make sure the config change is applied before sending the packet.
        testutils.do_barrier(self.controller)

        pkt = testutils.simple_tcp_packet()
        self.dataplane.send(of_port, str(pkt))
        #@todo Check for unexpected messages?
        (response, _) = self.controller.poll(ofp.OFPT_PACKET_IN, 2)

        failure_msg = ('OFPPC_NO_PACKET_IN flag is ignored on port (got a packet when we asked not to) ' +
                       str(of_port))
        self.assertTrue(response is None, failure_msg)
开发者ID:TrafficLab,项目名称:oftest11,代码行数:27,代码来源:config.py


示例15: runTest

    def runTest(self):
        """Output to every port except the ingress port, for each ingress.

        For each port of ``pa_port_map`` a flow is installed whose action
        list has one output action per other port (passed with
        ``testutils.APPLY_ACTIONS_INSTRUCTION``). The packet is sent on the
        ingress port and ``testutils.receive_pkt_check`` is given the other
        ports as expected receivers and the ingress port as excluded.
        """
        of_ports = sorted(pa_port_map.keys())
        self.assertTrue(len(of_ports) > 2, "Not enough ports for test")

        pkt = testutils.simple_tcp_packet()

        for ingress_port in of_ports:
            rv = testutils.delete_all_flows(self.controller, pa_logger)
            self.assertEqual(rv, 0, "Failed to delete all flows")

            pa_logger.info("Ingress " + str(ingress_port) +
                           " all non-ingress ports")

            # One output action for each port other than the ingress port.
            actions = []
            for egress_port in of_ports:
                if egress_port == ingress_port:
                    continue
                out_act = action.action_output()
                out_act.port = egress_port
                actions.append(out_act)

            flow_request = testutils.flow_msg_create(
                self, pkt, ing_port=ingress_port,
                action_list=actions,
                inst_app_flag=testutils.APPLY_ACTIONS_INSTRUCTION)

            pa_logger.info("Inserting flow")
            testutils.ofmsg_send(self, flow_request)

            pa_logger.info("Sending packet to dp port " + str(ingress_port))
            self.dataplane.send(ingress_port, str(pkt))
            yes_ports = set(of_ports).difference([ingress_port])
            testutils.receive_pkt_check(self.dataplane, pkt, yes_ports, [ingress_port],
                              self, pa_logger)
开发者ID:HuaweiSwitch,项目名称:OpenFlow,代码行数:31,代码来源:pktact.py



注:本文中的testutils.delete_all_flows函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testutils.flow_msg_create函数代码示例发布时间:2022-05-27
下一篇:
Python testutils.client_from_wsdl函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap