• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python asserts.should_NOT_receive函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mdts.tests.utils.asserts.should_NOT_receive函数的典型用法代码示例。如果您正苦于以下问题:Python should_NOT_receive函数的具体用法?Python should_NOT_receive怎么用?Python should_NOT_receive使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了should_NOT_receive函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_dst_mac_masking

def test_dst_mac_masking():
    """
    Title: Test destination MAC masking in chain rules

    Scenario 1:
    When: There's a rule dropping any traffic with the multicast bit on
    Then: Multicast traffic is blocked and unicast traffic goes through

    Scenario 2:
    When: There's a rule dropping any traffic with the multicast bit off
    Then: Multicast traffic goes through and unicast traffic is blocked
    """

    bridge = VTM.get_bridge('bridge-000-001')

    if1 = BM.get_iface_for_port('bridge-000-001', 1)
    if2 = BM.get_iface_for_port('bridge-000-001', 2)

    if1_hw_addr = if1.get_mac_addr()  # interface['hw_addr']
    if2_hw_addr = if2.get_mac_addr()  # interface['hw_addr']

    if2_ip_addr = if2.get_ip()

    rcv_filter = 'udp and ether src %s' % if1_hw_addr

    bridge.set_inbound_filter(VTM.get_chain('drop_multicast'))
    # Send a frame to an arbitrary multicast address. Bridge doesn't
    # recognize it and will try to flood it to the other port, but the
    # masked MAC rule should drop it since it has the multicast bit set.
    f1 = async_assert_that(if2, should_NOT_receive(rcv_filter, within_sec(10)))
    f2 = if1.send_udp("01:23:45:67:89:ab", if2_ip_addr)
    wait_on_futures([f1, f2])

    # If2's actual MAC address should work, since it doesn't have the bit set.
    f1 = async_assert_that(if2, receives(rcv_filter, within_sec(10)))
    f2 = if1.send_udp(if2_hw_addr, if2_ip_addr)
    wait_on_futures([f1, f2])

    # Change to the chain that allows only multicast addresses.
    bridge.set_inbound_filter(VTM.get_chain('allow_only_multicast'))

    # Send another frame to the multicast address. Bridge doesn't
    # recognize it and will try to flood it to the other port. This
    # time the rule should allow it through.
    f1 = async_assert_that(if2, receives(rcv_filter, within_sec(10)))
    f2 = if1.send_udp("01:23:45:67:89:ab", if2_ip_addr)
    wait_on_futures([f1, f2])

    # If2's actual MAC address should be blocked, since it doesn't
    # have the multicast bit set.
    f1 = async_assert_that(if2, should_NOT_receive(rcv_filter, within_sec(10)))
    f2 = if1.send_udp(if2_hw_addr, if2_ip_addr)
    wait_on_futures([f1, f2])
开发者ID:yamt,项目名称:midonet,代码行数:53,代码来源:test_chains.py


示例2: test_floating_ip

def test_floating_ip():
    """
    Title: Tests a floating IP.

    Scenario 1:
    When: a VM sends an ICMP echo request to a floating IP address
          (100.100.100.100).
    Then: the router performs DNAT on the message according to the rule chain
          set to the router,
    And: the receiver VM should receive the ICMP echo packet,
    And: the receiver sends back an ICMP reply with its original IP address
         as a source address.
    And: the router applies SNAT to the reply packet.
    And: the sender receives the reply with src address NATed to the floating IP
         address.
    """
    sender = BM.get_iface_for_port("bridge-000-001", 2)
    receiver = BM.get_iface_for_port("bridge-000-002", 2)
    # Reset in-/out-bound filters.
    unset_filters("router-000-001")
    feed_receiver_mac(receiver)

    f1 = async_assert_that(receiver, should_NOT_receive("dst host 172.16.2.1 and icmp", within_sec(10)))
    sender.ping_ipv4_addr("100.100.100.100")
    wait_on_futures([f1])

    # Configure floating IP address with the router
    set_filters("router-000-001", "pre_filter_floating_ip", "post_filter_floating_ip")

    f1 = async_assert_that(receiver, receives("dst host 172.16.2.1 and icmp", within_sec(10)))
    f2 = async_assert_that(sender, receives("src host 100.100.100.100 and icmp", within_sec(10)))
    sender.ping_ipv4_addr("100.100.100.100")
    wait_on_futures([f1, f2])
开发者ID:itsuugo,项目名称:midonet,代码行数:33,代码来源:test_nat_router.py


示例3: test_filter_ipv6

def test_filter_ipv6():
    """
    Title: Filter IPv6 packets out on Bridge

    Scenario 1:
    When: there is no filter settings
    Then: IPv6 packets go through the bridge

    Scenario 2:
    When: the bridge has a chain in which there is a drop rule for IPv6
    Then: IPv6 packets should not go through the bridge

    Scenario 3:
    When: the chain is removed from the bridge
    Then: IPv6 packets should go through again.
    """

    iface1 = BM.get_iface_for_port('bridge-000-001', 1)
    iface2 = BM.get_iface_for_port('bridge-000-001', 2)

    iface1_hw_addr = iface1.interface['hw_addr']
    iface2_hw_addr = iface2.interface['hw_addr']

    ipv6_proto = "86:dd"
    ipv6_icmp = ("60:00:00:00:00:20:3a:ff:fe:80:00:00:00:00:00:00:1a:03:73:ff:"
                 "fe:29:a9:b1:ff:02:00:00:00:00:00:00:00:00:00:01:ff:29:a9:b2:"
                 "87:00:32:26:00:00:00:00:fe:80:00:00:00:00:00:00:1a:03:73:ff:"
                 "fe:29:a9:b2:01:01:18:03:73:29:a9:b1")

    packet = '%s-%s-%s-%s' % (iface2_hw_addr, iface1_hw_addr, ipv6_proto,
                              ipv6_icmp)

    rcv_filter = 'ether dst %s' % iface2_hw_addr

    # Sceneario 1:
    f1 = async_assert_that(iface2, receives(rcv_filter, within_sec(10)))
    # async_assert_that expects only 1 packet. Send only one, because the next
    # tcpdump might capture it (and fail the test) in case it takes some time
    # to arrive.
    # FIXME: make the tcpdump listener configurable
    f2 = iface1.send_ether(packet, count=1)
    wait_on_futures([f1, f2])

    # Scenario 2:
    # setting chain and make sure it's dropped
    chain = VTM.get_chain('drop_ipv6')
    VTM.get_bridge('bridge-000-001').set_inbound_filter(chain)

    f1 = async_assert_that(iface2,
                           should_NOT_receive(
                               rcv_filter, within_sec(10)))
    f2 = iface1.send_ether(packet, count=1)
    wait_on_futures([f1, f2])

    # Remove the filter and verify that packets go through again.
    VTM.get_bridge('bridge-000-001').set_inbound_filter(None)
    time.sleep(1)
    f1 = async_assert_that(iface2, receives(rcv_filter, within_sec(10)))
    f2 = iface1.send_ether(packet, count=1)
    wait_on_futures([f1, f2])
开发者ID:yamt,项目名称:midonet,代码行数:60,代码来源:test_chains.py


示例4: test_filtering_by_dl

def test_filtering_by_dl():
    '''
    Title: Tests dl-based packet filtering.

    Scenario:
    When: A VM sends UDP packets to another host on the same bridge.
    Then: The UDP packets reach the receiver without filtering rule chains.
    Then: A filtering rule chain based on mac address is set on the bridge.
    And: UDP packets from the same host do NOT reach the same destination host.
    '''
    outside = BM.get_iface_for_port('bridge-000-001', 2)
    inside = BM.get_iface_for_port('bridge-000-001', 3)

    # Reset an in-bound filter. 
    unset_bridge_port_filters('bridge-000-001', 3)

    port_num = get_random_port_num()
    f1 = outside.send_udp('aa:bb:cc:00:01:02', '172.16.1.2', 41,
                          src_port=port_num, dst_port=port_num)
    assert_that(inside, receives('dst host 172.16.1.2 and udp', within_sec(5)),
                'No filtering: inside receives UDP packets from outside.')
    wait_on_futures([f1])

    # Set a filtering rule based on mac addresses
    set_bridge_port_filters('bridge-000-001', 3, 'connection_tracking_dl_in',
                            'connection_tracking_dl_out')

    f1 = outside.send_udp('aa:bb:cc:00:01:02', '172.16.1.2', 41,
                          src_port=port_num, dst_port=port_num)
    assert_that(inside, should_NOT_receive('dst host 172.16.1.2 and udp',
                                           within_sec(5)),
                'Packets are filtered based on mac address.')
    wait_on_futures([f1])
开发者ID:MatheMatrix,项目名称:midonet,代码行数:33,代码来源:test_conn_tracking.py


示例5: test_filtering_by_network_address

def test_filtering_by_network_address():
    '''
    Title: Tests packets filtering based on network address

    Scenario:
    When: A VM sends UDP packets to another host on the same bridge.
    Then: The UDP packets reaches the receiver. 
    Then: Filtering rule chains based on network address (IP address) are set on
          the bridge port that the receiver host is connected to.
    And: The UDP packets from the same sender do NOT reach the receiver.
    '''
    sender = BM.get_iface_for_port('bridge-000-001', 2)
    receiver = BM.get_iface_for_port('bridge-000-001', 3)

    # Reset in/out-bound filters. 
    unset_bridge_port_filters('bridge-000-001', 3)

    port_num = get_random_port_num()
    f1 = sender.send_udp('aa:bb:cc:00:01:02', '172.16.1.2', 41,
                         src_port=port_num, dst_port=port_num)
    assert_that(receiver, receives('dst host 172.16.1.2 and udp', within_sec(5)),
                'No filtering: receiver receives UDP packets from sender.')
    wait_on_futures([f1])

    # Set a filtering rule based on network address.
    set_bridge_port_filters('bridge-000-001', 3, 'connection_tracking_nw_in',
                            'connection_tracking_nw_out')

    f1 = sender.send_udp('aa:bb:cc:00:01:02', '172.16.1.2', 41,
                         src_port=port_num, dst_port=port_num)
    assert_that(receiver, should_NOT_receive('dst host 172.16.1.2 and udp',
                                             within_sec(5)),
                'Packets are filtered based on IP address.')
    wait_on_futures([f1])
开发者ID:MatheMatrix,项目名称:midonet,代码行数:34,代码来源:test_conn_tracking.py


示例6: test_dnat

def test_dnat():
    """
    Title: Tests DNAT on ping messages.

    Scenario 1:
    When: a VM sends ICMP echo request with ping command to an unassigned IP
          address.
    Then: the router performs DNAT on the message according to the rule chain
          set to the router,
    And: the receiver VM should receive the ICMP echo packet,
    And: the ping command succeeds
    """
    sender = BM.get_iface_for_port('bridge-000-001', 2)
    receiver = BM.get_iface_for_port('bridge-000-002', 2)

    # Reset in-/out-bound filters.
    unset_filters('router-000-001')
    feed_receiver_mac(receiver)

    f2 = async_assert_that(receiver, should_NOT_receive('dst host 172.16.2.1 and icmp',
                                             within_sec(5)))
    f1 = sender.ping_ipv4_addr('100.100.100.100')
    wait_on_futures([f1, f2])

    # Set DNAT rule chains to the router
    set_filters('router-000-001', 'pre_filter_001', 'post_filter_001')

    f2 = async_assert_that(receiver, receives('dst host 172.16.2.1 and icmp',
                                   within_sec(5)))
    f3 = async_assert_that(sender, receives('src host 100.100.100.100 and icmp',
                                 within_sec(5)))
    f1 = sender.ping_ipv4_addr('100.100.100.100')
    wait_on_futures([f1, f2, f3])
开发者ID:yamt,项目名称:midonet,代码行数:33,代码来源:test_nat_router.py


示例7: test_filtering_by_dl

def test_filtering_by_dl():
    """
    Title: Tests dl-based packet filtering.

    Scenario:
    When: A VM sends UDP packets to another host on the same bridge.
    Then: The UDP packets reach the receiver without filtering rule chains.
    Then: A filtering rule chain based on mac address is set on the bridge.
    And: UDP packets from the same host do NOT reach the same destination host.
    """
    outside = BM.get_iface_for_port("bridge-000-001", 2)
    inside = BM.get_iface_for_port("bridge-000-001", 3)

    # Reset an in-bound filter.
    unset_bridge_port_filters("bridge-000-001", 3)

    port_num = get_random_port_num()
    f1 = async_assert_that(
        inside,
        receives("dst host 172.16.1.2 and udp", within_sec(5)),
        "No filtering: inside receives UDP packets from outside.",
    )
    f2 = outside.send_udp("aa:bb:cc:00:01:02", "172.16.1.2", 41, src_port=port_num, dst_port=port_num)
    wait_on_futures([f1, f2])

    # Set a filtering rule based on mac addresses
    set_bridge_port_filters("bridge-000-001", 3, "connection_tracking_dl_in", "connection_tracking_dl_out")

    f1 = async_assert_that(
        inside,
        should_NOT_receive("dst host 172.16.1.2 and udp", within_sec(5)),
        "Packets are filtered based on mac address.",
    )
    f2 = outside.send_udp("aa:bb:cc:00:01:02", "172.16.1.2", 41, src_port=port_num, dst_port=port_num)
    wait_on_futures([f1, f2])
开发者ID:itsuugo,项目名称:midonet,代码行数:35,代码来源:test_conn_tracking.py


示例8: test_snat

def test_snat():
    """
    Title: Tests SNAT on ping messages.

    Scenario:
    When: a VM sends ICMP echo request with ping command to a different subnet,
    Then: the router performs SNAT on the message according to the rule chain
          set to the router,
    And: the receiver VM should receive the ICMP echo packet, with src address
         NATted,
    And: the ping command succeeds.
    """
    sender = BM.get_iface_for_port('bridge-000-001', 2)
    receiver = BM.get_iface_for_port('bridge-000-002', 2)

    # Reset in-/out-bound filters.
    unset_filters('router-000-001')
    feed_receiver_mac(receiver)

    # No SNAT configured. Should not receive SNATed messages.
    f2 = async_assert_that(receiver, should_NOT_receive('src host 172.16.1.100 and icmp',
                                             within_sec(5)))
    f1 = sender.ping4(receiver)
    wait_on_futures([f1, f2])

    # Set SNAT rule chains to the router
    set_filters('router-000-001', 'pre_filter_002', 'post_filter_002')

    # The receiver should receive SNATed messages.
    f2 = async_assert_that(receiver, receives('src host 172.16.1.100 and icmp',
                                   within_sec(5)))
    f3 = async_assert_that(sender, receives('dst host 172.16.1.1 and icmp',
                                 within_sec(5)))
    f1 = sender.ping4(receiver)
    wait_on_futures([f1, f2, f3])
开发者ID:yamt,项目名称:midonet,代码行数:35,代码来源:test_nat_router.py


示例9: test_flow_invalidation_on_mac_update

def test_flow_invalidation_on_mac_update():
    """
    Title: Flow invalidation, learning MACs

    The bridge learns the MACs from the traffic flowing by its ports.
    When the bridge learns a MAC that has 'moved' to another port, it should
    send traffic only to that port.
    """

    sender = BM.get_iface_for_port('bridge-000-001', 1)
    receiver = BM.get_iface_for_port('bridge-000-001', 2)
    intruder = BM.get_iface_for_port('bridge-000-001', 3)

    receiver_MAC = receiver.get_mac_addr()
    frame = '%s-%s-aa:bb' % (receiver_MAC, receiver_MAC)

    capture = 'icmp and src host %s' % (sender.get_ip())

    # Populate ARP table
    sender.execute('arp -s %s %s' % (receiver.get_ip(), receiver_MAC))
    receiver.execute('arp -s %s %s' % (sender.get_ip(), sender.get_mac_addr()))

    # Trigger receiver MAC learning
    receiver.send_ether(frame)

    # First: packets go from sender to receiver
    f1 = async_assert_that(receiver, receives(capture, within_sec(5)))
    f2 = async_assert_that(intruder, should_NOT_receive(capture, within_sec(5)))
    f3 = sender.ping4(receiver)
    wait_on_futures([f1, f2, f3])

    # Second: intruder claims to be receiver
    intruder.send_ether(frame)

    # Third: packets go from sender to intruder
    f1 = async_assert_that(receiver, should_NOT_receive(capture, within_sec(5)))
    f2 = async_assert_that(intruder, receives(capture, within_sec(5)))
    f3 = sender.ping4(receiver)
    wait_on_futures([f1, f2, f3])
开发者ID:danielmellado,项目名称:midonet,代码行数:39,代码来源:test_bridge.py


示例10: run_garp_scenario

def run_garp_scenario(BM, sender_port, target_ip, enable_vip, disable_vip):
    vip1 = BM.get_interface_on_vport('port_int1')
    vip2 = BM.get_interface_on_vport('port_int2')

    sender = BM.get_interface_on_vport(sender_port)
    # allow sender to accept gratutious arps (only makes sense if on same network)
    sender.execute('bash -c "echo 1 > /proc/sys/net/ipv4/conf/%s/arp_accept"'
                   % sender.get_ifname())
    rcv_filter = 'icmp and ip src %s' % (sender.get_ip())

    # noone responds initially
    f1 = async_assert_that(vip1, should_NOT_receive(rcv_filter, within_sec(10)))
    f2 = async_assert_that(vip2, should_NOT_receive(rcv_filter, within_sec(10)))
    f3 = sender.ping_ipv4_addr(target_ip, count=5)
    wait_on_futures([f1, f2, f3])

    # enable for vip1
    enable_vip(vip1)
    disable_vip(vip2)
    f1 = async_assert_that(vip1, receives(rcv_filter, within_sec(10)))
    f2 = async_assert_that(vip2, should_NOT_receive(rcv_filter, within_sec(10)))
    f3 = sender.ping_ipv4_addr(target_ip, count=5)
    wait_on_futures([f1, f2, f3])

    # enable for vip2
    enable_vip(vip2)
    disable_vip(vip1)
    f1 = async_assert_that(vip1, should_NOT_receive(rcv_filter, within_sec(10)))
    f2 = async_assert_that(vip2, receives(rcv_filter, within_sec(10)))
    f3 = sender.ping_ipv4_addr(target_ip, count=5)
    wait_on_futures([f1, f2, f3])

    # enable for vip1
    enable_vip(vip1)
    disable_vip(vip2)
    f1 = async_assert_that(vip1, receives(rcv_filter, within_sec(10)))
    f2 = async_assert_that(vip2, should_NOT_receive(rcv_filter, within_sec(10)))
    f3 = sender.ping_ipv4_addr(target_ip, count=5)
    wait_on_futures([f1, f2, f3])
开发者ID:danielmellado,项目名称:midonet,代码行数:39,代码来源:test_garp.py


示例11: test_mac_learning

def test_mac_learning():
    """
    Title: Bridge mac learning

    Scenario 1:
    When: the destination ethernet address has never been seen before.
    Then: the bridge should flood the ethernet unicast

    Scenario 2:
    When: the destination ethernet address has been seen before.
    Then: the bridge should not flood the ethernet frame, instaed it should
          forward to only the port that is connected to the interface with
          the mac address.
    """
    sender = BM.get_iface_for_port('bridge-000-001', 1)
    iface_with_the_hw_addr = BM.get_iface_for_port('bridge-000-001', 2)
    iface_x = BM.get_iface_for_port('bridge-000-001', 3)

    hw_addr = iface_with_the_hw_addr.get_mac_addr()
    match_on_the_hw_addr = 'ether dst ' + hw_addr

    ethernet_unicast_to_the_hw_addr = '%s-7e:1f:ff:ff:ff:ff-aa:bb' % (hw_addr)

    # Scenario 1:
    # Both interfaces should get the frname as the bridge should flood it.

    f1 = async_assert_that(iface_with_the_hw_addr,
                           receives(match_on_the_hw_addr, within_sec(5)))
    f2 = async_assert_that(iface_x,
                           receives(match_on_the_hw_addr, within_sec(5)))
    time.sleep(1)

    sender.send_ether(ethernet_unicast_to_the_hw_addr, count=3)
    wait_on_futures([f1, f2])

    # Scenario 2:

    # Get the bridge to learn the mac address
    iface_with_the_hw_addr.ping4(sender, sync=True)

    time.sleep(1)

    # only iface_with_the_hw_addr should receives the ehternet unicast
    f1 = async_assert_that(iface_with_the_hw_addr,
                           receives(match_on_the_hw_addr, within_sec(5)))
    f2 = async_assert_that(iface_x,
                           should_NOT_receive(match_on_the_hw_addr,
                                              within_sec(5)))
    sender.send_ether(ethernet_unicast_to_the_hw_addr, count=1)
    wait_on_futures([f1, f2])
开发者ID:danielmellado,项目名称:midonet,代码行数:50,代码来源:test_bridge.py


示例12: test_src_mac_masking

def test_src_mac_masking():
    """
    Title: Test source MAC masking in chain rules

    Scenario 1:
    When: There's a rule dropping any traffic with an even source MAC
    Then: Traffic from if2 to if1 is blocked because if2's MAC ends with 2
    And:  Traffic from if1 to if2 goes through because if1's MAC ends with 1

    FIXME: moving to the new bindings mechanisms should allow removing
    this restriction.
    Only running this with the one-host binding, because:
    1. The multi-host binding breaks the assumptions that if1 will have
       an odd MAC address and if2 an even one.
    2. This is basically just a sanity test to make sure dl_src_mask is
       wired up. Unit tests and test_dst_mac_masking provide enough
       coverage of the other aspects.
    3. These tests are slow enough as it is.
    """

    bridge = VTM.get_bridge('bridge-000-001')

    if1 = BM.get_iface_for_port('bridge-000-001', 1)
    if2 = BM.get_iface_for_port('bridge-000-001', 2)

    if1_hw_addr = if1.interface['hw_addr']
    if2_hw_addr = if2.interface['hw_addr']

    if1_ip_addr = if1.get_ip()
    if2_ip_addr = if2.get_ip()

    if1_rcv_filter = 'udp and ether dst %s' % if1_hw_addr
    if2_rcv_filter = 'udp and ether dst %s' % if2_hw_addr

    bridge.set_inbound_filter(VTM.get_chain('drop_even_src_mac'))

    # If2 has an even MAC (ends with 2), so traffic from if2 to if1
    # should be dropped.
    f1 = async_assert_that(if1, should_NOT_receive(if1_rcv_filter, within_sec(5)))
    time.sleep(1)
    f2 = if2.send_udp(if1_hw_addr, if1_ip_addr, 41)
    wait_on_futures([f1, f2])

    # If1 has an odd MAC (ends with 1), so traffic from if1 to if2
    # should go through.
    f1 = async_assert_that(if2, receives(if2_rcv_filter, within_sec(5)))
    time.sleep(1)
    f2 = if1.send_udp(if2_hw_addr, if2_ip_addr, 41)
    wait_on_futures([f1, f2])
开发者ID:yamt,项目名称:midonet,代码行数:49,代码来源:test_chains.py


示例13: test_two_isolated_bridges

def test_two_isolated_bridges():
    """
    Title: Two isolated bridges

    All traffic between two VMs in different and
    unconnected bridges should be independent, so
    receiver shouldn't get any packets
    """

    sender = BM.get_iface_for_port('bridge-000-001', 1)
    receiver = BM.get_iface_for_port('bridge-000-002', 1)

    f2 = async_assert_that(receiver,
                           should_NOT_receive('', within_sec(5)))

    f1 = sender.ping4(receiver, 0.5, 3, False, 100)

    wait_on_futures([f1, f2])
开发者ID:danielmellado,项目名称:midonet,代码行数:18,代码来源:test_bridge.py


示例14: test_snat_for_udp

def test_snat_for_udp():
    """
    Title: Tests SNAT on UDP packets.

    Scenario:
    When: a VM sends UDP packets to an unassigned IP address.
    Then: the router performs SNAT on the message according to the rule chain
          set to the router,
    And: the UDP packets reach the receiver VM, with src address NATted,
    And: because the UDP port is not open, the receiver VM returns ICMP error
         responses.
    """
    sender = BM.get_iface_for_port('bridge-000-001', 2)
    receiver = BM.get_iface_for_port('bridge-000-002', 2)

    # Reset in-/out-bound filters.
    unset_filters('router-000-001')
    feed_receiver_mac(receiver)

    # Target hardware is a router's incoming port.
    router_port = VTM.get_router('router-000-001').get_port(1)
    router_mac = router_port.get_mn_resource().get_port_mac()

    # No SNAT configured. Should not receive SNATed messages.
    f2 = async_assert_that(receiver, should_NOT_receive('src host 172.16.1.100 and udp',
                                             within_sec(5)))
    f1 = sender.send_udp(router_mac, '172.16.2.1', 29,
                         src_port=9, dst_port=65000)
    wait_on_futures([f1, f2])

    # Set SNAT rule chains to the router
    set_filters('router-000-001', 'pre_filter_002', 'post_filter_002')

    # The receiver should receive SNATed messages.
    f2 = async_assert_that(receiver, receives('src host 172.16.1.100 and udp',
                                   within_sec(5)))
    # Sender should receive ICMP unreachable as the receiver port is not open.
    f3 = async_assert_that(sender, receives_icmp_unreachable_for_udp(
                                '172.16.1.1', '172.16.2.1',
                                udp_src_port=9, udp_dst_port=65000,
                                timeout=within_sec(5)))
    f1 = sender.send_udp(router_mac, '172.16.2.1', 29,
                         src_port=9, dst_port=65000)
    wait_on_futures([f1, f2, f3])
开发者ID:yamt,项目名称:midonet,代码行数:44,代码来源:test_nat_router.py


示例15: test_icmp_after_interface_recovery

def test_icmp_after_interface_recovery():
    """
    Title: ICMP reachability over bridge before and after interfaces go
    down

    Scenario 1:
    When: a VM sends ICMP echo request with ping command
    Then: the receiver VM should receive the ICMP echo packet.
    And: the ping command succeeds
    Then: the receiver VM's tap goes down
    And: the ping command fails
    Then: the receiver VM's tap goes back up
    And: the ping command succeeds
    """

    sender = BM.get_iface_for_port('bridge-000-001', 1)
    receiver = BM.get_iface_for_port('bridge-000-001', 3)

    f1 = async_assert_that(receiver,
                           receives('dst host 172.16.1.3 and icmp',
                                    within_sec(5)))
    f2 = sender.ping4(receiver)
    wait_on_futures([f1, f2])

    receiver.set_down()

    f1 = async_assert_that(receiver,
                           should_NOT_receive('icmp',
                                              within_sec(5),
                                              on_host_interface(True)))
    f2 = sender.ping4(receiver)

    wait_on_futures([f1, f2])

    receiver.set_up()

    f1 = async_assert_that(receiver,
                           receives('dst host 172.16.1.3 and icmp',
                                    within_sec(5)))
    f2 = sender.ping4(receiver)
    wait_on_futures([f1, f2])
开发者ID:danielmellado,项目名称:midonet,代码行数:41,代码来源:test_bridge.py


示例16: test_rule_changes

def test_rule_changes():
    """
    Title: ICMP reachability over bridge before and after adding rule
    to drop IPv4 traffic.

    Scenario 1:
    When: A VM sends ICMP echo request with ping command
    Then: The receiver VM should receive the ICMP echo packet.
    And: The ping command succeeds
    Then: The receiver adds a rule blocking IPv4 traffic.
    And: The ping command fails
    Then: the receiver removes the rule
    And: The ping succeeds again.
    """

    sender = BM.get_iface_for_port('bridge-000-001', 1)
    receiver = BM.get_iface_for_port('bridge-000-001', 2)

    # There are no filters, so the first ping should succeed.
    f1 = async_assert_that(receiver,
                           receives('icmp', within_sec(5)))
    f2 = sender.ping4(receiver, do_arp=True)
    wait_on_futures([f1, f2])

    # Add a filter dropping all IPv4 traffic to port 2.
    chain = VTM.get_chain('drop_ipv4')
    VTM.get_device_port('bridge-000-001', 2).set_outbound_filter(chain)

    # The second ping should not reach port 2.
    f1 = async_assert_that(receiver,
                           should_NOT_receive('icmp', within_sec(5)))
    f2 = sender.ping4(receiver, do_arp=True)
    wait_on_futures([f1, f2])

    # After removing the filter, ping should succeed again.
    VTM.get_device_port('bridge-000-001', 2).set_outbound_filter(None)
    f1 = async_assert_that(receiver,
                           receives('icmp', within_sec(5)))
    f2 = sender.ping4(receiver, do_arp=True)
    wait_on_futures([f1, f2])
开发者ID:danielmellado,项目名称:midonet,代码行数:40,代码来源:test_bridge.py


示例17: test_dnat_for_udp

def test_dnat_for_udp():
    """
    Title: Tests DNAT on UDP packets.

    Scenario:
    When: a VM sends UDP packets to an unassigned IP address.
    Then: the router performs DNAT on the message according to the rule chain
          set to the router,
    And: the UDP packets reach the receiver VM.
    And: because the UDP port is not open, the receiver VM returns ICMP error
         responses.
    """
    sender = BM.get_iface_for_port("bridge-000-001", 2)
    receiver = BM.get_iface_for_port("bridge-000-002", 2)

    # Reset in-/out-bound filters.
    unset_filters("router-000-001")
    feed_receiver_mac(receiver)

    # Target hardware is a router's incoming port.
    router_port = VTM.get_router("router-000-001").get_port(1)
    router_mac = router_port.get_mn_resource().get_port_mac()

    f2 = async_assert_that(receiver, should_NOT_receive("dst host 172.16.2.1 and udp", within_sec(5)))
    f1 = sender.send_udp(router_mac, "100.100.100.100", 29, src_port=9, dst_port=9)
    wait_on_futures([f1, f2])

    # Set DNAT rule chains to the router
    set_filters("router-000-001", "pre_filter_001", "post_filter_001")

    f2 = async_assert_that(receiver, receives("dst host 172.16.2.1 and udp", within_sec(5)))
    # Sender should receive ICMP unreachable as the receiver port is not open.
    f3 = async_assert_that(
        sender,
        receives_icmp_unreachable_for_udp(
            "172.16.1.1", "100.100.100.100", udp_src_port=9, udp_dst_port=9, timeout=within_sec(5)
        ),
    )
    f1 = sender.send_udp(router_mac, "100.100.100.100", 29, src_port=9, dst_port=9)
    wait_on_futures([f1, f2, f3])
开发者ID:aglne,项目名称:midonet,代码行数:40,代码来源:test_nat_router.py


示例18: test_filtering_by_network_address

def test_filtering_by_network_address():
    """
    Title: Tests packets filtering based on network address

    Scenario:
    When: A VM sends UDP packets to another host on the same bridge.
    Then: The UDP packets reaches the receiver. 
    Then: Filtering rule chains based on network address (IP address) are set on
          the bridge port that the receiver host is connected to.
    And: The UDP packets from the same sender do NOT reach the receiver.
    """
    sender = BM.get_iface_for_port("bridge-000-001", 2)
    receiver = BM.get_iface_for_port("bridge-000-001", 3)

    # Reset in/out-bound filters.
    unset_bridge_port_filters("bridge-000-001", 3)

    port_num = get_random_port_num()
    # FIXME: do not use harcoded values!
    f1 = async_assert_that(
        receiver,
        receives("dst host 172.16.1.2 and udp", within_sec(5)),
        "No filtering: receives UDP packets from sender.",
    )
    f2 = sender.send_udp("aa:bb:cc:00:01:02", "172.16.1.2", 41, src_port=port_num, dst_port=port_num)
    wait_on_futures([f1, f2])

    # Set a filtering rule based on network address.
    set_bridge_port_filters("bridge-000-001", 3, "connection_tracking_nw_in", "connection_tracking_nw_out")

    f1 = async_assert_that(
        receiver,
        should_NOT_receive("dst host 172.16.1.2 and udp", within_sec(5)),
        "Packets are filtered based on IP address.",
    )
    f2 = sender.send_udp("aa:bb:cc:00:01:02", "172.16.1.2", 41, src_port=port_num, dst_port=port_num)
    wait_on_futures([f1, f2])
开发者ID:itsuugo,项目名称:midonet,代码行数:37,代码来源:test_conn_tracking.py


示例19: expect_return_dropped

def expect_return_dropped(dst_port_no):
    return async_assert_that(
        downlink_iface(),
        should_NOT_receive(return_filter(dst_port_no), within_sec(10)),
        'Return flow gets dropped.')
开发者ID:yamt,项目名称:midonet,代码行数:5,代码来源:test_l4state.py


示例20: test_spoofed_arp_reply

def test_spoofed_arp_reply():
    """
    Title: Test spoofed ARP reply

    Given: Sender is bridge-000-001, port 2, at 176.16.1.1
           Receiver is bridge-000-002, port 2, at 176.16.2.1

    Scenario 1:
    When: Sender pings 176.16.2.2
    Then: The ping fails.

    Scenario 2:
    When: Receiver sends an unsolicited ARP reply to its gateway (172.16.2.254)
          with source IP 176.16.2.2 and its own MAC address as source MAC.
    Then: The router maps 176.16.2.2 to receiver's MAC address.
    And:  Sender pings 176.16.2.2
    Then: The ping succeeds.

    Scenario 3:
    When: Receiver sends an unsolicited ARP reply to its gateway (172.16.2.254)
          with source IP 176.16.1.2 and its own MAC address as source MAC.
    Then: The router ignores it because 176.16.1.2 is not in the subnet
          (172.16.2.254/24) of the port through which the ARP reply came in.
    And:  Sender pings 176.16.1.2
    Then: The ping fails.
    """
    sender = BM.get_iface_for_port('bridge-000-001', 2)
    receiver = BM.get_iface_for_port('bridge-000-002', 2)

    # 176.16.2.2 is not in the router's ARP table. Ping fails.
    f1 = async_assert_that(receiver,
                           should_NOT_receive('dst host 172.16.2.2 and icmp',
                                              within_sec(5)))
    sender.ping_ipv4_addr('172.16.2.2')
    wait_on_futures([f1])

    # Sender sends an unsolicited ARP reply with source IP 172.16.2.2,
    # which the router maps to sender's MAC address.
    router_port = VTM.get_router('router-000-001').get_port(2)
    router_mac = router_port.get_mn_resource().get_port_mac()

    receiver.send_arp_reply(receiver.get_mac_addr(), router_mac,
                            '172.16.2.2', '172.16.2.254')

    # wait for the arp reply effect to be propagated
    time.sleep(20)

    # Ping now succeeds.
    f1 = async_assert_that(receiver,
                           receives('dst host 172.16.2.2 and icmp',
                                    within_sec(5)))
    sender.ping_ipv4_addr('172.16.2.2')
    wait_on_futures([f1])

    # This ARP reply is ignored because 172.16.1.2 is not in the subnet of
    # router port 2, so a ping to 172.16.1.2 is ignored.
    f1 = async_assert_that(receiver,
                           should_NOT_receive('dst host 172.16.1.2 and icmp',
                                              within_sec(5)))
    receiver.send_arp_reply(receiver.get_mac_addr(), router_mac,
                            '172.16.1.2', '172.16.2.254')
    sender.ping_ipv4_addr('172.16.1.2')
    wait_on_futures([f1])
开发者ID:danielmellado,项目名称:midonet,代码行数:63,代码来源:test_router.py



注:本文中的mdts.tests.utils.asserts.should_NOT_receive函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python asserts.within_sec函数代码示例发布时间:2022-05-27
下一篇:
Python asserts.receives函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap