本文整理汇总了Python中xmlrpc.server.SimpleXMLRPCServer类的典型用法代码示例。如果您正苦于以下问题:Python SimpleXMLRPCServer类的具体用法?Python SimpleXMLRPCServer怎么用?Python SimpleXMLRPCServer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SimpleXMLRPCServer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
class KeyValueServer:
    """In-memory key/value store whose operations are published over XML-RPC."""

    # Names of the methods made callable by XML-RPC clients.
    _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']

    def __init__(self, address):
        """Create an empty store and an XML-RPC server bound to *address*."""
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        # No explicit name is given, so each method is registered under
        # its own function name.
        for method_name in self._rpc_methods_:
            handler = getattr(self, method_name)
            self._serv.register_function(handler)

    def get(self, name):
        """Return the value stored under *name* (KeyError if absent)."""
        return self._data[name]

    def set(self, name, value):
        """Store *value* under *name*, replacing any previous value."""
        self._data[name] = value

    def delete(self, name):
        """Remove *name* from the store (KeyError if absent)."""
        self._data.pop(name)

    def exists(self, name):
        """Return True when *name* is present in the store."""
        return name in self._data

    def keys(self):
        """Return the stored keys as a plain list."""
        return list(self._data.keys())

    def serve_forever(self):
        """Run the XML-RPC request loop of the wrapped server."""
        self._serv.serve_forever()
开发者ID:feng1o,项目名称:python_1,代码行数:25,代码来源:xml-rpc.py
示例2: __init__
class KeyValueServer:
    """In-memory key/value store whose operations are published over XML-RPC."""

    # Names of the methods made callable by XML-RPC clients.
    _rpc_methods = ["get", "set", "exists", "delete", "keys"]

    def __init__(self, address):
        """Create an empty store and an XML-RPC server bound to *address*."""
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        for method in self._rpc_methods:
            self._serv.register_function(getattr(self, method), method)

    def get(self, name):
        """Return the value stored under *name* (KeyError if absent)."""
        # Index by the argument, not the literal string "name".
        return self._data[name]

    def set(self, name, value):
        """Store *value* under *name*, replacing any previous value."""
        self._data[name] = value

    def exists(self, name):
        """Return True when *name* is present in the store."""
        return name in self._data

    def delete(self, name):
        """Remove *name* from the store (KeyError if absent)."""
        del self._data[name]

    def keys(self):
        """Return the stored keys as a plain list."""
        # A dict_keys view cannot be marshalled, so hand back a list.
        return list(self._data.keys())

    def serve_forever(self):
        """Run the XML-RPC request loop of the wrapped server."""
        self._serv.serve_forever()
开发者ID:PyBeaner,项目名称:PythonCookbook,代码行数:27,代码来源:server.py
示例3: _start
def _start(self):
    """
    Internal helper: build the XML-RPC server for this object and
    enter its request loop.
    """
    # Empty host string; the port is derived from self.url.
    address = ("", getPort(self.url))
    server = SimpleXMLRPCServer(address, logRequests=False)
    server.register_instance(self)
    server.serve_forever()
开发者ID:quietcoolwu,项目名称:Beginning_Python,代码行数:7,代码来源:listing27-2.py
示例4: __init__
def __init__(self, port=8270, port_file=None):
    """Bind to 127.0.0.1:*port*, register the keyword API and serve.

    The constructor does not return until ``serve_forever`` exits.
    """
    SimpleXMLRPCServer.__init__(self, ("127.0.0.1", int(port)))
    # Register the remote-library entry points under their own names.
    for api_name in ('get_keyword_names',
                     'get_keyword_documentation',
                     'run_keyword'):
        self.register_function(getattr(self, api_name))
    announce_port(self.socket, port_file)
    self.serve_forever()
开发者ID:userzimmermann,项目名称:robotframework-python3,代码行数:7,代码来源:documentation.py
示例5: __init__
def __init__(self, addr, log_requests=1):
    """
    Overrides SimpleXMLRPCServer to set option to allow_reuse_address.

    :param addr: address passed to SimpleXMLRPCServer to bind to
    :param log_requests: forwarded to SimpleXMLRPCServer as its
        request-logging flag
    """
    # allow_reuse_address defaults to False in Python 2.4.  We set it
    # to True to allow quick restart on the same port.  This is equivalent
    # to calling setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    self.allow_reuse_address = True
    if not rosgraph.network.use_ipv6():
        SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests)
        return

    logger = logging.getLogger('xmlrpc')
    # The XMLRPC library does not support IPv6 out of the box.
    # We have to manipulate private members and duplicate
    # code from the constructor.
    # TODO IPV6: Get this into SimpleXMLRPCServer
    SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests, bind_and_activate=False)
    # The base constructor has already created a socket for the default
    # address family; close it before replacing it so it is not leaked.
    self.socket.close()
    self.address_family = socket.AF_INET6
    self.socket = socket.socket(self.address_family, self.socket_type)
    logger.info('binding ipv6 xmlrpc socket to %s', addr)
    # TODO: set IPV6_V6ONLY to 0:
    # self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
    try:
        self.server_bind()
        self.server_activate()
    except Exception:
        # Mirror the base class: do not leave the socket open on failure.
        self.server_close()
        raise
    logger.info('bound to %s', self.socket.getsockname()[0:2])
开发者ID:Aand1,项目名称:ROSCH,代码行数:25,代码来源:xmlrpc.py
示例6: __init__
def __init__(self, dbfile, logfile, interface, daemon=True):
    """Bind the XML-RPC server to *interface* and initialise server state.

    If binding fails with ``socket.error`` the address is written to
    stderr and ``PRServiceConfigError`` is raised.
    """
    try:
        SimpleXMLRPCServer.__init__(self, interface,
                                    logRequests=False, allow_none=True)
    except socket.error:
        ip = socket.gethostbyname(interface[0])
        port = interface[1]
        sys.stderr.write("PR Server unable to bind to %s:%s\n" % (ip, port))
        raise PRServiceConfigError

    self.dbfile = dbfile
    self.daemon = daemon
    self.logfile = logfile
    self.working_thread = None
    # Read back the address actually bound by the socket.
    self.host, self.port = self.socket.getsockname()
    self.pidfile = PIDPREFIX % (self.host, self.port)

    # Publish each RPC entry point under its own method name.
    for rpc_name in ("getPR", "quit", "ping", "export", "dump_db", "importone"):
        self.register_function(getattr(self, rpc_name), rpc_name)
    self.register_introspection_functions()

    self.requestqueue = queue.Queue()
    handler = threading.Thread(target=self.process_request_thread)
    handler.daemon = False
    self.handlerthread = handler
开发者ID:drewmoseley,项目名称:poky,代码行数:30,代码来源:serv.py
示例7: __init__
class Server:
    """Key/value store published over XML-RPC; put_back and tick are stubs."""

    def __init__(self, address):
        """Create an empty store and an XML-RPC server bound to *address*."""
        self._rpc_methods_ = ['get', 'put', 'put_back', 'tick', 'keys']
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        # No explicit name is given, so each method is registered under
        # its own function name.
        for method_name in self._rpc_methods_:
            handler = getattr(self, method_name)
            self._serv.register_function(handler)

    def get(self, key):
        """Return the value stored under *key* (KeyError if absent)."""
        return self._data[key]

    def put(self, key, value):
        """Store *value* under *key*, replacing any previous value."""
        self._data[key] = value

    def put_back(self, key, value):
        """Placeholder: currently does nothing."""
        return None

    def tick(self):
        """Placeholder: currently does nothing."""
        return None

    def keys(self):
        """Return the stored keys as a plain list."""
        return list(self._data.keys())

    def serve_forever(self):
        """Run the XML-RPC request loop of the wrapped server."""
        self._serv.serve_forever()
开发者ID:vitaly-emelianov,项目名称:distribsys2015,代码行数:26,代码来源:server.py
示例8: runServers
def runServers(xmlrpc=False, tcpPy=False):
    """Run the python telnet server and the info socket.

    They run at localhost on ports 9000 (or higher if in use) and 21000
    (or higher if in use) respectively.

    The python telnet server accepts only connections from localhost,
    after authentication by a random cookie, which is printed on stdout
    at server startup.

    The info socket provides read-only access to several simulation
    parameters at runtime. Each connection receives a pickled dictionary
    with those values. This socket is primarily used by the woo-multi
    batch scheduler.
    """
    if tcpPy:
        import woo.runtime
        console = GenericTCPServer(
            handler=woo.remote.PythonConsoleSocketEmulator,
            title='TCP python prompt',
            cookie=True,
            minPort=9000,
        )
        woo.runtime.cookie = console.server.cookie
    if xmlrpc:
        if future.utils.PY3:
            from xmlrpc.server import SimpleXMLRPCServer
        else:
            from SimpleXMLRPCServer import SimpleXMLRPCServer
        minPort, maxPort = 21000, 65535
        # Take the first port in [minPort, maxPort) that can be bound.
        for port in range(minPort, maxPort):
            try:
                info = SimpleXMLRPCServer(('', port), logRequests=False, allow_none=True)
            except socket.error:
                continue
            break
        else:
            raise RuntimeError("No free port to listen on in range 21000-%d" % maxPort)
        # register_instance exposes the provider's methods by introspection,
        # as per http://docs.python.org/library/simplexmlrpcserver.html#simplexmlrpcserver-example
        info.register_instance(InfoProvider())
        _runInBackground(info.serve_forever)
        print('XMLRPC info provider on http://localhost:%d' % port)
    sys.stdout.flush()
开发者ID:woodem,项目名称:woo,代码行数:31,代码来源:remote.py
示例9: __init__
def __init__(self, addr, log_requests=1):
    """
    Overrides SimpleXMLRPCServer to set option to allow_reuse_address.

    :param addr: address passed to SimpleXMLRPCServer to bind to
    :param log_requests: forwarded to SimpleXMLRPCServer as its
        request-logging flag
    """
    # allow_reuse_address defaults to False in Python 2.4.  We set it
    # to True to allow quick restart on the same port.  This is equivalent
    # to calling setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    self.allow_reuse_address = True
    if not rosgraph.network.use_ipv6():
        SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests)
        return

    logger = logging.getLogger('xmlrpc')
    # The XMLRPC library does not support IPv6 out of the box.
    # We have to manipulate private members and duplicate
    # code from the constructor.
    # TODO IPV6: Get this into SimpleXMLRPCServer
    SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests, bind_and_activate=False)
    # The base constructor has already created a socket for the default
    # address family; close it before replacing it so it is not leaked.
    self.socket.close()
    self.address_family = socket.AF_INET6
    self.socket = socket.socket(self.address_family, self.socket_type)
    logger.info('binding ipv6 xmlrpc socket to %s', addr)
    # TODO: set IPV6_V6ONLY to 0:
    # self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
    try:
        self.server_bind()
        self.server_activate()
    except Exception:
        # Mirror the base class: do not leave the socket open on failure.
        self.server_close()
        raise
    logger.info('bound to %s', self.socket.getsockname()[0:2])
    # TODO IPV6: check Windows compatibility
    # [Bug #1222790] If possible, set close-on-exec flag; if a
    # method spawns a subprocess, the subprocess shouldn't have
    # the listening socket open.
    if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
        flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
        flags |= fcntl.FD_CLOEXEC
        fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
开发者ID:schneider42,项目名称:ros_comm,代码行数:33,代码来源:xmlrpc.py
示例10: __init__
def __init__(self, library, port=8270, port_file=None):
    """Bind to 127.0.0.1:*port*, register the RPC functions and serve.

    The constructor does not return until ``serve_forever`` exits.
    """
    bind_address = ('127.0.0.1', int(port))
    SimpleXMLRPCServer.__init__(self, bind_address)
    self.library = library
    self._shutdown = False
    self._register_functions()
    announce_port(self.socket, port_file)
    self.serve_forever()
开发者ID:HelioGuilherme66,项目名称:robotframework,代码行数:7,代码来源:remoteserver.py
示例11: run_xmlrpc_server
def run_xmlrpc_server():
    """Publish an OOProxy instance over XML-RPC on localhost port 1210."""
    port = 1210
    proxy_instance = OOProxy()
    listen_address = ("localhost", port)
    rpc_server = SimpleXMLRPCServer(listen_address)
    rpc_server.register_instance(proxy_instance)
    # Enter the main listener loop.
    rpc_server.serve_forever()
开发者ID:martinpovolny,项目名称:ujc-oo-server,代码行数:8,代码来源:ooffice_xmlrpc_server.py
示例12: test_exposeFunction2
def test_exposeFunction2(self):
    """A function registered under an alias is callable by that alias."""
    port = PORT + 2
    rpc_server = SimpleXMLRPCServer((HOST, port))
    rpc_server.register_function(multiply, "mult")
    ServerThread(rpc_server).start()
    # Call the exposed service through a client proxy.
    proxy = xmlrpc.client.ServerProxy("http://%s:%d" % (HOST, port))
    self.assertEqual(proxy.mult(7, 11), 77)
开发者ID:isaiah,项目名称:jython3,代码行数:9,代码来源:test_SimpleXMLRPCServer.py
示例13: test_exposeFunction1
def test_exposeFunction1(self):
    """A function registered without an alias is callable by its own name."""
    port = PORT + 1
    rpc_server = SimpleXMLRPCServer((HOST, port))
    rpc_server.register_function(multiply)
    ServerThread(rpc_server).start()
    # Call the exposed service through a client proxy.
    proxy = xmlrpc.client.ServerProxy("http://%s:%d" % (HOST, port))
    self.assertEqual(proxy.multiply(5, 10), 50)
开发者ID:isaiah,项目名称:jython3,代码行数:9,代码来源:test_SimpleXMLRPCServer.py
示例14: buildServer
def buildServer():
    """Build an XML-RPC server and serve forever, waiting for problems
    to be submitted.

    Host and port are read from the ``XMLRPCServer`` section of the
    global ``config``; ``receive`` is published under the name ``send``.
    """
    global config
    host = config.get("XMLRPCServer", "host")
    port = config.getint("XMLRPCServer", "port")
    rpc_server = SimpleXMLRPCServer((host, port))
    rpc_server.register_function(receive, "send")
    rpc_server.serve_forever()
开发者ID:leafduo,项目名称:syoj,代码行数:9,代码来源:judged.py
示例15: main
def main():
    """Serve the ``square`` function over XML-RPC on localhost:PORT."""
    listen_address = ("localhost", PORT)
    rpc_server = SimpleXMLRPCServer(listen_address)
    print("We've got a connection and are listening on port {}...huzzah".format(PORT))
    # Register the function so that client code created later can call it.
    rpc_server.register_function(square, "square")
    # Start the server.
    rpc_server.serve_forever()
开发者ID:L1nwatch,项目名称:Mac-Python-3.X,代码行数:9,代码来源:test_xmlrpc_server.py
示例16: test_exposeClass
def test_exposeClass(self):
    """Expose a whole instance and exercise its _dispatch method."""
    port = PORT + 3
    rpc_server = SimpleXMLRPCServer((HOST, port))
    rpc_server.register_instance(MyService())
    ServerThread(rpc_server).start()
    # Call the exposed service through a client proxy.
    proxy = xmlrpc.client.ServerProxy("http://%s:%d" % (HOST, port))
    self.assertEqual(proxy.squared(10), 100)
开发者ID:isaiah,项目名称:jython3,代码行数:9,代码来源:test_SimpleXMLRPCServer.py
示例17: __init__
def __init__(self, addr, log_requests=1):
    """
    Initialise the SimpleXMLRPCServer base with address reuse enabled.
    """
    # Must be set before the base constructor runs.  allow_reuse_address
    # defaults to False in Python 2.4; True allows a quick restart on the
    # same port, equivalent to setsockopt(SOL_SOCKET,SO_REUSEADDR,1).
    self.allow_reuse_address = True
    SimpleXMLRPCServer.__init__(
        self,
        addr,
        requestHandler=SilenceableXMLRPCRequestHandler,
        logRequests=log_requests,
    )
开发者ID:bhaskara,项目名称:rosh_fuerte,代码行数:9,代码来源:xmlrpc.py
示例18: __init__
def __init__(self, library, port=8270, port_file=None):
    """Bind to 127.0.0.1:*port*, register the keyword API and serve.

    The constructor does not return until ``serve_forever`` exits.
    """
    bind_address = ('127.0.0.1', int(port))
    SimpleXMLRPCServer.__init__(self, bind_address)
    self.library = library
    self._shutdown = False
    # Register the remote-library entry points under their own names.
    for api_name in ('get_keyword_names',
                     'get_keyword_arguments',
                     'run_keyword'):
        self.register_function(getattr(self, api_name))
    announce_port(self.socket, port_file)
    self.serve_forever()
开发者ID:userzimmermann,项目名称:robotframework-python3,代码行数:9,代码来源:remoteserver.py
示例19: DebugStubComm
class DebugStubComm(object):
    """XML-RPC front end that exposes a debug stub's agents to a client.

    Every access to ``stub.agents`` is made while holding ``stub.lock``.
    """

    def __init__(self, stub, port):
        """Create the XML-RPC server on *port* and register the client API.

        :param stub: object providing ``lock`` and ``agents``
        :param port: TCP port to listen on
        """
        self.stub = stub
        # NOTE(review): "0.0.0.0" listens on every interface; confirm that
        # the debug port is meant to be reachable from other hosts.
        addr = "0.0.0.0"
        self.server = SimpleXMLRPCServer((addr, port))
        self.server.register_function(self.handshake)
        self.server.register_function(self.get_agent_list)
        self.server.register_function(self.track_agent)
        self.server.register_function(self.get_agent_track)

    def start_service(self):
        """Run the XML-RPC request loop."""
        self.server.serve_forever()

    ###########################################################################
    # client side api
    ###########################################################################
    def get_agent_list(self):
        """Return the result of iterating ``stub.agents`` as a list."""
        # "with" releases the lock even if iteration raises.
        with self.stub.lock:
            return list(self.stub.agents)

    def handshake(self):
        """Return True; lets a client check that the server is reachable."""
        return True

    def track_agent(self, agent_id, on_off):
        """Set the ``tracked`` flag of an agent.

        :return: False if *agent_id* is unknown, True otherwise
        """
        with self.stub.lock:
            if agent_id not in self.stub.agents:
                return False
            self.stub.agents[agent_id].tracked = on_off
            return True

    def get_agent_track(self, agent_id):
        """Return an agent's recorded history and reset it to an empty list.

        :return: False if *agent_id* is unknown, else the previous history
        """
        with self.stub.lock:
            if agent_id not in self.stub.agents:
                return False
            agent = self.stub.agents[agent_id]
            ret = agent.history
            agent.history = []
            return ret

    def set_breakpoint(self, agent_id, location_info):
        """Not implemented yet."""
        pass

    def remove_breakpoint(self, agent_id, location_info):
        """Not implemented yet."""
        pass

    def continue_agent(self, agent_id):
        """Not implemented yet."""
        pass
开发者ID:spacejump163,项目名称:ai2,代码行数:56,代码来源:debug_stub.py
示例20: start
def start():
    """Parse the command line, start the launcher threads and run the
    jobqueue XML-RPC server in the main thread.

    ``-c<directory>`` selects the configuration directory; any other
    argument prints the usage text and exits with status 0. An empty
    ``-c`` exits with status 1. The function always ends in ``sys.exit``.
    """
    #NOTE: bots directory should always be on PYTHONPATH - otherwise it will not start.
    #***command line arguments**************************
    usage = '''
This is "%(name)s" version %(version)s, part of Bots open source edi translator (http://bots.sourceforge.net).
Server program that ensures only a single bots-engine runs at any time, and no engine run requests are
lost/discarded. Each request goes to a queue and is run in sequence when the previous run completes.
Use of the job queue is optional and must be configured in bots.ini (jobqueue section, enabled = True).
Usage:
%(name)s -c<directory>
Options:
-c<directory> directory for configuration files (default: config).
''' % {'name': os.path.basename(sys.argv[0]), 'version': 3.3}
    configdir = 'config'
    for arg in sys.argv[1:]:
        if arg.startswith('-c'):
            configdir = arg[2:]
            if not configdir:
                print('Error: configuration directory indicated, but no directory name.')
                sys.exit(1)
        else:
            print(usage)
            sys.exit(0)
    #***end handling command line arguments**************************

    # The botsinit/botsglobal initialisation is disabled in this version, so
    # the settings below are hard-coded; the trailing comments name the
    # bots.ini options they stand in for.
    nr_threads = 2          # botsglobal.ini.getint('jobqueue','nr_threads')
    process_name = 'jobqueue'
    port = 28082            # botsglobal.ini.getint('jobqueue','port',28082)
    lauchfrequency = 5      # botsglobal.ini.getint('jobqueue','lauchfrequency',5)
    maxruntime = 60         # botsglobal.ini.getint('settings','maxruntime',60)

    # NOTE(review): `logger` and `queue` are not defined in this function
    # (the local logger initialisation is disabled); they must exist at
    # module level or this raises NameError - confirm.
    # NOTE(review): the launcher threads are non-daemon; if `launcher` never
    # returns, the final sys.exit() cannot end the process - confirm intent.
    #start launcher threads
    for _ in range(nr_threads):
        launcher_thread = threading.Thread(name='launcher', target=launcher,
                                           args=(logger, queue, lauchfrequency, maxruntime))
        launcher_thread.start()

    #the main thread is the xmlrpc server: all adding, getting etc for jobqueue is done via xmlrpc.
    server = SimpleXMLRPCServer(('localhost', port), logRequests=False)
    server.register_instance(Jobqueue(logger))
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server.
        pass
    sys.exit(0)
开发者ID:WouterVH,项目名称:bots,代码行数:56,代码来源:jobqueueserver.py
注:本文中的xmlrpc.server.SimpleXMLRPCServer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论