本文整理汇总了Python中test.runner.get_tor_controller函数的典型用法代码示例。如果您正苦于以下问题：Python get_tor_controller函数的具体用法？Python get_tor_controller怎么用？Python get_tor_controller使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_tor_controller函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_with_detached_ephemeral_hidden_services
def test_with_detached_ephemeral_hidden_services(self):
    """
    Walks a detached ephemeral hidden service through creation, removal and
    re-creation, checking what each controller lists along the way.
    """
    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:

        def detached_ids(ctl):
            # service ids 'ctl' reports when asked for detached services
            return ctl.list_ephemeral_hidden_services(detached = True)

        original = controller.create_ephemeral_hidden_service(4567, detached = True)

        # the service shows up in the detached listing only
        self.assertEqual([], controller.list_ephemeral_hidden_services())
        self.assertEqual([original.service_id], detached_ids(controller))

        # remove it, then bring it back using the key we were given
        self.assertEqual(True, controller.remove_ephemeral_hidden_service(original.service_id))
        self.assertEqual([], detached_ids(controller))

        controller.create_ephemeral_hidden_service(
            4567,
            key_type = original.private_key_type,
            key_content = original.private_key,
            detached = True,
        )
        self.assertEqual([original.service_id], detached_ids(controller))

        # a second controller can both see the service and remove it
        with runner.get_tor_controller() as second_controller:
            self.assertEqual([original.service_id], detached_ids(second_controller))
            self.assertEqual(True, second_controller.remove_ephemeral_hidden_service(original.service_id))
            self.assertEqual([], detached_ids(controller))

            # create a fresh service here; it should outlive this controller
            survivor = second_controller.create_ephemeral_hidden_service(4567, detached = True)

        self.assertEqual([survivor.service_id], detached_ids(controller))
        controller.remove_ephemeral_hidden_service(survivor.service_id)
开发者ID:patrickod,项目名称:stem,代码行数:33,代码来源:controller.py
示例2: test_with_ephemeral_hidden_services
def test_with_ephemeral_hidden_services(self):
    """
    Exercises the ephemeral hidden service methods: creation (including
    rejected port arguments), listing, removal and re-creation from a key.
    """
    invalid_port_msg = "ADD_ONION response didn't have an OK status: Invalid VIRTPORT/TARGET"
    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:
        # each of these port arguments should be rejected
        for bad_ports in (4567890, [4567, 4567890], {4567: '-:4567'}):
            self.assertRaisesRegexp(stem.ProtocolError, invalid_port_msg, controller.create_ephemeral_hidden_service, bad_ports)

        created = controller.create_ephemeral_hidden_service(4567)
        self.assertEqual([created.service_id], controller.list_ephemeral_hidden_services())
        self.assertTrue(created.private_key is not None)
        self.assertEqual({}, created.client_auth)

        # drop the service
        was_removed = controller.remove_ephemeral_hidden_service(created.service_id)
        self.assertEqual(True, was_removed)
        self.assertEqual([], controller.list_ephemeral_hidden_services())

        # bring the service back using the private key from the first response
        recreated = controller.create_ephemeral_hidden_service(
            4567,
            key_type = created.private_key_type,
            key_content = created.private_key,
        )
        self.assertEqual([created.service_id], controller.list_ephemeral_hidden_services())
        self.assertEqual(created.service_id, recreated.service_id)

        # the key is only part of the response when a new one was generated
        self.assertEqual(None, recreated.private_key)
        self.assertEqual(None, recreated.private_key_type)

        # a service created with discard_key never reveals its private key
        keyless = controller.create_ephemeral_hidden_service(4568, discard_key = True)
        self.assertTrue(keyless.service_id in controller.list_ephemeral_hidden_services())
        self.assertEqual(None, keyless.private_key)
        self.assertEqual(None, keyless.private_key_type)

        # other controllers shouldn't be able to see these hidden services
        with runner.get_tor_controller() as second_controller:
            self.assertEqual(2, len(controller.list_ephemeral_hidden_services()))
            self.assertEqual(0, len(second_controller.list_ephemeral_hidden_services()))
开发者ID:patrickod,项目名称:stem,代码行数:48,代码来源:controller.py
示例3: test_close_circuit
def test_close_circuit(self):
    """
    Verifies Controller.close_circuit() with valid and invalid circuit ids.
    """
    if (test.runner.require_control(self)
            or test.runner.require_online(self)
            or test.runner.require_version(self, Requirement.EXTENDCIRCUIT_PATH_OPTIONAL)):
        return

    with test.runner.get_runner().get_tor_controller() as controller:

        def listed_circuit_ids():
            # first whitespace-separated field of every 'circuit-status' line
            status = controller.get_info("circuit-status")
            return [entry.split()[0] for entry in status.splitlines()]

        # close without flags
        plain_id = controller.new_circuit()
        controller.close_circuit(plain_id)
        self.assertFalse(plain_id in listed_circuit_ids())

        # close with the "IfUnused" flag
        flagged_id = controller.new_circuit()
        controller.close_circuit(flagged_id, "IfUnused")
        self.assertFalse(flagged_id in listed_circuit_ids())

        # malformed ids are rejected
        spare_id = controller.new_circuit()
        self.assertRaises(stem.InvalidArguments, controller.close_circuit, spare_id + "1024")
        self.assertRaises(stem.InvalidRequest, controller.close_circuit, "")
开发者ID:axitkhurana,项目名称:stem,代码行数:30,代码来源:controller.py
示例4: test_is_set
def test_is_set(self):
    """
    Exercises our is_set() method.

    Checks the custom options the controller reports, that is_set() agrees
    with them, and that is_set() follows set_conf() / reset_conf() calls.
    """
    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:
        custom_options = controller._get_custom_options()
        self.assertTrue('ControlPort' in custom_options or 'ControlSocket' in custom_options)
        self.assertEqual('1', custom_options['DownloadExtraInfo'])
        self.assertEqual('1112', custom_options['SocksPort'])

        self.assertTrue(controller.is_set('DownloadExtraInfo'))
        self.assertTrue(controller.is_set('SocksPort'))
        self.assertFalse(controller.is_set('CellStatistics'))
        self.assertFalse(controller.is_set('ConnLimit'))

        # check we update when setting and resetting values
        controller.set_conf('ConnLimit', '1005')

        try:
            self.assertTrue(controller.is_set('ConnLimit'))
        finally:
            # Always undo the override, even when the assertion above fails.
            # NOTE(review): presumably the tor instance is shared with other
            # tests, so a leftover ConnLimit would leak into them - confirm.
            controller.reset_conf('ConnLimit')

        self.assertFalse(controller.is_set('ConnLimit'))
开发者ID:patrickod,项目名称:stem,代码行数:25,代码来源:controller.py
示例5: test_get_streams
def test_get_streams(self):
    """
    Opens a connection through tor's SOCKS listener and checks that
    Controller.get_streams() reports a stream for that target.
    """
    if test.runner.require_control(self) or test.runner.require_online(self):
        return

    host = "38.229.72.14"  # www.torproject.org
    port = 443

    with test.runner.get_runner().get_tor_controller() as controller:
        # a single proxy port is enough, so use the first listener
        first_listener = controller.get_socks_listeners()[0]

        with test.network.Socks(first_listener) as proxy:
            proxy.settimeout(30)
            proxy.connect((host, port))
            streams = controller.get_streams()

    # Opening a stream doesn't give us its id, so look for the target we
    # asked for instead.
    expected_target = "%s:%s" % (host, port)
    seen_targets = [stream.target for stream in streams]
    self.assertTrue(expected_target in seen_targets)
开发者ID:axitkhurana,项目名称:stem,代码行数:27,代码来源:controller.py
示例6: test_close_stream
def test_close_stream(self):
    """
    Verifies Controller.close_stream() with a real stream id and with an
    unknown one.
    """
    if test.runner.require_control(self) or test.runner.require_online(self):
        return

    with test.runner.get_runner().get_tor_controller() as controller:
        # the first socks listener will do
        first_listener = controller.get_socks_listeners()[0]

        with test.network.Socks(first_listener) as proxy:
            proxy.settimeout(30)
            proxy.connect(("www.torproject.org", 443))

            # we expect ours to be the only stream at this point
            our_stream = controller.get_streams()[0]

            # Confirm that it really is the only one, otherwise the emptiness
            # check below could pass for the wrong reason.
            current_ids = [stream.id for stream in controller.get_streams()]
            self.assertEqual([our_stream.id], current_ids)

            # closing it should leave no streams behind
            controller.close_stream(our_stream.id)
            self.assertEqual([], controller.get_streams())

        # an id tor doesn't know about is rejected
        self.assertRaises(stem.InvalidArguments, controller.close_stream, "blarg")
开发者ID:axitkhurana,项目名称:stem,代码行数:30,代码来源:controller.py
示例7: test_mapaddress
def test_mapaddress(self):
    """
    Maps 1.2.1.2 to ifconfig.me, sends a request to the mapped address
    through the SOCKS port and checks that the reply body is an IPv4 address.
    """
    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:
        controller.map_address({'1.2.1.2': 'ifconfig.me'})

        conn, reply = None, None

        # retry up to 10 times to ride out transient network failures
        for _attempt in range(10):
            try:
                conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                conn.settimeout(30)

                socks_port = int(controller.get_conf('SocksPort'))
                conn.connect(('127.0.0.1', socks_port))
                test.network.negotiate_socks(conn, '1.2.1.2', 80)

                # issue the http request for our ip address
                request = stem.util.str_tools._to_bytes(test.network.ip_request)
                conn.sendall(request)
                reply = conn.recv(1000)

                if reply:
                    break
            except (stem.ProtocolError, socket.timeout):
                pass  # nothing follows in the loop body, so this simply retries
            finally:
                if conn:
                    conn.close()

        self.assertTrue(reply)

        # Everything after the blank line is the data of the HTTP response,
        # which for this request should be an IP address followed by '\n'.
        body_start = reply.find(b'\r\n\r\n')
        ip_addr = reply[body_start:].strip()
        address_text = stem.util.str_tools._to_unicode(ip_addr)
        self.assertTrue(stem.util.connection.is_valid_ipv4_address(address_text), "'%s' isn't an address" % ip_addr)
开发者ID:patrickod,项目名称:stem,代码行数:35,代码来源:controller.py
示例8: test_get_server_descriptors
def test_get_server_descriptors(self):
    """
    Reads the first few descriptors provided by get_server_descriptors() and
    checks that each has a fingerprint and a nickname.
    """
    runner = test.runner.get_runner()

    if test.runner.require_control(self):
        return

    if runner.get_tor_version() >= Requirement.MICRODESCRIPTOR_IS_DEFAULT:
        test.runner.skip(self, "(requires server descriptors)")
        return

    with runner.get_tor_controller() as controller:
        for seen, desc in enumerate(controller.get_server_descriptors(), 1):
            self.assertTrue(desc.fingerprint is not None)
            self.assertTrue(desc.nickname is not None)

            # We don't want to take the time to read the whole thing. Another
            # test already reads the full cached descriptors (and takes a
            # while to do so).
            if seen > 10:
                break
开发者ID:ouzel,项目名称:stem,代码行数:27,代码来源:controller.py
示例9: test_repurpose_circuit
def test_repurpose_circuit(self):
    """
    Verifies Controller.repurpose_circuit() with valid and invalid input.
    """
    if (test.runner.require_control(self)
            or test.runner.require_online(self)
            or test.runner.require_version(self, Requirement.EXTENDCIRCUIT_PATH_OPTIONAL)):
        return

    with test.runner.get_runner().get_tor_controller() as controller:
        circ_id = controller.new_circuit()

        # switch the purpose back and forth, confirming each change took
        for purpose in ("CONTROLLER", "GENERAL"):
            controller.repurpose_circuit(circ_id, purpose)
            circuit = controller.get_circuit(circ_id)
            self.assertTrue(circuit.purpose == purpose)

        # an unrecognized purpose is rejected whatever the circuit id
        for bogus_id in ('f934h9f3h4', '4'):
            self.assertRaises(stem.InvalidRequest, controller.repurpose_circuit, bogus_id, "fooo")
开发者ID:axitkhurana,项目名称:stem,代码行数:26,代码来源:controller.py
示例10: test_get_network_statuses
def test_get_network_statuses(self):
    """
    Reads the first few entries provided by get_network_statuses() and checks
    that each has a fingerprint, a nickname and no unrecognized lines.
    """
    runner = test.runner.get_runner()

    if test.runner.require_control(self):
        return

    with runner.get_tor_controller() as controller:
        for seen, entry in enumerate(controller.get_network_statuses(), 1):
            self.assertTrue(entry.fingerprint is not None)
            self.assertTrue(entry.nickname is not None)

            leftover = entry.get_unrecognized_lines()

            if leftover:
                self.fail("Unrecognized descriptor content: %s" % leftover)

            # a handful of entries is enough
            if seen > 10:
                break
开发者ID:ouzel,项目名称:stem,代码行数:25,代码来源:controller.py
示例11: test_get_server_descriptor
def test_get_server_descriptor(self):
    """
    Compares get_server_descriptor() against our cached descriptors.

    Invalid arguments must raise ValueError and unknown relays must raise
    stem.socket.ControllerError. The first cached descriptor whose nickname
    isn't "Unnamed" is then fetched by fingerprint and by nickname, and both
    results must equal the cached copy.

    The test is skipped when there is no cached descriptor file, or when the
    file has no named relay to look up.
    """
    runner = test.runner.get_runner()
    descriptor_path = runner.get_test_dir("cached-descriptors")

    if test.runner.require_control(self):
        return
    elif not os.path.exists(descriptor_path):
        test.runner.skip(self, "(no cached descriptors)")
        return

    with runner.get_tor_controller() as controller:
        # we should balk at invalid content
        self.assertRaises(ValueError, controller.get_server_descriptor, None)
        self.assertRaises(ValueError, controller.get_server_descriptor, "")
        self.assertRaises(ValueError, controller.get_server_descriptor, 5)
        self.assertRaises(ValueError, controller.get_server_descriptor, "z" * 30)

        # try with a relay that doesn't exist
        self.assertRaises(stem.socket.ControllerError, controller.get_server_descriptor, "blargg")
        self.assertRaises(stem.socket.ControllerError, controller.get_server_descriptor, "5" * 40)

        first_descriptor = None

        with stem.descriptor.reader.DescriptorReader([descriptor_path]) as reader:
            for desc in reader:
                # "Unnamed" relays are passed over since the nickname is used
                # for one of the lookups below
                if desc.nickname != "Unnamed":
                    first_descriptor = desc
                    break

        # Without a named relay there is nothing to compare against. Bail out
        # explicitly rather than hitting an AttributeError on None below.
        if first_descriptor is None:
            test.runner.skip(self, "(no named relays in cached descriptors)")
            return

        self.assertEqual(first_descriptor, controller.get_server_descriptor(first_descriptor.fingerprint))
        self.assertEqual(first_descriptor, controller.get_server_descriptor(first_descriptor.nickname))
开发者ID:eoinof,项目名称:stem,代码行数:33,代码来源:controller.py
示例12: test_get_server_descriptor
def test_get_server_descriptor(self):
    """
    Basic checks for get_server_descriptor(): invalid arguments, unknown
    relays, and lookups by fingerprint versus nickname.
    """
    runner = test.runner.get_runner()

    if test.runner.require_control(self):
        return

    if runner.get_tor_version() >= Requirement.MICRODESCRIPTOR_IS_DEFAULT:
        test.runner.skip(self, "(requires server descriptors)")
        return

    with runner.get_tor_controller() as controller:
        fetch = controller.get_server_descriptor

        # malformed arguments are refused
        for malformed in (None, "", 5, "z" * 30):
            self.assertRaises(ValueError, fetch, malformed)

        # well-formed arguments naming a relay that doesn't exist
        for unknown in ("blargg", "5" * 40):
            self.assertRaises(stem.ControllerError, fetch, unknown)

        # both ways of looking up the same relay should agree
        relay = self._get_router_status_entry(controller)
        by_fingerprint = controller.get_server_descriptor(relay.fingerprint)
        by_nickname = controller.get_server_descriptor(relay.nickname)
        self.assertEqual(by_fingerprint, by_nickname)
开发者ID:axitkhurana,项目名称:stem,代码行数:30,代码来源:controller.py
示例13: test_protocolinfo
def test_protocolinfo(self):
    """
    Checks that Controller.get_protocolinfo() returns a ProtocolInfoResponse
    whose auth methods match what the runner's torrc options imply.
    """
    if test.runner.require_control(self):
        return

    runner = test.runner.get_runner()
    AuthMethod = stem.response.protocolinfo.AuthMethod

    # NOTE(review): the False argument presumably requests an unauthenticated
    # controller - confirm against test.runner.
    with runner.get_tor_controller(False) as controller:
        protocolinfo = controller.get_protocolinfo()
        self.assertTrue(isinstance(protocolinfo, stem.response.protocolinfo.ProtocolInfoResponse))

        # sanity check the response against the options tor was started with
        tor_options = runner.get_options()
        tor_version = runner.get_tor_version()

        expected_methods = []

        if test.runner.Torrc.COOKIE in tor_options:
            expected_methods.append(AuthMethod.COOKIE)

            if tor_version >= stem.version.Requirement.AUTH_SAFECOOKIE:
                expected_methods.append(AuthMethod.SAFECOOKIE)

        if test.runner.Torrc.PASSWORD in tor_options:
            expected_methods.append(AuthMethod.PASSWORD)

        # with neither cookie nor password configured we expect NONE
        if not expected_methods:
            expected_methods.append(AuthMethod.NONE)

        self.assertEqual(tuple(expected_methods), protocolinfo.auth_methods)
开发者ID:axitkhurana,项目名称:stem,代码行数:32,代码来源:controller.py
示例14: test_loadconf
def test_loadconf(self):
    """
    Exercises Controller.load_conf() with invalid and valid configurations,
    then reloads the original torrc contents.
    """
    if (test.runner.require_control(self)
            or test.runner.require_version(self, stem.version.Requirement.LOADCONF)):
        return

    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:
        original_conf = runner.get_torrc_contents()

        try:
            # a request tor refuses
            self.assertRaises(stem.InvalidRequest, controller.load_conf, "ContactInfo confloaded")

            # an unrecognized option is reported back via exc.arguments
            try:
                controller.load_conf("Blahblah blah")
            except stem.InvalidArguments as exc:
                self.assertEqual(["Blahblah"], exc.arguments)
            else:
                self.fail()

            # valid config: the current torrc plus a ContactInfo line
            extended_conf = runner.get_torrc_contents() + "\nContactInfo confloaded\n"
            controller.load_conf(extended_conf)
            self.assertEqual("confloaded", controller.get_conf("ContactInfo"))
        finally:
            # put the original valid config back
            controller.load_conf(original_conf)
开发者ID:axitkhurana,项目名称:stem,代码行数:32,代码来源:controller.py
示例15: test_loadconf
def test_loadconf(self):
    """
    Exercises Controller.load_conf() with invalid and valid configurations,
    then reloads the original torrc contents and resets
    __OwningControllerProcess.
    """
    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:
        original_conf = runner.get_torrc_contents()

        try:
            # Check a request that changes our DataDir. Tor should rightfully
            # balk at this...
            #
            #   InvalidRequest: Transition not allowed: Failed to parse/validate
            #   config: While Tor is running, changing DataDirectory
            #   ("/home/atagar/Desktop/stem/test/data"->"/home/atagar/.tor") is
            #   not allowed.
            self.assertRaises(stem.InvalidRequest, controller.load_conf, 'ContactInfo confloaded')

            # an unrecognized option is reported back via exc.arguments
            try:
                controller.load_conf('Blahblah blah')
            except stem.InvalidArguments as exc:
                self.assertEqual(['Blahblah'], exc.arguments)
            else:
                self.fail()

            # valid config: the current torrc plus a ContactInfo line
            extended_conf = runner.get_torrc_contents() + '\nContactInfo confloaded\n'
            controller.load_conf(extended_conf)
            self.assertEqual('confloaded', controller.get_conf('ContactInfo'))
        finally:
            # put the original valid config back
            controller.load_conf(original_conf)
            controller.reset_conf('__OwningControllerProcess')
开发者ID:patrickod,项目名称:stem,代码行数:35,代码来源:controller.py
示例16: test_enable_feature
def test_enable_feature(self):
    """
    Checks Controller.enable_feature() with a feature tor supports and with
    unrecognized ones.
    """
    if test.runner.require_control(self):
        return

    with test.runner.get_runner().get_tor_controller() as controller:
        needs_skip = test.runner.require_version(self, stem.version.Version("0.1.2.2-alpha"))

        if not needs_skip:
            controller.enable_feature("VERBOSE_NAMES")

        self.assertTrue(controller.is_feature_enabled("VERBOSE_NAMES"))

        orconn_output = controller.get_info('orconn-status')

        # orconn-status comes back empty when we don't have a connection
        if orconn_output == '' and test.runner.require_online(self):
            return

        self.assertTrue("VERBOSE_NAMES" in controller._enabled_features)
        self.assertRaises(stem.InvalidArguments, controller.enable_feature, ["NOT", "A", "FEATURE"])

        # the exception should name the first unrecognized feature
        try:
            controller.enable_feature(["NOT", "A", "FEATURE"])
            self.fail()
        except stem.InvalidArguments as exc:
            self.assertEqual(["NOT"], exc.arguments)
开发者ID:axitkhurana,项目名称:stem,代码行数:32,代码来源:controller.py
示例17: test_event_handling
def test_event_handling(self):
    """
    Registers two listeners for BW events (one of them also for DEBUG),
    checks that both receive events, then removes one and checks that it is
    no longer called.
    """
    if test.runner.require_control(self):
        return

    first_signal, second_signal = threading.Event(), threading.Event()
    first_received, second_received = [], []

    def first_listener(event):
        first_received.append(event)
        first_signal.set()

    def second_listener(event):
        second_received.append(event)
        second_signal.set()

    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:
        controller.add_event_listener(first_listener, EventType.BW)
        controller.add_event_listener(second_listener, EventType.BW, EventType.DEBUG)

        # BW events occur at a rate of one per second, so wait a bit to let
        # some accumulate.
        first_signal.wait(2)
        self.assertTrue(len(first_received) >= 1)
        first_signal.clear()

        second_signal.wait(2)
        self.assertTrue(len(second_received) >= 1)
        second_signal.clear()

        # once removed, a listener must not be called any more
        controller.remove_event_listener(second_listener)
        size_at_removal = len(second_received)

        first_signal.wait(2)
        self.assertTrue(len(first_received) >= 2)

        second_signal.wait(2)
        self.assertEqual(size_at_removal, len(second_received))

        # everything the first listener got should be a bandwidth event
        for received in first_received:
            self.assertTrue(isinstance(received, stem.response.events.Event))
            self.assertEqual(2, len(received.positional_args))
            self.assertEqual({}, received.keyword_args)

            self.assertTrue(isinstance(received, stem.response.events.BandwidthEvent))
            self.assertTrue(hasattr(received, 'read'))
            self.assertTrue(hasattr(received, 'written'))
开发者ID:axitkhurana,项目名称:stem,代码行数:57,代码来源:controller.py
示例18: test_authenticate_general_controller
def test_authenticate_general_controller(self):
    """
    Checks that stem.connection.authenticate() works when handed a
    Controller, then exercises that controller.
    """
    runner = test.runner.get_runner()

    # NOTE(review): the False argument presumably requests an unauthenticated
    # controller - confirm against test.runner.
    with runner.get_tor_controller(False) as controller:
        password = test.runner.CONTROL_PASSWORD
        chroot_path = runner.get_chroot()

        stem.connection.authenticate(controller, password, chroot_path)
        test.runner.exercise_controller(self, controller)
开发者ID:jacthinman,项目名称:Tor-Stem,代码行数:9,代码来源:authentication.py
示例19: test_event_handling
def test_event_handling(self):
    """
    Add a couple listeners for various events and make sure that they receive
    them. Then remove the listeners.

    NodeFamily is toggled to trigger CONF_CHANGED events. It is reset in a
    'finally' block so a failed assertion doesn't leave the option set.
    """
    event_notice1, event_notice2 = threading.Event(), threading.Event()
    event_buffer1, event_buffer2 = [], []

    def listener1(event):
        event_buffer1.append(event)
        event_notice1.set()

    def listener2(event):
        event_buffer2.append(event)
        event_notice2.set()

    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:
        controller.add_event_listener(listener1, EventType.CONF_CHANGED)
        controller.add_event_listener(listener2, EventType.CONF_CHANGED, EventType.DEBUG)

        try:
            # The NodeFamily is a harmless option we can toggle
            controller.set_conf('NodeFamily', 'FD4CC275C5AA4D27A487C6CA29097900F85E2C33')

            # Wait for the event. Assert that we get it within 10 seconds
            event_notice1.wait(10)
            self.assertEqual(len(event_buffer1), 1)
            event_notice1.clear()

            event_notice2.wait(10)
            self.assertTrue(len(event_buffer2) >= 1)
            event_notice2.clear()

            # Checking that a listener's no longer called after being removed.
            controller.remove_event_listener(listener2)
            buffer2_size = len(event_buffer2)

            controller.set_conf('NodeFamily', 'A82F7EFDB570F6BC801805D0328D30A99403C401')
            event_notice1.wait(10)
            self.assertEqual(len(event_buffer1), 2)
            event_notice1.clear()

            self.assertEqual(buffer2_size, len(event_buffer2))

            for event in event_buffer1:
                self.assertTrue(isinstance(event, stem.response.events.Event))
                self.assertEqual(0, len(event.positional_args))
                self.assertEqual({}, event.keyword_args)

                self.assertTrue(isinstance(event, stem.response.events.ConfChangedEvent))
        finally:
            # Runs on success and on failure, so the toggled option never
            # outlives this test.
            controller.reset_conf('NodeFamily')
开发者ID:patrickod,项目名称:stem,代码行数:56,代码来源:controller.py
示例20: test_getconf
def test_getconf(self):
    """
    Exercises GETCONF with valid and invalid queries.

    Covers single and batch lookups of the option behind our control
    connection (ControlPort or ControlSocket), unknown options with and
    without a default, multi-value options (NodeFamily) and empty input.
    """
    if test.runner.require_control(self):
        return

    runner = test.runner.get_runner()

    with runner.get_tor_controller() as controller:
        # named 'control_socket' so we don't shadow the stdlib 'socket' module
        control_socket = controller.get_socket()

        if isinstance(control_socket, stem.socket.ControlPort):
            connection_value = str(control_socket.get_port())
            config_key = "ControlPort"
        elif isinstance(control_socket, stem.socket.ControlSocketFile):
            connection_value = str(control_socket.get_socket_path())
            config_key = "ControlSocket"
        else:
            # fail clearly rather than with a NameError on the unset locals
            self.fail("Unrecognized control socket type: %s" % type(control_socket).__name__)

        # successful single query
        self.assertEqual(connection_value, controller.get_conf(config_key))
        self.assertEqual(connection_value, controller.get_conf(config_key, "la-di-dah"))

        # successful batch query
        expected = {config_key: [connection_value]}
        self.assertEqual(expected, controller.get_conf_map([config_key]))
        self.assertEqual(expected, controller.get_conf_map([config_key], "la-di-dah"))

        # the reply keys should match the request keys exactly, case included
        request_params = ["ControlPORT", "dirport", "datadirectory"]
        reply_params = controller.get_conf_map(request_params, multiple=False).keys()
        self.assertEqual(set(request_params), set(reply_params))

        # non-existent option(s)
        self.assertRaises(stem.socket.InvalidArguments, controller.get_conf, "blarg")
        self.assertEqual("la-di-dah", controller.get_conf("blarg", "la-di-dah"))
        self.assertRaises(stem.socket.InvalidArguments, controller.get_conf_map, "blarg")
        self.assertEqual("la-di-dah", controller.get_conf_map("blarg", "la-di-dah"))

        self.assertRaises(stem.socket.InvalidRequest, controller.get_conf_map, ["blarg", "huadf"], multiple = True)
        self.assertEqual("la-di-dah", controller.get_conf_map(["erfusdj", "afiafj"], "la-di-dah", multiple = True))

        # multivalue configuration keys
        nodefamilies = [("abc", "xyz", "pqrs"), ("mno", "tuv", "wxyz")]
        controller.msg("SETCONF %s" % " ".join(["nodefamily=\"" + ",".join(x) + "\"" for x in nodefamilies]))

        try:
            self.assertEqual([",".join(n) for n in nodefamilies], controller.get_conf("nodefamily", multiple = True))
        finally:
            # undo the override even if the assertion above fails
            controller.msg("RESETCONF NodeFamily")

        # empty input
        self.assertEqual(None, controller.get_conf(""))
        self.assertEqual({}, controller.get_conf_map([]))
        self.assertEqual({}, controller.get_conf_map([""]))
        self.assertEqual(None, controller.get_conf("          "))
        self.assertEqual({}, controller.get_conf_map(["    ", "        "]))

        self.assertEqual("la-di-dah", controller.get_conf("", "la-di-dah"))
        self.assertEqual({}, controller.get_conf_map("", "la-di-dah"))
        self.assertEqual({}, controller.get_conf_map([], "la-di-dah"))
开发者ID:eoinof,项目名称:stem,代码行数:56,代码来源:controller.py
注：本文中的test.runner.get_tor_controller函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论