• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python net_helpers.get_free_namespace_port函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.tests.common.net_helpers.get_free_namespace_port函数的典型用法代码示例。如果您正苦于以下问题：Python get_free_namespace_port函数的具体用法是什么？Python get_free_namespace_port怎么用？Python get_free_namespace_port有哪些使用例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_free_namespace_port函数的9个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_fip_connection_from_same_subnet

    def test_fip_connection_from_same_subnet(self):
        """Check connectivity to a floating IP from within its own subnet.

        The floating IP is associated with a fixed IP that sits on the
        same subnet as the source machine's fixed IP, so a successful
        connection confirms that return packets go through the router.
        """
        info = self.generate_router_info(enable_ha=False)
        router = self.manage_router(self.agent, info)
        internal_cidr = self._port_first_ip_cidr(router.internal_ports[0])
        # Drop the prefix length, keeping only the address part.
        router_ip, _, _ = internal_cidr.partition('/')

        integration_bridge = framework.get_ovs_bridge(
            self.agent.conf.ovs_integration_bridge)

        peers = machine_fixtures.PeerMachines(
            integration_bridge,
            net_helpers.increment_ip_cidr(internal_cidr),
            router_ip)
        src_machine, dst_machine = self.useFixture(peers).machines

        floating_ip = '19.4.4.10'
        # Start from an empty floating IP list before adding the one
        # that maps to the destination machine.
        router.router[l3_constants.FLOATINGIP_KEY] = []
        self._add_fip(router, floating_ip, fixed_address=dst_machine.ip)
        router.process(self.agent)

        tcp_port = net_helpers.get_free_namespace_port(
            l3_constants.PROTO_NAME_TCP, dst_machine.namespace)
        # The client addresses the floating IP rather than the fixed IP.
        tester = net_helpers.NetcatTester(
            src_machine.namespace, dst_machine.namespace,
            floating_ip, tcp_port,
            protocol=net_helpers.NetcatTester.TCP)
        self.addCleanup(tester.stop_processes)
        self.assertTrue(tester.test_connectivity())
开发者ID:tidatida,项目名称:neutron,代码行数:34,代码来源:test_legacy_router.py


示例2: _create_nc_tester

    def _create_nc_tester(self, direction, protocol, src_port, dst_port):
        """Build a netcat tester for one connection and remember it.

        The tester is stored in ``self._nc_testers`` under the key
        ``(direction, protocol, src_port, dst_port)``; an existing entry
        whose connection is not established is overwritten.

        :param direction: ``self.INGRESS`` makes the peer the client and
            the VM the server; any other value swaps the two roles.
        :param protocol: protocol passed on to the netcat tester.
        :param src_port: source port passed on to the netcat tester.
        :param dst_port: destination port; when falsy, a free port is
            looked up in the server namespace instead.
        :returns: the newly created netcat tester.
        :raises ConnectionTesterException: if a tester stored under the
            same key reports an established connection.
        """
        key = (direction, protocol, src_port, dst_port)
        existing = self._nc_testers.get(key)
        if existing and existing.is_established:
            raise ConnectionTesterException(
                '%s connection using %s protocol, source port %s and '
                'destination port %s is already established' % (
                    direction, protocol, src_port, dst_port))

        # Pick which side acts as client and which as server.
        if direction == self.INGRESS:
            client_ns, server_ns, server_addr = (
                self.peer_namespace, self.vm_namespace, self.vm_ip_address)
        else:
            client_ns, server_ns, server_addr = (
                self.vm_namespace, self.peer_namespace, self.peer_ip_address)

        if dst_port:
            server_port = dst_port
        else:
            server_port = net_helpers.get_free_namespace_port(
                protocol, server_ns)

        tester = net_helpers.NetcatTester(
            client_namespace=client_ns,
            server_namespace=server_ns,
            address=server_addr,
            protocol=protocol,
            src_port=src_port,
            dst_port=server_port)
        self._nc_testers[key] = tester
        return tester
开发者ID:sebrandon1,项目名称:neutron,代码行数:33,代码来源:conn_testers.py


示例3: _generate_port

def _generate_port():
    """Return a free TCP port, obtained from the OS, as a string.

    The port is not reserved: another process may occupy it after this
    function has finished but before the neutron-server process has
    started, in which case using the port might fail.
    """
    free_port = net_helpers.get_free_namespace_port(constants.PROTO_NAME_TCP)
    return str(free_port)
开发者ID:abhilabh,项目名称:neutron,代码行数:8,代码来源:config.py


示例4: setUp

    def setUp(self):
        """Prepare a client/server machine pair, firewalls and a TCP port.

        The machines come from a ``PeerMachines`` fixture built on the
        bridge of a ``VethBridgeFixture``.
        """
        super(IptablesManagerTestCase, self).setUp()

        bridge_fixture = self.useFixture(net_helpers.VethBridgeFixture())
        peers = self.useFixture(
            machine_fixtures.PeerMachines(bridge_fixture.bridge))
        self.client, self.server = peers.machines

        firewalls = self.create_firewalls()
        self.client_fw, self.server_fw = firewalls
        # The port is only used inside an isolated namespace, which rules
        # out conflicts with ports used elsewhere.
        self.port = net_helpers.get_free_namespace_port(
            constants.PROTO_NAME_TCP, self.server.namespace)
开发者ID:electrocucaracha,项目名称:neutron,代码行数:10,代码来源:test_iptables.py


示例5: test_get_free_namespace_port

    def test_get_free_namespace_port(self):
        """Expect 32767 when the mocked output covers ports 1024..32766.

        The mocked namespace command returns ``ss_output`` followed by one
        ``ss_output_template`` entry for every port in that range.
        """
        port_entries = ''.join(
            ss_output_template % port for port in range(1024, 32767))
        fake_output = ss_output + port_entries

        wrapper_patch = mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
        with wrapper_patch as wrapper_cls:
            fake_wrapper = mock.MagicMock()
            fake_wrapper.netns.execute.return_value = fake_output
            wrapper_cls.return_value = fake_wrapper
            free_port = net_helpers.get_free_namespace_port(
                n_const.PROTO_NAME_TCP)
            self.assertEqual(32767, free_port)
开发者ID:muraliran,项目名称:neutron,代码行数:13,代码来源:test_net_helpers.py


示例6: test_fip_connection_from_same_subnet

 def test_fip_connection_from_same_subnet(self):
     """Check connectivity to a floating IP from within its own subnet.

     The floating IP is associated with a fixed IP that sits on the same
     subnet as the source machine's fixed IP, so a successful connection
     confirms that return packets go through the router.
     """
     machines = self._setup_fip_with_fixed_ip_from_same_subnet(
         enable_snat=True)
     src_machine, dst_machine, dst_fip = machines

     tcp_port = net_helpers.get_free_namespace_port(
         l3_constants.PROTO_NAME_TCP, dst_machine.namespace)
     # The client addresses the floating IP rather than the fixed IP.
     tester = net_helpers.NetcatTester(
         src_machine.namespace,
         dst_machine.namespace,
         dst_fip,
         tcp_port,
         protocol=net_helpers.NetcatTester.TCP)
     self.addCleanup(tester.stop_processes)
     self.assertTrue(tester.test_connectivity())
开发者ID:electrocucaracha,项目名称:neutron,代码行数:14,代码来源:test_legacy_router.py


示例7: test_get_free_namespace_port

    def test_get_free_namespace_port(self):
        """Expect the port just below the mocked local port range start.

        The mocked namespace command returns ``ss_output`` followed by one
        ``ss_output_template`` entry for every port in 1024..32766, and
        the mocked ``utils.execute`` returns a range starting at 32768.
        """
        port_entries = ''.join(
            ss_output_template % port for port in range(1024, 32767))
        fake_output = ss_output + port_entries
        range_start = 32768

        wrapper_patch = mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
        execute_patch = mock.patch('neutron.agent.linux.utils.execute')
        with wrapper_patch as wrapper_cls, execute_patch as fake_execute:
            fake_wrapper = mock.MagicMock()
            fake_wrapper.netns.execute.return_value = fake_output
            wrapper_cls.return_value = fake_wrapper
            fake_execute.return_value = "%s\t61000" % range_start
            free_port = net_helpers.get_free_namespace_port(
                n_const.PROTO_NAME_TCP)
            self.assertEqual(range_start - 1, free_port)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:16,代码来源:test_net_helpers.py


示例8: _test_conntrack_disassociate_fip

    def _test_conntrack_disassociate_fip(self, ha):
        """Check that conntrack drops a connection once its FIP is removed.

        A stateful connection that uses a floating IP must be dropped by
        conntrack immediately after the floating IP is disassociated.

        :param ha: passed as ``enable_ha`` when generating the router
            info; when true, the test first waits until the router's
            ``ha_state`` is 'master'.
        """
        info = self.generate_router_info(enable_ha=ha)
        router = self.manage_router(self.agent, info)

        tcp_port = net_helpers.get_free_namespace_port(
            constants.PROTO_NAME_TCP, router.ns_name)
        client_address = '19.4.4.3'
        server_address = '35.4.0.4'

        def clean_fips(target):
            target.router[constants.FLOATINGIP_KEY] = []

        clean_fips(router)
        self._add_fip(router, client_address, fixed_address=server_address)
        router.process()

        router_ns = ip_lib.IPWrapper(namespace=router.ns_name)
        # Client and server both live in the router namespace.
        tester = net_helpers.NetcatTester(
            router.ns_name, router.ns_name, client_address, tcp_port,
            protocol=net_helpers.NetcatTester.TCP)
        self.addCleanup(tester.stop_processes)

        def assert_num_of_conntrack_rules(expected):
            output = router_ns.netns.execute(
                ["conntrack", "-L", "--orig-src", client_address])
            # Count the non-empty lines of the listing.
            entries = [row for row in output.strip().split('\n') if row]
            self.assertEqual(expected, len(entries))

        def router_is_master():
            return router.ha_state == 'master'

        if ha:
            common_utils.wait_until_true(router_is_master)

        with self.assert_max_execution_time(100):
            assert_num_of_conntrack_rules(0)

            self.assertTrue(tester.test_connectivity())
            assert_num_of_conntrack_rules(1)

            clean_fips(router)
            router.process()
            assert_num_of_conntrack_rules(0)

            # With the floating IP gone, the connectivity check must fail.
            with testtools.ExpectedException(RuntimeError):
                tester.test_connectivity()
开发者ID:openstack,项目名称:neutron,代码行数:46,代码来源:framework.py


示例9: setUp

    def setUp(self):
        """Create the bridge fixture and start the of_interface main routine.

        The module named by ``self.main_module`` is imported, a free TCP
        port is set as the ``of_listen_port`` option of the 'OVS' group
        and ``self._kick_main`` is spawned with eventlet.  The method then
        waits until ``self.init_done`` is set.  If the wait is interrupted
        by ``fixtures.TimeoutException``, the main routine is killed and
        the whole sequence is retried with a newly picked port.

        :raises Exception: with message 'port allocation failed' once the
            wait has timed out four times.
        """
        super(OVSAgentTestBase, self).setUp()
        self.br = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
        self.of_interface_mod = importutils.import_module(self.main_module)
        # These are only initialised here; they are filled in elsewhere
        # (presumably by the spawned main routine -- not visible here).
        self.br_int_cls = None
        self.br_tun_cls = None
        self.br_phys_cls = None
        self.br_int = None
        self.init_done = False
        self.init_done_ev = eventlet.event.Event()
        self.main_ev = eventlet.event.Event()
        self.addCleanup(self._kill_main)
        # Three retries after the first attempt, i.e. four attempts in all.
        retry_count = 3
        while True:
            # A new free port is picked on every attempt.
            cfg.CONF.set_override('of_listen_port',
                                  net_helpers.get_free_namespace_port(
                                      n_const.PROTO_NAME_TCP),
                                  group='OVS')
            self.of_interface_mod.init_config()
            self._main_thread = eventlet.spawn(self._kick_main)

            # Wait for _kick_main -> of_interface main -> _agent_main
            # NOTE(yamamoto): This complexity came from how "native"
            # of_interface runs its openflow controller.  "native"
            # of_interface's main routine blocks while running the
            # embedded openflow controller.  In that case, the agent
            # rpc_loop runs in another thread.  However, for FT we
            # need to run setUp() and test_xxx() in the same thread.
            # So I made this run of_interface's main in a separate
            # thread instead.
            try:
                while not self.init_done:
                    self.init_done_ev.wait()
                # Initialisation finished: leave the retry loop.
                break
            except fixtures.TimeoutException:
                # NOTE(review): the timeout is presumably armed by a
                # fixture set up outside this method -- confirm.
                self._kill_main()
            retry_count -= 1
            if retry_count < 0:
                raise Exception('port allocation failed')
开发者ID:FedericoRessi,项目名称:neutron,代码行数:39,代码来源:test_ovs_flows.py



注:本文中的neutron.tests.common.net_helpers.get_free_namespace_port函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python net_helpers.increment_ip_cidr函数代码示例发布时间:2022-05-27
下一篇:
Python net_helpers.assert_ping函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap