• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python methods.get_method函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sshuttle.methods.get_method 函数的典型用法代码示例。如果您正苦于以下问题:Python get_method 函数的具体用法?Python get_method 怎么用?Python get_method 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_method函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_get_tcp_dstip

def test_get_tcp_dstip():
    """get_tcp_dstip of the 'nat' method returns the pair packed in the reply.

    The socket is a Mock whose getsockopt yields a packed structure; the test
    expects ('127.0.0.1', 1024) back and exactly one getsockopt(0, 80, 16).
    """
    # Canned getsockopt reply: two 16-bit fields followed by four single bytes.
    packed_reply = struct.pack(
        '!HHBBBB', socket.ntohs(AF_INET), 1024, 127, 0, 0, 1)
    fake_sock = Mock()
    fake_sock.getsockopt.return_value = packed_reply

    nat = get_method('nat')
    destination = nat.get_tcp_dstip(fake_sock)

    assert destination == ('127.0.0.1', 1024)
    assert fake_sock.mock_calls == [call.getsockopt(0, 80, 16)]
开发者ID:luserx0,项目名称:sshuttle,代码行数:7,代码来源:test_methods_nat.py


示例2: test_setup_firewall_openbsd

def test_setup_firewall_openbsd(mock_pf_get_dev, mock_ioctl, mock_pfctl):
    """Drive the 'pf' method through setup and restore with `pfctl` as side effect.

    Three setup_firewall calls are made: a family-10 request and a request
    whose last argument is True must both raise with a specific message while
    leaving every mock untouched; the third (last argument False) must produce
    the exact ioctl and pfctl call sequences asserted below.  Finally
    restore_firewall must flush the anchor and issue "-d".
    """
    # Route the mocked pfctl through `pfctl` (defined outside this snippet).
    mock_pfctl.side_effect = pfctl

    method = get_method("pf")
    assert method.name == "pf"

    # Family 10 request: expected to be refused with the AF_INET6 message.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1024,
            1026,
            [(10, u"2404:6800:4004:80c::33")],
            10,
            [(10, 64, False, u"2404:6800:4004:80c::"), (10, 128, True, u"2404:6800:4004:80c::101f")],
            True,
        )
    assert str(excinfo.value) == 'Address family "AF_INET6" unsupported by pf method_name'
    # The refusal must happen before any mocked helper is used.
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    # Family 2 request with the final flag True (presumably the UDP switch,
    # judging by the expected message): also refused without side effects.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027, [(2, u"1.2.3.33")], 2, [(2, 24, False, u"1.2.3.0"), (2, 32, True, u"1.2.3.66")], True
        )
    assert str(excinfo.value) == "UDP not supported by pf method_name"
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    # Same request with the final flag False: this one is expected to succeed.
    method.setup_firewall(
        1025, 1027, [(2, u"1.2.3.33")], 2, [(2, 24, False, u"1.2.3.0"), (2, 32, True, u"1.2.3.66")], False
    )
    assert mock_ioctl.mock_calls == [call(mock_pf_get_dev(), 0xCD48441A, ANY), call(mock_pf_get_dev(), 0xCD48441A, ANY)]
    # Expected pfctl traffic: a loopback match rule, a status query, the rule
    # set loaded into the "sshuttle" anchor, and finally "-e".
    assert mock_pfctl.mock_calls == [
        call("-f /dev/stdin", b"match on lo\n"),
        call("-s all"),
        call(
            "-a sshuttle -f /dev/stdin",
            b"table <forward_subnets> {!1.2.3.66/32,1.2.3.0/24}\n"
            b"table <dns_servers> {1.2.3.33}\n"
            b"pass in on lo0 inet proto tcp divert-to 127.0.0.1 port 1025\n"
            b"pass in on lo0 inet proto udp to "
            b"<dns_servers>port 53 rdr-to 127.0.0.1 port 1027\n"
            b"pass out inet proto tcp to "
            b"<forward_subnets> route-to lo0 keep state\n"
            b"pass out inet proto udp to "
            b"<dns_servers> port 53 route-to lo0 keep state\n",
        ),
        call("-e"),
    ]
    # Clear recorded calls so the restore step is checked in isolation.
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    # Teardown: only pfctl is expected to be used (flush anchor, then "-d").
    method.restore_firewall(1025, 2, False)
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == [call("-a sshuttle -F all"), call("-d")]
    mock_pf_get_dev.reset_mock()
    mock_pfctl.reset_mock()
    mock_ioctl.reset_mock()
开发者ID:vieira,项目名称:sshuttle,代码行数:60,代码来源:test_methods_pf.py


示例3: test_recv_udp

def test_recv_udp():
    """recv_udp of the 'nat' method wraps one recvfrom call.

    With recvfrom returning ("11111", "127.0.0.1"), the method is expected to
    hand back ("127.0.0.1", None, "11111") after a single recvfrom(1024).
    """
    fake_sock = Mock()
    fake_sock.recvfrom.return_value = ("11111", "127.0.0.1")

    nat = get_method("nat")
    received = nat.recv_udp(fake_sock, 1024)

    expected_calls = [call.recvfrom(1024)]
    assert fake_sock.mock_calls == expected_calls
    assert received == ("127.0.0.1", None, "11111")
开发者ID:Kriechi,项目名称:sshuttle,代码行数:7,代码来源:test_methods_nat.py


示例4: test_setup_tcp_listener

def test_setup_tcp_listener():
    """setup_tcp_listener of 'tproxy' performs exactly one setsockopt(0, 19, 1)."""
    fake_listener = Mock()
    tproxy = get_method('tproxy')

    tproxy.setup_tcp_listener(fake_listener)

    expected_calls = [call.setsockopt(0, 19, 1)]
    assert fake_listener.mock_calls == expected_calls
开发者ID:tberton,项目名称:sshuttle,代码行数:7,代码来源:test_methods_tproxy.py


示例5: test_recv_udp

def test_recv_udp(mock_recv_udp):
    """recv_udp of 'tproxy' forwards to the injected mock_recv_udp.

    The socket itself must record no calls; the mock must be called once with
    (sock, 1024) and its return value passed straight through.
    """
    mock_recv_udp.return_value = ("127.0.0.1", "127.0.0.2", "11111")
    fake_sock = Mock()

    tproxy = get_method('tproxy')
    received = tproxy.recv_udp(fake_sock, 1024)

    assert fake_sock.mock_calls == []
    expected_helper_calls = [call(fake_sock, 1024)]
    assert mock_recv_udp.mock_calls == expected_helper_calls
    assert received == ("127.0.0.1", "127.0.0.2", "11111")
开发者ID:tberton,项目名称:sshuttle,代码行数:9,代码来源:test_methods_tproxy.py


示例6: test_setup_udp_listener

def test_setup_udp_listener():
    """setup_udp_listener of 'tproxy' sets three socket options, in order.

    One on the listener itself, then one each on its `v4` and `v6` attributes.
    """
    fake_listener = Mock()
    tproxy = get_method('tproxy')

    tproxy.setup_udp_listener(fake_listener)

    expected_calls = [
        call.setsockopt(0, 19, 1),
        call.v4.setsockopt(0, 20, 1),
        call.v6.setsockopt(41, 74, 1),
    ]
    assert fake_listener.mock_calls == expected_calls
开发者ID:tberton,项目名称:sshuttle,代码行数:9,代码来源:test_methods_tproxy.py


示例7: test_setup_firewall

def test_setup_firewall(mock_pf_get_dev, mock_ioctl, mock_pfctl):
    """Check setup_firewall of the 'pf' method against mocked helpers.

    Two requests must be refused with specific messages and no mock activity;
    a third must produce six ioctl calls; a final call with empty lists must
    only flush the "sshuttle" anchor through pfctl.  The pfctl traffic of the
    third request is not asserted (see the FIXME below).
    """
    method = get_method('pf')
    assert method.name == 'pf'

    # Family 10 request: expected to be refused with the AF_INET6 message.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1024, 1026,
            [(10, u'2404:6800:4004:80c::33')],
            10,
            [(10, 64, False, u'2404:6800:4004:80c::'),
                (10, 128, True, u'2404:6800:4004:80c::101f')],
            True)
    assert str(excinfo.value) \
        == 'Address family "AF_INET6" unsupported by pf method_name'
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    # Family 2 request with the final flag True (presumably the UDP switch,
    # judging by the expected message): refused as well.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027,
            [(2, u'1.2.3.33')],
            2,
            [(2, 24, False, u'1.2.3.0'), (2, 32, True, u'1.2.3.66')],
            True)
    assert str(excinfo.value) == 'UDP not supported by pf method_name'
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    # Same request with the final flag False: expected to succeed.
    method.setup_firewall(
        1025, 1027,
        [(2, u'1.2.3.33')],
        2,
        [(2, 24, False, u'1.2.3.0'), (2, 32, True, u'1.2.3.66')],
        False)
    # Two identical groups of three ioctl requests are expected.
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 3295691827, ANY),
        call(mock_pf_get_dev(), 3424666650, ANY),
        call(mock_pf_get_dev(), 3424666650, ANY),
        call(mock_pf_get_dev(), 3295691827, ANY),
        call(mock_pf_get_dev(), 3424666650, ANY),
        call(mock_pf_get_dev(), 3424666650, ANY),
    ]
    # FIXME - needs more work
    # print(mock_pfctl.mock_calls)
    # assert mock_pfctl.mock_calls == []
    # Clear recorded calls so the next step is checked in isolation.
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    # Empty nameserver/subnet lists: only an anchor flush is expected.
    method.setup_firewall(1025, 0, [], 2, [], False)
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == [call('-a sshuttle -F all')]
    mock_pf_get_dev.reset_mock()
    mock_pfctl.reset_mock()
    mock_ioctl.reset_mock()
开发者ID:tberton,项目名称:sshuttle,代码行数:57,代码来源:test_methods_pf.py


示例8: __init__

    def __init__(self, method_name):
        """Spawn the firewall helper process and bind the method it reports.

        The helper is this same program re-executed with
        ``--method <method_name> --firewall`` (plus ``-v`` flags and
        ``--syslog`` when enabled).  It is tried through ``sudo`` first and
        then directly; when already running as uid 0 only the direct form is
        tried.  Once the helper sends a line starting with ``READY``, the
        method named on that line is looked up with ``get_method`` and linked
        back to this object via ``set_firewall``.

        Args:
            method_name: value passed to the helper's ``--method`` option.

        Raises:
            Fatal: if none of the candidate commands could be spawned, or if
                the helper's first line does not start with ``READY``.
        """
        self.auto_nets = []
        python_path = os.path.dirname(os.path.dirname(__file__))
        argvbase = ([sys.executable, sys.argv[0]] +
                    ['-v'] * (helpers.verbose or 0) +
                    ['--method', method_name] +
                    ['--firewall'])
        if ssyslog._p:
            argvbase += ['--syslog']
        argv_tries = [
            ['sudo', '-p', '[local sudo] Password: ',
                ('PYTHONPATH=%s' % python_path), '--'] + argvbase,
            argvbase
        ]

        # we can't use stdin/stdout=subprocess.PIPE here, as we normally would,
        # because stupid Linux 'su' requires that stdin be attached to a tty.
        # Instead, attach a *bidirectional* socket to its stdout, and use
        # that for talking in both directions.
        (s1, s2) = socket.socketpair()

        def setup():
            # run in the child process
            s2.close()

        if os.getuid() == 0:
            argv_tries = argv_tries[-1:]  # last entry only

        # Track the failure in a dedicated local.  On Python 3 the name bound
        # by ``except ... as`` is unbound when the handler exits, so reusing
        # it after the loop would raise UnboundLocalError instead of Fatal.
        spawn_error = None
        for argv in argv_tries:
            try:
                if argv[0] == 'su':
                    sys.stderr.write('[local su] ')
                self.p = ssubprocess.Popen(argv, stdout=s1, preexec_fn=setup)
                spawn_error = None
                break
            except OSError as exc:
                # Remember the error and fall through to the next candidate.
                spawn_error = exc
        self.argv = argv
        s1.close()
        if sys.version_info < (3, 0):
            # python 2.7
            self.pfile = s2.makefile('wb+')
        else:
            # python 3.5
            self.pfile = s2.makefile('rwb')
        if spawn_error is not None:
            log('Spawning firewall manager: %r\n' % self.argv)
            raise Fatal(spawn_error)
        line = self.pfile.readline()
        self.check()
        if line[0:5] != b'READY':
            raise Fatal('%r expected READY, got %r' % (self.argv, line))
        # The method name follows "READY " and is terminated by the newline.
        ready_name = line[6:-1]
        self.method = get_method(ready_name.decode("ASCII"))
        self.method.set_firewall(self)
开发者ID:dlenski,项目名称:sshuttle,代码行数:54,代码来源:client.py


示例9: test_assert_features

def test_assert_features():
    """assert_features of 'nat' accepts its own feature set and rejects extras.

    The unmodified result of get_supported_features must pass; after switching
    on `udp`, and then additionally `ipv6`, the check must raise Fatal.
    """
    nat = get_method("nat")
    supported = nat.get_supported_features()
    nat.assert_features(supported)

    # The flags accumulate: `udp` stays enabled when `ipv6` is turned on.
    for flag in ("udp", "ipv6"):
        setattr(supported, flag, True)
        with pytest.raises(Fatal):
            nat.assert_features(supported)
开发者ID:Kriechi,项目名称:sshuttle,代码行数:12,代码来源:test_methods_nat.py


示例10: test_send_udp

def test_send_udp(mock_socket):
    """send_udp of 'tproxy' works through a new socket from mock_socket.

    The socket passed in must record no calls; the injected mock_socket must
    see construction, two setsockopt calls, bind, sendto and close, in order.
    """
    fake_sock = Mock()
    tproxy = get_method('tproxy')

    tproxy.send_udp(fake_sock, "127.0.0.2", "127.0.0.1", "2222222")

    assert fake_sock.mock_calls == []
    expected_socket_calls = [
        call(fake_sock.family, 2),
        call().setsockopt(1, 2, 1),
        call().setsockopt(0, 19, 1),
        call().bind('127.0.0.2'),
        call().sendto("2222222", '127.0.0.1'),
        call().close(),
    ]
    assert mock_socket.mock_calls == expected_socket_calls
开发者ID:tberton,项目名称:sshuttle,代码行数:13,代码来源:test_methods_tproxy.py


示例11: test_get_tcp_dstip

def test_get_tcp_dstip():
    """get_tcp_dstip of 'pf' queries the firewall object over its pfile.

    The method is expected to read the socket's peer and local names, write a
    QUERY_PF_NAT line to firewall.pfile, flush, read one reply line and return
    the address/port carried by that reply.
    """
    fake_sock = Mock()
    fake_sock.family = socket.AF_INET
    fake_sock.getpeername.return_value = ("127.0.0.1", 1024)
    fake_sock.getsockname.return_value = ("127.0.0.2", 1025)

    fake_firewall = Mock()
    fake_firewall.pfile.readline.return_value = b"QUERY_PF_NAT_SUCCESS 127.0.0.3,1026\n"

    pf = get_method("pf")
    pf.set_firewall(fake_firewall)
    destination = pf.get_tcp_dstip(fake_sock)
    assert destination == ("127.0.0.3", 1026)

    expected_sock_calls = [call.getpeername(), call.getsockname()]
    assert fake_sock.mock_calls == expected_sock_calls

    expected_firewall_calls = [
        call.pfile.write(b"QUERY_PF_NAT 2,6,127.0.0.1,1024,127.0.0.2,1025\n"),
        call.pfile.flush(),
        call.pfile.readline(),
    ]
    assert fake_firewall.mock_calls == expected_firewall_calls
开发者ID:vieira,项目名称:sshuttle,代码行数:19,代码来源:test_methods_pf.py


示例12: test_setup_firewall

def test_setup_firewall(mock_ipt_chain_exists, mock_ipt_ttl, mock_ipt):
    """Check setup/restore of the 'nat' method against mocked ipt helpers.

    An AF_INET6 request and an AF_INET request with the seventh argument True
    must both raise without touching any mock; the AF_INET request with that
    argument False must yield the exact helper calls asserted below, and
    restore_firewall must remove the "sshuttle-1025" chain again.
    """
    # Pretend the chain already exists whenever the code asks.
    mock_ipt_chain_exists.return_value = True
    method = get_method('nat')
    assert method.name == 'nat'

    # AF_INET6 request: expected to be refused with the AF_INET6 message.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1024, 1026,
            [(AF_INET6, u'2404:6800:4004:80c::33')],
            AF_INET6,
            [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0),
                (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)],
            True,
            None)
    assert str(excinfo.value) \
        == 'Address family "AF_INET6" unsupported by nat method_name'
    assert mock_ipt_chain_exists.mock_calls == []
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == []

    # AF_INET request with the flag True (presumably the UDP switch, judging
    # by the expected message): refused as well.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027,
            [(AF_INET, u'1.2.3.33')],
            AF_INET,
            [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
                (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
            True,
            None)
    assert str(excinfo.value) == 'UDP not supported by nat method_name'
    assert mock_ipt_chain_exists.mock_calls == []
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == []

    # Same request with the flag False: expected to succeed.
    method.setup_firewall(
        1025, 1027,
        [(AF_INET, u'1.2.3.33')],
        AF_INET,
        [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
            (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
        False,
        None)
    assert mock_ipt_chain_exists.mock_calls == [
        call(AF_INET, 'nat', 'sshuttle-1025')
    ]
    # REDIRECT rules go through mock_ipt_ttl; the subnet rule carries the
    # 8000:9000 port range from the subnet tuple.
    assert mock_ipt_ttl.mock_calls == [
        call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT',
             '--dest', u'1.2.3.0/24', '-p', 'tcp', '--dport', '8000:9000',
             '--to-ports', '1025'),
        call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT',
             '--dest', u'1.2.3.33/32', '-p', 'udp',
             '--dport', '53', '--to-ports', '1027')
    ]
    # Chain management goes through mock_ipt: the first four calls match the
    # teardown sequence asserted after restore_firewall below, followed by
    # chain creation, hooks into OUTPUT/PREROUTING and the RETURN rule.
    assert mock_ipt.mock_calls == [
        call(AF_INET, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-X', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-N', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-I', 'OUTPUT', '1', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-I', 'PREROUTING', '1', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'RETURN',
             '--dest', u'1.2.3.66/32', '-p', 'tcp', '--dport', '8080:8080')
    ]
    # Clear recorded calls so the restore step is checked in isolation.
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()

    # Teardown: the chain is unhooked, flushed and deleted.
    method.restore_firewall(1025, AF_INET, False, None)
    assert mock_ipt_chain_exists.mock_calls == [
        call(AF_INET, 'nat', 'sshuttle-1025')
    ]
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == [
        call(AF_INET, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-X', 'sshuttle-1025')
    ]
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()
开发者ID:luserx0,项目名称:sshuttle,代码行数:83,代码来源:test_methods_nat.py


示例13: test_get_tcp_dstip

def test_get_tcp_dstip():
    """get_tcp_dstip of the 'nat' method returns the pair packed in the reply.

    The socket is a Mock whose getsockopt yields a packed structure; the test
    expects ("127.0.0.1", 1024) back and exactly one getsockopt(0, 80, 16).
    """
    # Canned getsockopt reply: two 16-bit fields followed by four single bytes.
    packed_reply = struct.pack(
        "!HHBBBB", socket.ntohs(socket.AF_INET), 1024, 127, 0, 0, 1)
    fake_sock = Mock()
    fake_sock.getsockopt.return_value = packed_reply

    nat = get_method("nat")
    destination = nat.get_tcp_dstip(fake_sock)

    assert destination == ("127.0.0.1", 1024)
    assert fake_sock.mock_calls == [call.getsockopt(0, 80, 16)]
开发者ID:Kriechi,项目名称:sshuttle,代码行数:6,代码来源:test_methods_nat.py


示例14: test_get_supported_features

def test_get_supported_features():
    """The 'tproxy' method reports both `ipv6` and `udp` as truthy features."""
    supported = get_method('tproxy').get_supported_features()
    assert supported.ipv6
    assert supported.udp
开发者ID:tberton,项目名称:sshuttle,代码行数:5,代码来源:test_methods_tproxy.py


示例15: test_setup_firewall

def test_setup_firewall(mock_ipt_chain_exists, mock_ipt_ttl, mock_ipt):
    """Check setup/restore of the 'nat' method against mocked ipt helpers.

    A family-10 request and a family-2 request with the last argument True
    must both raise without touching any mock; the family-2 request with the
    last argument False must yield the exact helper calls asserted below, and
    restore_firewall must remove the "sshuttle-1025" chain again.
    """
    # Pretend the chain already exists whenever the code asks.
    mock_ipt_chain_exists.return_value = True
    method = get_method("nat")
    assert method.name == "nat"

    # Family 10 request: expected to be refused with the AF_INET6 message.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1024,
            1026,
            [(10, u"2404:6800:4004:80c::33")],
            10,
            [(10, 64, False, u"2404:6800:4004:80c::"), (10, 128, True, u"2404:6800:4004:80c::101f")],
            True,
        )
    assert str(excinfo.value) == 'Address family "AF_INET6" unsupported by nat method_name'
    assert mock_ipt_chain_exists.mock_calls == []
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == []

    # Family 2 request with the final flag True (presumably the UDP switch,
    # judging by the expected message): refused as well.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027, [(2, u"1.2.3.33")], 2, [(2, 24, False, u"1.2.3.0"), (2, 32, True, u"1.2.3.66")], True
        )
    assert str(excinfo.value) == "UDP not supported by nat method_name"
    assert mock_ipt_chain_exists.mock_calls == []
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == []

    # Same request with the final flag False: expected to succeed.
    method.setup_firewall(
        1025, 1027, [(2, u"1.2.3.33")], 2, [(2, 24, False, u"1.2.3.0"), (2, 32, True, u"1.2.3.66")], False
    )
    assert mock_ipt_chain_exists.mock_calls == [call(2, "nat", "sshuttle-1025")]
    # REDIRECT rules go through mock_ipt_ttl: one for the forwarded subnet
    # (to port 1025) and one for the nameserver on port 53 (to port 1027).
    assert mock_ipt_ttl.mock_calls == [
        call(
            2,
            "nat",
            "-A",
            "sshuttle-1025",
            "-j",
            "REDIRECT",
            "--dest",
            u"1.2.3.0/24",
            "-p",
            "tcp",
            "--to-ports",
            "1025",
        ),
        call(
            2,
            "nat",
            "-A",
            "sshuttle-1025",
            "-j",
            "REDIRECT",
            "--dest",
            u"1.2.3.33/32",
            "-p",
            "udp",
            "--dport",
            "53",
            "--to-ports",
            "1027",
        ),
    ]
    # Chain management goes through mock_ipt: the first four calls match the
    # teardown sequence asserted after restore_firewall below, followed by
    # chain creation, hooks into OUTPUT/PREROUTING and the RETURN rule.
    assert mock_ipt.mock_calls == [
        call(2, "nat", "-D", "OUTPUT", "-j", "sshuttle-1025"),
        call(2, "nat", "-D", "PREROUTING", "-j", "sshuttle-1025"),
        call(2, "nat", "-F", "sshuttle-1025"),
        call(2, "nat", "-X", "sshuttle-1025"),
        call(2, "nat", "-N", "sshuttle-1025"),
        call(2, "nat", "-F", "sshuttle-1025"),
        call(2, "nat", "-I", "OUTPUT", "1", "-j", "sshuttle-1025"),
        call(2, "nat", "-I", "PREROUTING", "1", "-j", "sshuttle-1025"),
        call(2, "nat", "-A", "sshuttle-1025", "-j", "RETURN", "--dest", u"1.2.3.66/32", "-p", "tcp"),
    ]
    # Clear recorded calls so the restore step is checked in isolation.
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()

    # Teardown: the chain is unhooked, flushed and deleted.
    method.restore_firewall(1025, 2, False)
    assert mock_ipt_chain_exists.mock_calls == [call(2, "nat", "sshuttle-1025")]
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == [
        call(2, "nat", "-D", "OUTPUT", "-j", "sshuttle-1025"),
        call(2, "nat", "-D", "PREROUTING", "-j", "sshuttle-1025"),
        call(2, "nat", "-F", "sshuttle-1025"),
        call(2, "nat", "-X", "sshuttle-1025"),
    ]
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()
开发者ID:Kriechi,项目名称:sshuttle,代码行数:91,代码来源:test_methods_nat.py


示例16: test_setup_firewall_openbsd

def test_setup_firewall_openbsd(mock_pf_get_dev, mock_ioctl, mock_pfctl):
    """Drive the 'pf' method through AF_INET6 and AF_INET setup, then restore.

    Unlike a refused request, the AF_INET6 setup here is expected to succeed
    and load rules into the "sshuttle6-1024" anchor.  An AF_INET request with
    the seventh argument True must raise; with it False the rules go into the
    "sshuttle-1025" anchor.  Restoring both must flush both anchors and then
    issue a single "-d".
    """
    # Route the mocked pfctl through `pfctl` (defined outside this snippet).
    mock_pfctl.side_effect = pfctl

    method = get_method('pf')
    assert method.name == 'pf'

    # AF_INET6 setup with port ranges attached to each subnet tuple.
    method.setup_firewall(
        1024, 1026,
        [(AF_INET6, u'2404:6800:4004:80c::33')],
        AF_INET6,
        [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
            (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
        False,
        None)

    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xcd58441a, ANY),
        call(mock_pf_get_dev(), 0xcd58441a, ANY),
    ]
    # Expected pfctl traffic: interface query, loopback match rule, status
    # query, the inet6 rule set for the anchor, and finally "-e".
    assert mock_pfctl.mock_calls == [
        call('-s Interfaces -i lo -v'),
        call('-f /dev/stdin', b'match on lo\n'),
        call('-s all'),
        call('-a sshuttle6-1024 -f /dev/stdin',
             b'table <dns_servers> {2404:6800:4004:80c::33}\n'
             b'pass in on lo0 inet6 proto tcp to 2404:6800:4004:80c::/64 '
             b'port 8000:9000 divert-to ::1 port 1024\n'
             b'pass in on lo0 inet6 proto udp '
             b'to <dns_servers> port 53 rdr-to ::1 port 1026\n'
             b'pass out inet6 proto tcp to 2404:6800:4004:80c::/64 '
             b'port 8000:9000 route-to lo0 keep state\n'
             b'pass out inet6 proto tcp to '
             b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
             b'pass out inet6 proto udp to '
             b'<dns_servers> port 53 route-to lo0 keep state\n'),
        call('-e'),
    ]
    # Clear recorded calls so the next step is checked in isolation.
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    # AF_INET request with the flag True (presumably the UDP switch, judging
    # by the expected message): must be refused without side effects.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027,
            [(AF_INET, u'1.2.3.33')],
            AF_INET,
            [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
                (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
            True,
            None)
    assert str(excinfo.value) == 'UDP not supported by pf method_name'
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    # Same AF_INET request with the flag False: expected to succeed.
    method.setup_firewall(
        1025, 1027,
        [(AF_INET, u'1.2.3.33')],
        AF_INET,
        [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
            (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
        False,
        None)
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xcd58441a, ANY),
        call(mock_pf_get_dev(), 0xcd58441a, ANY),
    ]
    # The (0, 0) port pair produces rules without a "port" clause, while the
    # (80, 80) pair shows up as "port 80:80".
    assert mock_pfctl.mock_calls == [
        call('-s Interfaces -i lo -v'),
        call('-f /dev/stdin', b'match on lo\n'),
        call('-s all'),
        call('-a sshuttle-1025 -f /dev/stdin',
             b'table <dns_servers> {1.2.3.33}\n'
             b'pass in on lo0 inet proto tcp to 1.2.3.0/24 divert-to '
             b'127.0.0.1 port 1025\n'
             b'pass in on lo0 inet proto udp to '
             b'<dns_servers> port 53 rdr-to 127.0.0.1 port 1027\n'
             b'pass out inet proto tcp to 1.2.3.0/24 route-to lo0 keep state\n'
             b'pass out inet proto tcp to 1.2.3.66/32 port 80:80\n'
             b'pass out inet proto udp to '
             b'<dns_servers> port 53 route-to lo0 keep state\n'),
        call('-e'),
    ]
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    # Restore both families; "-d" is expected only once, after the second.
    method.restore_firewall(1025, AF_INET, False, None)
    method.restore_firewall(1024, AF_INET6, False, None)
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == [
        call('-a sshuttle-1025 -F all'),
        call('-a sshuttle6-1024 -F all'),
        call('-d'),
    ]
    mock_pf_get_dev.reset_mock()
    mock_pfctl.reset_mock()
    mock_ioctl.reset_mock()
开发者ID:luserx0,项目名称:sshuttle,代码行数:98,代码来源:test_methods_pf.py


示例17: test_setup_firewall_notdarwin

def test_setup_firewall_notdarwin(mock_pf_get_dev, mock_ioctl, mock_pfctl):
    """Drive the 'pf' method through setup and restore with `pfctl` as side effect.

    A family-10 request and a family-2 request with the last argument True
    must both raise without touching any mock; the family-2 request with the
    last argument False must produce six ioctl calls and load "rdr pass"
    rules into the "sshuttle" anchor.  restore_firewall must then flush the
    anchor and issue "-d".
    """
    # Route the mocked pfctl through `pfctl` (defined outside this snippet).
    mock_pfctl.side_effect = pfctl

    method = get_method('pf')
    assert method.name == 'pf'

    # Family 10 request: expected to be refused with the AF_INET6 message.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1024, 1026,
            [(10, u'2404:6800:4004:80c::33')],
            10,
            [(10, 64, False, u'2404:6800:4004:80c::'),
                (10, 128, True, u'2404:6800:4004:80c::101f')],
            True)
    assert str(excinfo.value) \
        == 'Address family "AF_INET6" unsupported by pf method_name'
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    # Family 2 request with the final flag True (presumably the UDP switch,
    # judging by the expected message): refused as well.
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027,
            [(2, u'1.2.3.33')],
            2,
            [(2, 24, False, u'1.2.3.0'), (2, 32, True, u'1.2.3.66')],
            True)
    assert str(excinfo.value) == 'UDP not supported by pf method_name'
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    # Same request with the final flag False: expected to succeed.
    method.setup_firewall(
        1025, 1027,
        [(2, u'1.2.3.33')],
        2,
        [(2, 24, False, u'1.2.3.0'), (2, 32, True, u'1.2.3.66')],
        False)
    # Two identical groups of three ioctl requests are expected.
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xC4704433, ANY),
        call(mock_pf_get_dev(), 0xCBE0441A, ANY),
        call(mock_pf_get_dev(), 0xCBE0441A, ANY),
        call(mock_pf_get_dev(), 0xC4704433, ANY),
        call(mock_pf_get_dev(), 0xCBE0441A, ANY),
        call(mock_pf_get_dev(), 0xCBE0441A, ANY),
    ]
    # Expected pfctl traffic: status query, the rule set for the anchor, "-e".
    assert mock_pfctl.mock_calls == [
        call('-s all'),
        call('-a sshuttle -f /dev/stdin',
             b'table <forward_subnets> {!1.2.3.66/32,1.2.3.0/24}\n'
             b'table <dns_servers> {1.2.3.33}\n'
             b'rdr pass on lo0 proto tcp '
             b'to <forward_subnets> -> 127.0.0.1 port 1025\n'
             b'rdr pass on lo0 proto udp '
             b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n'
             b'pass out route-to lo0 inet proto tcp '
             b'to <forward_subnets> keep state\n'
             b'pass out route-to lo0 inet proto udp '
             b'to <dns_servers> port 53 keep state\n'),
        call('-e'),
    ]
    # Clear recorded calls so the restore step is checked in isolation.
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    # Teardown: only pfctl is expected to be used (flush anchor, then "-d").
    method.restore_firewall(1025, 2, False)
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == [
        call('-a sshuttle -F all'),
        call("-d"),
    ]
    mock_pf_get_dev.reset_mock()
    mock_pfctl.reset_mock()
    mock_ioctl.reset_mock()
开发者ID:Kriechi,项目名称:sshuttle,代码行数:74,代码来源:test_methods_pf.py


示例18: test_setup_firewall

def test_setup_firewall(mock_ipt_chain_exists, mock_ipt_ttl, mock_ipt):
    """Check setup_firewall of the 'tproxy' method against mocked ipt helpers.

    The visible part covers family 10 only: a full setup on port 1024 with the
    last argument True, then a call with empty lists on port 1025 that is
    expected to do nothing but remove the three per-port chains.  The listing
    this was taken from is truncated after that point.
    """
    # Pretend every chain already exists whenever the code asks.
    mock_ipt_chain_exists.return_value = True
    method = get_method('tproxy')
    assert method.name == 'tproxy'

    # IPV6

    method.setup_firewall(
        1024, 1026,
        [(10, u'2404:6800:4004:80c::33')],
        10,
        [(10, 64, False, u'2404:6800:4004:80c::'),
            (10, 128, True, u'2404:6800:4004:80c::101f')],
        True)
    # Three chains are probed: "-m-", "-t-" and "-d-" variants for port 1024.
    assert mock_ipt_chain_exists.mock_calls == [
        call(10, 'mangle', 'sshuttle-m-1024'),
        call(10, 'mangle', 'sshuttle-t-1024'),
        call(10, 'mangle', 'sshuttle-d-1024')
    ]
    assert mock_ipt_ttl.mock_calls == []
    # Expected order: remove any existing chains, recreate and hook them, add
    # the MARK/ACCEPT and socket-match rules, then the nameserver rules, the
    # RETURN rules for the excluded /128 and the MARK/TPROXY rules for the /64.
    assert mock_ipt.mock_calls == [
        call(10, 'mangle', '-D', 'OUTPUT', '-j', 'sshuttle-m-1024'),
        call(10, 'mangle', '-F', 'sshuttle-m-1024'),
        call(10, 'mangle', '-X', 'sshuttle-m-1024'),
        call(10, 'mangle', '-D', 'PREROUTING', '-j', 'sshuttle-t-1024'),
        call(10, 'mangle', '-F', 'sshuttle-t-1024'),
        call(10, 'mangle', '-X', 'sshuttle-t-1024'),
        call(10, 'mangle', '-F', 'sshuttle-d-1024'),
        call(10, 'mangle', '-X', 'sshuttle-d-1024'),
        call(10, 'mangle', '-N', 'sshuttle-m-1024'),
        call(10, 'mangle', '-F', 'sshuttle-m-1024'),
        call(10, 'mangle', '-N', 'sshuttle-d-1024'),
        call(10, 'mangle', '-F', 'sshuttle-d-1024'),
        call(10, 'mangle', '-N', 'sshuttle-t-1024'),
        call(10, 'mangle', '-F', 'sshuttle-t-1024'),
        call(10, 'mangle', '-I', 'OUTPUT', '1', '-j', 'sshuttle-m-1024'),
        call(10, 'mangle', '-I', 'PREROUTING', '1', '-j', 'sshuttle-t-1024'),
        call(10, 'mangle', '-A', 'sshuttle-d-1024', '-j', 'MARK',
             '--set-mark', '1'),
        call(10, 'mangle', '-A', 'sshuttle-d-1024', '-j', 'ACCEPT'),
        call(10, 'mangle', '-A', 'sshuttle-t-1024', '-m', 'socket',
             '-j', 'sshuttle-d-1024', '-m', 'tcp', '-p', 'tcp'),
        call(10, 'mangle', '-A', 'sshuttle-t-1024', '-m', 'socket',
             '-j', 'sshuttle-d-1024', '-m', 'udp', '-p', 'udp'),
        call(10, 'mangle', '-A', 'sshuttle-m-1024', '-j', 'MARK',
             '--set-mark', '1', '--dest', u'2404:6800:4004:80c::33/32',
             '-m', 'udp', '-p', 'udp', '--dport', '53'),
        call(10, 'mangle', '-A', 'sshuttle-t-1024', '-j', 'TPROXY',
             '--tproxy-mark', '0x1/0x1',
             '--dest', u'2404:6800:4004:80c::33/32',
             '-m', 'udp', '-p', 'udp', '--dport', '53', '--on-port', '1026'),
        call(10, 'mangle', '-A', 'sshuttle-m-1024', '-j', 'RETURN',
             '--dest', u'2404:6800:4004:80c::101f/128',
             '-m', 'tcp', '-p', 'tcp'),
        call(10, 'mangle', '-A', 'sshuttle-t-1024', '-j', 'RETURN',
             '--dest', u'2404:6800:4004:80c::101f/128',
             '-m', 'tcp', '-p', 'tcp'),
        call(10, 'mangle', '-A', 'sshuttle-m-1024', '-j', 'RETURN',
             '--dest', u'2404:6800:4004:80c::101f/128',
             '-m', 'udp', '-p', 'udp'),
        call(10, 'mangle', '-A', 'sshuttle-t-1024', '-j', 'RETURN',
             '--dest', u'2404:6800:4004:80c::101f/128',
             '-m', 'udp', '-p', 'udp'),
        call(10, 'mangle', '-A', 'sshuttle-m-1024', '-j', 'MARK',
             '--set-mark', '1', '--dest', u'2404:6800:4004:80c::/64',
             '-m', 'tcp', '-p', 'tcp'),
        call(10, 'mangle', '-A', 'sshuttle-t-1024', '-j', 'TPROXY',
             '--tproxy-mark', '0x1/0x1', '--dest', u'2404:6800:4004:80c::/64',
             '-m', 'tcp', '-p', 'tcp', '--on-port', '1024'),
        call(10, 'mangle', '-A', 'sshuttle-m-1024', '-j', 'MARK',
             '--set-mark', '1', '--dest', u'2404:6800:4004:80c::/64',
             '-m', 'udp', '-p', 'udp'),
        call(10, 'mangle', '-A', 'sshuttle-t-1024', '-j', 'TPROXY',
             '--tproxy-mark', '0x1/0x1', '--dest', u'2404:6800:4004:80c::/64',
             '-m', 'udp', '-p', 'udp', '--on-port', '1024')
    ]
    # Clear recorded calls so the next step is checked in isolation.
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()

    # Empty nameserver/subnet lists on port 1025: only chain removal expected.
    method.setup_firewall(1025, 0, [], 10, [], True)
    assert mock_ipt_chain_exists.mock_calls == [
        call(10, 'mangle', 'sshuttle-m-1025'),
        call(10, 'mangle', 'sshuttle-t-1025'),
        call(10, 'mangle', 'sshuttle-d-1025')
    ]
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == [
        call(10, 'mangle', '-D', 'OUTPUT', '-j', 'sshuttle-m-1025'),
        call(10, 'mangle', '-F', 'sshuttle-m-1025'),
        call(10, 'mangle', '-X', 'sshuttle-m-1025'),
        call(10, 'mangle', '-D', 'PREROUTING', '-j', 'sshuttle-t-1025'),
        call(10, 'mangle', '-F', 'sshuttle-t-1025'),
        call(10, 'mangle', '-X', 'sshuttle-t-1025'),
        call(10, 'mangle', '-F', 'sshuttle-d-1025'),
        call(10, 'mangle', '-X', 'sshuttle-d-1025')
    ]
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()
#.........part of the code is omitted here.........
开发者ID:tberton,项目名称:sshuttle,代码行数:101,代码来源:test_methods_tproxy.py


示例19: test_firewall_command

def test_firewall_command():
    """firewall_command of 'tproxy' returns a falsy value for this input."""
    tproxy = get_method('tproxy')
    handled = tproxy.firewall_command("somthing")
    assert not handled
开发者ID:tberton,项目名称:sshuttle,代码行数:3,代码来源:test_methods_tproxy.py


示例20: test_check_settings

def test_check_settings():
    """check_settings of 'tproxy' accepts (True, True) and (False, True).

    The test passes as long as neither call raises.
    """
    tproxy = get_method('tproxy')
    for first_flag in (True, False):
        tproxy.check_settings(first_flag, True)
开发者ID:tberton,项目名称:sshuttle,代码行数:4,代码来源:test_methods_tproxy.py



注:本文中的sshuttle.methods.get_method函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python ssl._create_stdlib_context函数代码示例发布时间:2022-05-27
下一篇:
Python helpers.log函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap