本文整理汇总了Python中xmlrpc.client.ServerProxy类的典型用法代码示例。如果您正苦于以下问题:Python ServerProxy类的具体用法?Python ServerProxy怎么用?Python ServerProxy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ServerProxy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: MySdkTests
class MySdkTests(unittest.TestCase):
    """Call the remote ``sdk`` namespace over XML-RPC and check its replies."""

    def setUp(self):
        # Attribute access on the proxy selects the remote "sdk" namespace.
        proxy = ServerProxy(RPC_SERVER_URI)
        self.sdk = proxy.sdk

    def tearDown(self):
        pass

    def test_add(self, a=123, b=456):
        """``sdk.add`` of the default operands must equal 579."""
        total = self.sdk.add(a, b)
        print("\nsdk.add(%d, %d) = %d" % (a, b, total))
        self.assertEqual(total, 579)

    def test_subtract(self, a=2015, b=1999):
        """``sdk.subtract`` of the default operands must equal 16."""
        difference = self.sdk.subtract(a, b)
        print("\nsdk.subtract(%d, %d) = %d" % (a, b, difference))
        self.assertEqual(difference, 16)

    def test_httpGet(self, ip="203.208.48.146"):
        """``sdk.httpGet`` must return something other than None."""
        response = self.sdk.httpGet(ip)
        print("\nsdk.httpGet(%s) is %s" % (ip, response))
        self.assertIsNotNone(response)

    def test_getDeviceInfo(self):
        """``sdk.getDeviceInfo`` must return something other than None."""
        info = self.sdk.getDeviceInfo()
        print("\nsdk.getDeviceInfo() is:\n%s" % (info))
        self.assertIsNotNone(info)
开发者ID:oscarli,项目名称:xml-rpc-demo,代码行数:26,代码来源:test_mysdk.py
示例2: NZBConnection
def NZBConnection(url):
    """Check whether an NZBget XML-RPC endpoint can be reached.

    The check writes a test message to the NZBget log through the
    ``writelog`` RPC call.

    :param url: nzb url to connect
    :return: True if connected, else False
    """
    nzb_get_rpc = ServerProxy(url)
    try:
        if nzb_get_rpc.writelog('INFO', 'Medusa connected to test connection.'):
            log.debug('Successfully connected to NZBget')
        else:
            log.warning('Successfully connected to NZBget but unable to'
                        ' send a message')
        return True
    except ProtocolError as error:
        if error.errmsg == 'Unauthorized':
            log.warning('NZBget username or password is incorrect.')
        else:
            log.error('Protocol Error: {msg}', {'msg': error.errmsg})
        return False
    except Error as error:
        # ProtocolError is handled above, so what arrives here is another
        # xmlrpc error (e.g. Fault) that has no ``errmsg`` attribute.
        # Reading it unconditionally raised AttributeError inside the handler,
        # so fall back to the exception itself.
        log.warning('Please check your NZBget host and port (if it is running).'
                    ' NZBget is not responding to this combination.'
                    ' Error: {msg}', {'msg': getattr(error, 'errmsg', error)})
        return False
    except socket.error as error:
        log.warning('Please check your NZBget host and port (if it is running).'
                    ' NZBget is not responding to this combination.'
                    ' Socket Error: {msg}', {'msg': error})
        return False
开发者ID:pymedusa,项目名称:SickRage,代码行数:33,代码来源:nzbget.py
示例3: _do_update
def _do_update(self, hwids):
    """Rebuild ``self.cache`` from the remote driver database for *hwids*.

    The cache maps each hardware ID to a list of ``DriverID`` objects built
    from the records the server returned for it. The cache is left empty when
    the server answers with an unexpected protocol version.
    """
    logging.debug("Querying XML-RPC driver database %s...", self.url)
    client = ServerProxy(self.url)
    # TODO: error handling; pass kernel_version, architecture
    system_info = {
        "os_name": OSLib.inst.os_vendor,
        "os_version": OSLib.inst.os_version,
        "system_vendor": self.sys_vendor,
        "system_product": self.sys_product,
        "components": ["%s:%s" % (h.type, h.id) for h in hwids],
    }
    res_proto_ver, res_proto_subver, drivers = client.query(
        "20080407", "0", system_info)
    logging.debug(" -> protocol: %s/%s, drivers: %s",
                  res_proto_ver, res_proto_subver, str(drivers))

    self.cache = {}
    if res_proto_ver != "20080407":
        logging.warning(" unknown protocol version, not updating")
        return

    for hwid in hwids:
        component = "%s:%s" % (hwid.type, hwid.id)
        for record in drivers.get(component, []):
            # Records without a driver_type are skipped.
            if "driver_type" in record:
                self.cache.setdefault(hwid, []).append(DriverID(**record))
开发者ID:Fabioamd87,项目名称:nfs-manager,代码行数:29,代码来源:detection.py
示例4: __sort_inputs_by_humanoid
def __sort_inputs_by_humanoid(self, possible_events):
    """Order *possible_events* using the ranking returned by the humanoid service.

    The view-tree history, event history, candidate events and screen
    resolution are sent as JSON to the ``predict`` RPC. The reply supplies
    ``indices`` (an ordering into *possible_events*) and ``text``, which is
    assigned to every ``SetTextEvent`` that is returned.

    :param possible_events: candidate input events for the current state
    :return: list of events from *possible_events* in the predicted order
    """
    if sys.version_info[0] >= 3:
        from xmlrpc.client import ServerProxy
    else:
        from xmlrpclib import ServerProxy
    proxy = ServerProxy("http://%s/" % self.device.humanoid)
    request_json = {
        "history_view_trees": self.humanoid_view_trees,
        "history_events": [x.__dict__ for x in self.humanoid_events],
        "possible_events": [x.__dict__ for x in possible_events],
        "screen_res": [self.device.display_info["width"],
                       self.device.display_info["height"]]
    }
    result = json.loads(proxy.predict(json.dumps(request_json)))
    new_idx = result["indices"]
    text = result["text"]

    # get rid of infinite recursive by randomizing first event
    # An empty index list is left alone: random.randint(0, -1) would raise
    # ValueError.
    if not self.utg.is_state_reached(self.current_state) and new_idx:
        new_first = random.randint(0, len(new_idx) - 1)
        new_idx[0], new_idx[new_first] = new_idx[new_first], new_idx[0]

    new_events = []
    for idx in new_idx:
        event = possible_events[idx]
        if isinstance(event, SetTextEvent):
            event.text = text
        new_events.append(event)
    return new_events
开发者ID:chubbymaggie,项目名称:droidbot,代码行数:28,代码来源:input_policy.py
示例5: _register_with_server
def _register_with_server(self):
    """
    Register child node with server, then route print-log output and
    process events to that server.

    Raises RLException when the register call fails or returns a code
    other than 1.
    """
    name = self.name
    parent_uri_note = "attempting to register with roslaunch parent [%s]"%self.server_uri
    self.logger.info(parent_uri_note)
    try:
        server = ServerProxy(self.server_uri)
        code, msg, _ = server.register(name, self.uri)
        if code != 1:
            raise RLException("unable to register with roslaunch server: %s"%msg)
    except Exception:
        # Any failure, including the RLException raised just above, is
        # logged and re-raised as RLException with the traceback text.
        failure = "Exception while registering with roslaunch parent [%s]: %s"%(self.server_uri, traceback.format_exc())
        self.logger.error(failure)
        raise RLException(failure)

    self.logger.debug("child registered with server")

    # register printlog handlers so messages are funneled to remote
    def forward_info(msg):
        server.log(name, Log.INFO, msg)

    def forward_error(msg):
        server.log(name, Log.ERROR, msg)

    add_printlog_handler(forward_info)
    add_printerrlog_handler(forward_error)

    # register process listener to forward process death events to main server
    listener = _ProcessListenerForwarder(server)
    self.pm.add_process_listener(listener)
开发者ID:Aand1,项目名称:ROSCH,代码行数:28,代码来源:server.py
示例6: _do_update
def _do_update(self, hwids):
    '''Query the remote driver database and rebuild ``self.cache``.

    For each hardware ID in *hwids*, every returned record that carries a
    ``driver_type`` key is turned into a ``DriverID`` and appended to that
    ID's cache entry. An unexpected protocol version leaves the cache empty.
    '''
    logging.debug('Querying XML-RPC driver database %s...', self.url)
    client = ServerProxy(self.url)
    # TODO: error handling; pass kernel_version, architecture
    query_args = {
        'os_name': OSLib.inst.os_vendor,
        'os_version': OSLib.inst.os_version,
        'system_vendor': self.sys_vendor,
        'system_product': self.sys_product,
        'components': ['%s:%s' % (h.type, h.id) for h in hwids],
    }
    reply = client.query('20080407', '0', query_args)
    (res_proto_ver, res_proto_subver, drivers) = reply
    logging.debug(' -> protocol: %s/%s, drivers: %s', res_proto_ver,
                  res_proto_subver, str(drivers))

    self.cache = {}
    if res_proto_ver != '20080407':
        logging.warning(' unknown protocol version, not updating')
        return

    for hwid in hwids:
        matches = drivers.get('%s:%s' % (hwid.type, hwid.id), [])
        for drv in matches:
            if 'driver_type' not in drv:
                continue
            bucket = self.cache.setdefault(hwid, [])
            bucket.append(DriverID(**drv))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:27,代码来源:detection.py
示例7: kill_nodes
def kill_nodes(node_names):
    """
    Call shutdown on the specified nodes

    @param node_names: names of the nodes to shut down
    @return: list of nodes that shutdown was called on successfully and list of failures
    @rtype: ([str], [str])
    @raise ROSNodeIOException: if looking a node up on the master fails
        with a socket error
    """
    master = rosgraph.Master(ID)

    success = []
    fail = []
    tocall = []
    # lookup all nodes keeping track of lookup failures for return value
    for n in node_names:
        try:
            uri = master.lookupNode(n)
        except socket.error:
            # The original bare ``except`` swallowed socket errors too, so
            # this exception could never be raised.
            raise ROSNodeIOException("Unable to communicate with master!")
        except Exception:
            fail.append(n)
        else:
            tocall.append([n, uri])

    for n, uri in tocall:
        # the shutdown call can sometimes fail to succeed if the node
        # tears down during the request handling, so we assume success
        try:
            p = ServerProxy(uri)
            _succeed(p.shutdown(ID, 'user request'))
        except Exception:
            # Deliberate best effort; KeyboardInterrupt/SystemExit still
            # propagate because only Exception is caught.
            pass
        success.append(n)

    return success, fail
开发者ID:Nishida-Lab,项目名称:RCR,代码行数:34,代码来源:__init__.py
示例8: replicate
def replicate(self, item):
    """Copy a chunk to one more chunk server when only one live copy exists.

    *item* is indexed as ``(path, cs_list)``. Nothing is done when *item* is
    None, when no server in ``cs_list`` is alive, or when two or more are
    alive. Otherwise the first live server is asked to replicate the chunk to
    a server picked by ``self.ns._select_available_cs`` and, on success, the
    new location is appended to the owning file's chunk list.
    """
    if item is None:
        return

    path, cs_list = item[0], item[1]
    alive = [cs for cs in cs_list if self.ns._is_alive_cs(cs)]

    if not alive:
        print('There no live cs for chunk', path)
        return
    if len(alive) >= 2:
        print('File', path, 'is already replicated to more than 2 nodes')
        return

    new_cs = self.ns._select_available_cs(alive)
    if new_cs is None:
        print("Can't find available cs for replication", path)
        return

    try:
        source = ServerProxy(alive[0])
        source.replicate_chunk(path, new_cs)
        print("File", path, "replicated to", new_cs)

        owner = self.ns.root.find_file_by_chunk(path)
        if owner is None or path not in owner.chunks:
            print("Cant find file for chunk", path, "after replication")
        else:
            owner.chunks[path].append(new_cs)
    except Exception as e:
        print('Error during replicatiion', path, 'to', new_cs, ':', e)
开发者ID:osteotek,项目名称:yadfs,代码行数:33,代码来源:replicator.py
示例9: register
def register(self):
    """Tries to register with the Meta-Server

    Sets ``self.isRegistered`` to True when the ``registerServer`` call
    returns. Every failure is logged or silently ignored; nothing raised
    inside the registration attempt propagates to the caller.
    """
    foamLogger().info(
        "Trying to register as IP:%s PID:%d Port:%d"
        % (self._answerer.ip(), self._answerer.pid(), self._port))
    try:
        try:
            meta_url = "http://%s:%d" % (
                config().get("Metaserver", "ip"),
                config().getint("Metaserver", "port"))
            meta = ServerProxy(meta_url)
            response = meta.registerServer(
                self._answerer.ip(), self._answerer.pid(), self._port)
            self.isRegistered = True
            foamLogger().info("Registered with server. Response " + str(response))
        except socket.error as reason:
            foamLogger().warning("Can't connect to meta-server - SocketError: " + str(reason))
        except:
            exc_type, exc_value, exc_tb = sys.exc_info()
            foamLogger().error("Can't connect to meta-server - Unknown Error: " + str(exc_type))
            foamLogger().error(str(exc_value))
            foamLogger().error("Traceback: " + str(extract_tb(exc_tb)))
    except:
        # print "Error during registering (no socket module?)"
        pass
开发者ID:martinep,项目名称:foam-extend-svn,代码行数:26,代码来源:FoamServer.py
示例10: list_processes
def list_processes(roslaunch_uris=None):
    """
    Collect the processes reported by each roslaunch XML-RPC server.

    Servers that cannot be queried, or that answer with a code other
    than 1, contribute nothing to the result.

    @param roslaunch_uris: (optional) list of XML-RPCS. If none
        are provided, will look up URIs dynamically
    @type  roslaunch_uris: [str]
    @return: list of roslaunch processes
    @rtype: [L{NetProcess}]
    """
    if not roslaunch_uris:
        roslaunch_uris = get_roslaunch_uris()
        if not roslaunch_uris:
            return []

    procs = []
    for uri in roslaunch_uris:
        try:
            r = ServerProxy(uri)
            code, msg, val = r.list_processes()
            if code == 1:
                active, dead = val
                procs.extend([NetProcess(a[0], a[1], True, uri) for a in active])
                procs.extend([NetProcess(d[0], d[1], False, uri) for d in dead])
        except Exception:
            # don't have a mechanism for reporting these errors upwards yet.
            # Only Exception is caught so Ctrl-C / SystemExit still interrupt
            # a slow probe; the original bare ``except`` swallowed them.
            pass
    return procs
开发者ID:Aand1,项目名称:ROSCH,代码行数:28,代码来源:netapi.py
示例11: test_server
def test_server(host='localhost', port=4966):
    """Test for a Larch server on host and port

    Arguments
      host (str): host name ['localhost']
      port (int): port number [4966]

    Returns
      integer status number:
          0    Not in use.
          1    Connected, valid Larch server
          2    In use, but not a valid Larch server
    """
    server = ServerProxy('http://%s:%d' % (host, port))
    try:
        methods = server.system.listMethods()
    except socket.error:
        return NOT_IN_USE
    except Exception:
        # Something answered on the port but did not speak XML-RPC
        # introspection (protocol error, fault, unparsable reply). The
        # original let these escape instead of reporting "not a Larch server".
        return NOT_LARCHSERVER

    # verify that this is a valid larch server
    if len(methods) < 5 or 'larch' not in methods:
        return NOT_LARCHSERVER
    ret = ''
    try:
        ret = server.get_rawdata('_sys.config.user_larchdir')
    except Exception:
        return NOT_LARCHSERVER
    if len(ret) < 1:
        return NOT_LARCHSERVER

    return CONNECTED
开发者ID:xraypy,项目名称:xraylarch,代码行数:31,代码来源:xmlrpc_server.py
示例12: get_next_server
def get_next_server(self):
    """
    Get the next server online

    Advances through ``self.list_online``, rescanning the network when the
    list is exhausted, and probes each candidate with ``still_alive``.
    Exits the process through ``sys.exit`` when a rescan finds no server.

    return rpc server object
    """
    # if index is out of list range, scan network again
    if self.index_list_online >= len(self.list_online):
        self.scan_online_servers()
        if len(self.list_online) == 0:
            sys.exit("Couldn't find any server. Verify your connection.")

    # get the next server of the list of servers online
    self.ip_server = self.list_online[self.index_list_online]
    self.index_list_online += 1
    # make the structure to connect rpc_server
    server_addr = 'http://{}:{}'.format(self.ip_server, self.config.sync_port)

    # The default socket timeout is process-wide state. The original reset
    # it only on success, so a failed probe left every later socket with a
    # 10 second timeout. Restore the previous value on every path.
    previous_timeout = socket.getdefaulttimeout()
    try:
        # set configurations of connection
        connection = ServerProxy(server_addr)
        socket.setdefaulttimeout(10)
        # verify if server is online and fine
        connection.still_alive()
    except Exception:
        connection = None
    finally:
        socket.setdefaulttimeout(previous_timeout)

    if connection is None:
        # try next connection
        return self.get_next_server()
    # return the object server_rpc
    return connection
开发者ID:gspd,项目名称:FlexA,代码行数:33,代码来源:RPC.py
示例13: test_get_last_N_minutes
def test_get_last_N_minutes(self):
    """The reply must hold equally long 'ReceptionTime' and 'PH_EXT' series."""
    r = ServerProxy('http://127.0.0.1:8000/').get_last_N_minutes('node-021', 'PH_EXT', 10)
    logging.debug(r)
    for key in ('ReceptionTime', 'PH_EXT'):
        self.assertTrue(key in r)
    times, values = r['ReceptionTime'], r['PH_EXT']
    self.assertTrue(len(times) == len(values))
开发者ID:stanleylio,项目名称:cm1app,代码行数:7,代码来源:test_s1.py
示例14: Client
class Client(Cmd):
    """Command shell that drives a locally started file-sharing ``Node``."""

    intro = ('Welcome to the p2p file share shell.\n'
             'Type help or ? to list commands.')
    prompt = '>'

    def __init__(self, url, dirname, urlfile):
        """Start a Node in a daemon thread and introduce it to known peers.

        :param url: URL the Node is created with and the proxy connects to
        :param dirname: directory handed to the Node
        :param urlfile: text file with one peer URL per line
        """
        Cmd.__init__(self)
        self.secret = random_string(SECRET_LENGTH)
        node = Node(url, dirname, self.secret)
        worker = Thread(target=node.serve_forever)
        # ``setDaemon`` is deprecated; assigning the attribute is equivalent.
        worker.daemon = True
        worker.start()
        # Pause before connecting to the node started above.
        time.sleep(HEAD_START)
        self.server = ServerProxy(url)
        # ``with`` closes the file; the original left the handle open.
        with open(urlfile) as peers:
            for line in peers:
                line = line.strip()
                self.server.hello(line)

    def do_fetch(self, arg):
        """Ask the node to fetch *arg*; report an UNHANDLED fault as not found."""
        try:
            self.server.fetch(arg, self.secret)
        except Fault as f:
            if f.faultCode != UNHANDLED:
                raise
            print("Couldn't find the file ", arg)

    def do_exit(self, arg):
        """Leave the shell by exiting the process."""
        sys.exit()

    # End-of-file exits the shell as well.
    do_EOF = do_exit
开发者ID:dust8,项目名称:bpfntp,代码行数:30,代码来源:client.py
示例15: Client
class Client(Cmd):
    """Command shell that drives a locally started file-sharing ``Node``."""

    prompt = '> '

    def __init__(self, url, dirname, urlfile):
        """Start a Node in a daemon thread and introduce it to known peers.

        :param url: URL the Node is created with and the proxy connects to
        :param dirname: directory handed to the Node; *urlfile* is resolved
            relative to it
        :param urlfile: name of a text file with one peer URL per line
        """
        Cmd.__init__(self)
        self.secret = randomString(SECRET_LENGTH)
        node = Node(url, dirname, self.secret)
        worker = Thread(target=node._start)
        # ``setDaemon`` is deprecated; assigning the attribute is equivalent.
        worker.daemon = True
        worker.start()
        # Pause before connecting to the node started above.
        sleep(HEAD_START)
        self.server = ServerProxy(url)
        urlfile = path.join(dirname, urlfile)
        # ``with`` closes the file; the original left the handle open.
        with open(urlfile) as peers:
            for line in peers:
                line = line.strip()
                self.server.hello(line)

    def do_fetch(self, arg):
        """Ask the node to fetch *arg*; report an UNHANDLED fault as not found."""
        try:
            self.server.fetch(arg, self.secret)
        except Fault as f:
            if f.faultCode != UNHANDLED:
                raise
            print("Couldn't find the file", arg)

    def do_exit(self, arg):
        """Print a blank line, then leave the shell by exiting the process."""
        print()
        sys.exit()

    # End-of-file exits the shell as well.
    do_EOF = do_exit
开发者ID:panda0881,项目名称:Beginning-Python,代码行数:30,代码来源:client.py
示例16: on_task_output
def on_task_output(self, task, config):
    """Send every accepted entry of *task* to nzbget via ``appendurl``.

    When ``task.options.test`` is set, the entry is only logged. An entry
    that carries its own ``category`` overrides the configured one for that
    entry only. A failed RPC call is logged and the entry is marked failed.

    :param task: task whose ``accepted`` entries are submitted
    :param config: mapping providing ``url``, ``category``, ``priority``
        and ``top``
    """
    from xmlrpc.client import ServerProxy

    params = dict(config)

    server = ServerProxy(params["url"])

    for entry in task.accepted:
        if task.options.test:
            log.info('Would add into nzbget: %s', entry['title'])
            continue

        # allow overriding the category
        # Work on a per-entry copy: the original wrote the override into the
        # shared ``params``, so it leaked into later entries that had no
        # category of their own.
        entry_params = dict(params)
        if 'category' in entry:
            entry_params['category'] = entry['category']

        try:
            server.appendurl(
                entry['title'] + '.nzb',
                entry_params['category'],
                entry_params['priority'],
                entry_params['top'],
                entry['url'],
            )
            log.info('Added `%s` to nzbget', entry['title'])
        except Exception as e:
            log.critical('rpc call to nzbget failed: %s', e)
            entry.fail('could not call appendurl via RPC')
开发者ID:Flexget,项目名称:Flexget,代码行数:28,代码来源:nzbget.py
示例17: PageStorageClient
class PageStorageClient(object):
    """XML-RPC client for a page storage server.

    Page payloads are passed through ``encode_to_binary`` before they are sent
    and through ``decode_from_binary`` after they are received.
    """

    def __init__(self, server_address=("127.0.0.1", 9966)):
        """Create the proxy for *server_address*, a ``(host, port)`` pair."""
        self.server_address = server_address
        endpoint = "http://%s:%s/" % self.server_address
        self.server_proxy = XMLRPCServerProxy(endpoint, allow_none=True)

    def get_page(self, target_url):
        """Return the decoded result of the remote ``get_page`` call."""
        raw = self.server_proxy.get_page(target_url)
        return decode_from_binary(raw)

    def get_page_by_id(self, page_id):
        """Return the decoded result of the remote ``get_page_by_id`` call."""
        raw = self.server_proxy.get_page_by_id(page_id)
        return decode_from_binary(raw)

    def set_page(self, page_info):
        """Send the encoded *page_info* to the remote ``set_page`` call."""
        payload = encode_to_binary(page_info)
        return self.server_proxy.set_page(payload)

    def remove_page(self, target):
        """Send the encoded *target* to the remote ``remove_page`` call."""
        payload = encode_to_binary(target)
        return self.server_proxy.remove_page(payload)

    def __contains__(self, key):
        """Return the result of the remote ``has_key`` call for *key*."""
        return self.server_proxy.has_key(key)

    def has_key(self, key):
        """Alias that delegates to ``__contains__``."""
        return self.__contains__(key)
开发者ID:sotoshigoto,项目名称:LXMLSearchEngine,代码行数:30,代码来源:PageStorageServer.py
示例18: test_unpublish
def test_unpublish(self):
    """Publish on PUBTOPIC, unregister, and verify the publication is gone.

    Publications are read from the node's own XML-RPC API and '/rosout' is
    ignored throughout.
    """
    node_proxy = ServerProxy(rospy.get_node_uri())

    def current_pubs():
        # Publications reported by the node, without the '/rosout' entry.
        _, _, reported = node_proxy.getPublications('/foo')
        return [p for p in reported if p[0] != '/rosout']

    pubs = current_pubs()
    self.assertTrue(not pubs, pubs)

    print("Publishing ", PUBTOPIC)
    pub = rospy.Publisher(PUBTOPIC, String)
    topic = rospy.resolve_name(PUBTOPIC)
    pubs = current_pubs()
    self.assertEqual([[topic, String._type]], pubs, "Pubs were %s"%pubs)

    # publish about 10 messages for fun
    for i in range(0, 10):
        pub.publish(String("hi [%s]"%i))
        time.sleep(0.1)

    # begin actual test by unsubscribing
    pub.unregister()

    # make sure no new messages are received in the next 2 seconds
    # The original condition was inverted (``timeout_t < time.time()``), so
    # the loop body never ran and the check happened immediately.
    timeout_t = time.time() + 2.0
    while time.time() < timeout_t:
        time.sleep(1.0)
    self.assertTrue(_last_callback is None)

    # verify that close cleaned up master and node state
    pubs = current_pubs()
    self.assertTrue(not pubs, "Node still has pubs: %s"%pubs)
    n = rospy.get_caller_id()
    self.assertTrue(not rostest.is_publisher(topic, n), "publication is still active on master")
开发者ID:Aand1,项目名称:ROSCH,代码行数:34,代码来源:test_deregister.py
示例19: check_pypi
def check_pypi(self):
    """
    If the requirement is frozen to pypi, check for a new version.

    Prints one status line per handled requirement and removes it from
    ``self.reqs``. Requirements with a ``url`` are left in ``self.reqs``;
    up-to-date requirements are removed without printing.
    """
    # Attach the installed distribution to every requirement that has one.
    for dist in get_installed_distributions():
        name = dist.project_name
        if name in self.reqs:
            self.reqs[name]["dist"] = dist

    pypi = ServerProxy("https://pypi.python.org/pypi")
    # Iterate over a snapshot because entries are deleted inside the loop.
    for name, req in list(self.reqs.items()):
        if req["url"]:
            continue  # skipping github packages.

        if "dist" not in req:
            pkg_info = name
            msg = "not installed"
        else:
            dist = req["dist"]
            dist_version = LooseVersion(dist.version)
            project = req["pip_req"].name
            available = pypi.package_releases(project, True)
            if not available:
                # Retry with underscores in place of dashes.
                available = pypi.package_releases(project.replace('-', '_'), True)
            available_version = self._available_version(dist_version, available)

            if not available_version:
                msg = self.style.WARN("release is not on pypi (check capitalization and/or --extra-index-url)")
            elif self.options['show_newer'] and dist_version > available_version:
                msg = self.style.INFO("{0} available (newer installed)".format(available_version))
            elif available_version > dist_version:
                msg = self.style.INFO("{0} available".format(available_version))
            else:
                # up to date: nothing to report
                del self.reqs[name]
                continue
            pkg_info = self.style.BOLD("{dist.project_name} {dist.version}".format(dist=dist))

        print("{pkg_info:40} {msg}".format(pkg_info=pkg_info, msg=msg))
        del self.reqs[name]
开发者ID:marcosptf,项目名称:fedora,代码行数:35,代码来源:pipchecker.py
示例20: test_ROSLaunchNode
def test_ROSLaunchNode(self):
    """Exercise the basic ROSLaunchNode API: start, ping, get_pid, shutdown."""
    from roslaunch.server import ROSLaunchNode

    # - create a node with a handler
    handler = TestHandler(self.pmon)
    node = ROSLaunchNode(handler)

    # - start the node
    node.start()
    try:
        # ``assert_`` / ``assertEquals`` were removed from unittest in
        # Python 3.12; use the supported names.
        self.assertTrue(node.uri)

        # - call the ping API that we added
        s = ServerProxy(node.uri)
        test_val = 'test-%s'%time.time()
        s.ping(test_val)
        self.assertEqual(handler.pinged, test_val)

        # - call the pid API
        code, msg, pid = s.get_pid()
        self.assertEqual(1, code)
        self.assertTrue(isinstance(msg, str))
        self.assertEqual(os.getpid(), pid)
    finally:
        # - shut it down, even when an assertion above fails
        node.shutdown('test done')
开发者ID:Aand1,项目名称:ROSCH,代码行数:26,代码来源:test_roslaunch_server.py
注:本文中的xmlrpc.client.ServerProxy类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论