• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python server.SimpleXMLRPCServer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中xmlrpc.server.SimpleXMLRPCServer的典型用法代码示例。如果您正苦于以下问题:Python SimpleXMLRPCServer类的具体用法?Python SimpleXMLRPCServer怎么用?Python SimpleXMLRPCServer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SimpleXMLRPCServer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

class KeyValueServer:
    _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']
    def __init__(self, address):
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        for name in self._rpc_methods_:
            self._serv.register_function(getattr(self, name))

    def get(self, name):
        return self._data[name]

    def set(self, name, value):
        self._data[name] = value

    def delete(self, name):
        del self._data[name]

    def exists(self, name):
        return name in self._data

    def keys(self):
        return list(self._data)

    def serve_forever(self):
        self._serv.serve_forever()
开发者ID:feng1o,项目名称:python_1,代码行数:25,代码来源:xml-rpc.py


示例2: __init__

class KeyValueServer:
    _rpc_methods = ["get", "set", "exists", "delete", "keys"]

    def __init__(self, address):
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        for method in self._rpc_methods:
            self._serv.register_function(getattr(self, method), method)

    def get(self, name):
        return self._data["name"]

    def set(self, name, value):
        self._data["name"] = value

    def exists(self, name):
        return name in self._data

    def delete(self, name):
        del self._data["name"]

    def keys(self):
        # dict_keys is not supported
        return list(self._data.keys())

    def serve_forever(self):
        self._serv.serve_forever()
开发者ID:PyBeaner,项目名称:PythonCookbook,代码行数:27,代码来源:server.py


示例3: _start

 def _start(self):
     """
     Used internally to start the XML-RPC server.
     """
     s = SimpleXMLRPCServer(("", getPort(self.url)), logRequests=False)
     s.register_instance(self)
     s.serve_forever()
开发者ID:quietcoolwu,项目名称:Beginning_Python,代码行数:7,代码来源:listing27-2.py


示例4: __init__

 def __init__(self, port=8270, port_file=None):
     SimpleXMLRPCServer.__init__(self, ("127.0.0.1", int(port)))
     self.register_function(self.get_keyword_names)
     self.register_function(self.get_keyword_documentation)
     self.register_function(self.run_keyword)
     announce_port(self.socket, port_file)
     self.serve_forever()
开发者ID:userzimmermann,项目名称:robotframework-python3,代码行数:7,代码来源:documentation.py


示例5: __init__

 def __init__(self, addr, log_requests=1):
     """
     Overrides SimpleXMLRPCServer to set option to allow_reuse_address.
     """
     # allow_reuse_address defaults to False in Python 2.4.  We set it 
     # to True to allow quick restart on the same port.  This is equivalent 
     # to calling setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
     self.allow_reuse_address = True
     if rosgraph.network.use_ipv6():
         logger = logging.getLogger('xmlrpc')
         # The XMLRPC library does not support IPv6 out of the box
         # We have to monipulate private members and duplicate
         # code from the constructor.
         # TODO IPV6: Get this into SimpleXMLRPCServer 
         SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests,  bind_and_activate=False)
         self.address_family = socket.AF_INET6
         self.socket = socket.socket(self.address_family, self.socket_type)
         logger.info('binding ipv6 xmlrpc socket to' + str(addr))
         # TODO: set IPV6_V6ONLY to 0:
         # self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
         self.server_bind()
         self.server_activate()
         logger.info('bound to ' + str(self.socket.getsockname()[0:2]))
     else:
         SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests)
开发者ID:Aand1,项目名称:ROSCH,代码行数:25,代码来源:xmlrpc.py


示例6: __init__

    def __init__(self, dbfile, logfile, interface, daemon=True):
        ''' constructor '''
        try:
            SimpleXMLRPCServer.__init__(self, interface,
                                        logRequests=False, allow_none=True)
        except socket.error:
            ip=socket.gethostbyname(interface[0])
            port=interface[1]
            msg="PR Server unable to bind to %s:%s\n" % (ip, port)
            sys.stderr.write(msg)
            raise PRServiceConfigError

        self.dbfile=dbfile
        self.daemon=daemon
        self.logfile=logfile
        self.working_thread=None
        self.host, self.port = self.socket.getsockname()
        self.pidfile=PIDPREFIX % (self.host, self.port)

        self.register_function(self.getPR, "getPR")
        self.register_function(self.quit, "quit")
        self.register_function(self.ping, "ping")
        self.register_function(self.export, "export")
        self.register_function(self.dump_db, "dump_db")
        self.register_function(self.importone, "importone")
        self.register_introspection_functions()

        self.requestqueue = queue.Queue()
        self.handlerthread = threading.Thread(target = self.process_request_thread)
        self.handlerthread.daemon = False
开发者ID:drewmoseley,项目名称:poky,代码行数:30,代码来源:serv.py


示例7: __init__

class Server:

    def __init__(self, address):
        self._rpc_methods_ = ['get', 'put', 'put_back', 'tick', 'keys']
        self._data = {}
        self._serv = SimpleXMLRPCServer(address, allow_none=True)
        for name in self._rpc_methods_:
            self._serv.register_function(getattr(self, name))

    def get(self, key):
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value

    def put_back(self, key, value):
        pass

    def tick(self):
        pass

    def keys(self):
        return list(self._data)

    def serve_forever(self):
        self._serv.serve_forever()
开发者ID:vitaly-emelianov,项目名称:distribsys2015,代码行数:26,代码来源:server.py


示例8: runServers

def runServers(xmlrpc=False,tcpPy=False):
    """Run python telnet server and info socket. They will be run at localhost on ports 9000 (or higher if used) and 21000 (or higer if used) respectively.
    
    The python telnet server accepts only connection from localhost,
    after authentication by random cookie, which is printed on stdout
    at server startup.

    The info socket provides read-only access to several simulation parameters
    at runtime. Each connection receives pickled dictionary with those values.
    This socket is primarily used by woo-multi batch scheduler.
    """
    if tcpPy:
        import woo.runtime
        srv=GenericTCPServer(handler=woo.remote.PythonConsoleSocketEmulator,title='TCP python prompt',cookie=True,minPort=9000)
        woo.runtime.cookie=srv.server.cookie
    if xmlrpc:
        if future.utils.PY3: from xmlrpc.server import SimpleXMLRPCServer
        else: from SimpleXMLRPCServer import SimpleXMLRPCServer
        port,maxPort=21000,65535 # minimum port number
        while port<maxPort:
            try:
                info=SimpleXMLRPCServer(('',port),logRequests=False,allow_none=True); break
            except socket.error: port+=1
        if port==maxPort: raise RuntimeError("No free port to listen on in range 21000-%d"%maxPort)
        # register methods, as per http://docs.python.org/library/simplexmlrpcserver.html#simplexmlrpcserver-example
        info.register_instance(InfoProvider()) # gets all defined methods by introspection
        #prov=InfoProvider()
        #for m in prov.exposedMethods(): info.register_function(m)
        _runInBackground(info.serve_forever)
        print('XMLRPC info provider on http://localhost:%d'%port)
    sys.stdout.flush()
开发者ID:woodem,项目名称:woo,代码行数:31,代码来源:remote.py


示例9: __init__

 def __init__(self, addr, log_requests=1):
     """
     Overrides SimpleXMLRPCServer to set option to allow_reuse_address.
     """
     # allow_reuse_address defaults to False in Python 2.4.  We set it 
     # to True to allow quick restart on the same port.  This is equivalent 
     # to calling setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
     self.allow_reuse_address = True
     if rosgraph.network.use_ipv6():
         logger = logging.getLogger('xmlrpc')
         # The XMLRPC library does not support IPv6 out of the box
         # We have to monipulate private members and duplicate
         # code from the constructor.
         # TODO IPV6: Get this into SimpleXMLRPCServer 
         SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests,  bind_and_activate=False)
         self.address_family = socket.AF_INET6
         self.socket = socket.socket(self.address_family, self.socket_type)
         logger.info('binding ipv6 xmlrpc socket to' + str(addr))
         # TODO: set IPV6_V6ONLY to 0:
         # self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
         self.server_bind()
         self.server_activate()
         logger.info('bound to ' + str(self.socket.getsockname()[0:2]))
         # TODO IPV6: check Windows compatibility
         # [Bug #1222790] If possible, set close-on-exec flag; if a
         # method spawns a subprocess, the subprocess shouldn't have
         # the listening socket open.
         if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
             flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
             flags |= fcntl.FD_CLOEXEC
             fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
     else:
         SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests)
开发者ID:schneider42,项目名称:ros_comm,代码行数:33,代码来源:xmlrpc.py


示例10: __init__

 def __init__(self, library, port=8270, port_file=None):
     SimpleXMLRPCServer.__init__(self, ('127.0.0.1', int(port)))
     self.library = library
     self._shutdown = False
     self._register_functions()
     announce_port(self.socket, port_file)
     self.serve_forever()
开发者ID:HelioGuilherme66,项目名称:robotframework,代码行数:7,代码来源:remoteserver.py


示例11: run_xmlrpc_server

def run_xmlrpc_server():
  port = 1210
  oo_proxy = OOProxy()
  #server = SimpleXMLRPCServer.SimpleXMLRPCServer(("localhost", port))
  server = SimpleXMLRPCServer(("localhost", port))
  server.register_instance(oo_proxy)
  #Go into the main listener loop
  server.serve_forever()
开发者ID:martinpovolny,项目名称:ujc-oo-server,代码行数:8,代码来源:ooffice_xmlrpc_server.py


示例12: test_exposeFunction2

    def test_exposeFunction2(self):
        """Expose a function using a different name via XML-RPC."""
        server = SimpleXMLRPCServer((HOST, PORT + 2))
        server.register_function(multiply, "mult")
        ServerThread(server).start()

        # Access the exposed service.
        client = xmlrpc.client.ServerProxy("http://%s:%d" % (HOST, PORT + 2))
        self.assertEqual(client.mult(7, 11), 77)
开发者ID:isaiah,项目名称:jython3,代码行数:9,代码来源:test_SimpleXMLRPCServer.py


示例13: test_exposeFunction1

    def test_exposeFunction1(self):
        """Expose a function via XML-RPC."""
        server = SimpleXMLRPCServer((HOST, PORT + 1))
        server.register_function(multiply)
        ServerThread(server).start()

        # Access the exposed service.
        client = xmlrpc.client.ServerProxy("http://%s:%d" % (HOST, PORT + 1))
        self.assertEqual(client.multiply(5, 10), 50)
开发者ID:isaiah,项目名称:jython3,代码行数:9,代码来源:test_SimpleXMLRPCServer.py


示例14: buildServer

def buildServer():
    """build an XMLRPC Server and serve forever \
    waiting for problem to submit.
    """
    global config
    server = SimpleXMLRPCServer((config.get("XMLRPCServer","host"), \
		    config.getint("XMLRPCServer","port")))
    server.register_function(receive, "send")
    server.serve_forever()
开发者ID:leafduo,项目名称:syoj,代码行数:9,代码来源:judged.py


示例15: main

def main():
    server = SimpleXMLRPCServer(("localhost", PORT))
    print("We've got a connection and are listening on port {}...huzzah".format(PORT))

    # 注册函数,这样它可以被即将创建的客户端代码使用
    server.register_function(square, "square")

    # 启动服务器
    server.serve_forever()
开发者ID:L1nwatch,项目名称:Mac-Python-3.X,代码行数:9,代码来源:test_xmlrpc_server.py


示例16: test_exposeClass

    def test_exposeClass(self):
        """Expose an entire class and test the _dispatch method."""
        server = SimpleXMLRPCServer((HOST, PORT + 3))
        server.register_instance(MyService())
        ServerThread(server).start()

        # Access the exposed service.
        client = xmlrpc.client.ServerProxy("http://%s:%d" % (HOST, PORT + 3))
        self.assertEqual(client.squared(10), 100)
开发者ID:isaiah,项目名称:jython3,代码行数:9,代码来源:test_SimpleXMLRPCServer.py


示例17: __init__

 def __init__(self, addr, log_requests=1):
     """
     Overrides SimpleXMLRPCServer to set option to allow_reuse_address.
     """
     # allow_reuse_address defaults to False in Python 2.4.  We set it 
     # to True to allow quick restart on the same port.  This is equivalent 
     # to calling setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
     self.allow_reuse_address = True
     SimpleXMLRPCServer.__init__(self, addr, SilenceableXMLRPCRequestHandler, log_requests)
开发者ID:bhaskara,项目名称:rosh_fuerte,代码行数:9,代码来源:xmlrpc.py


示例18: __init__

 def __init__(self, library, port=8270, port_file=None):
     SimpleXMLRPCServer.__init__(self, ('127.0.0.1', int(port)))
     self.library = library
     self._shutdown = False
     self.register_function(self.get_keyword_names)
     self.register_function(self.get_keyword_arguments)
     self.register_function(self.run_keyword)
     announce_port(self.socket, port_file)
     self.serve_forever()
开发者ID:userzimmermann,项目名称:robotframework-python3,代码行数:9,代码来源:remoteserver.py


示例19: DebugStubComm

class DebugStubComm(object):

    def __init__(self, stub, port):
        self.stub = stub
        addr = "0.0.0.0"
        self.server = SimpleXMLRPCServer((addr, port))
        self.server.register_function(self.handshake)
        self.server.register_function(self.get_agent_list)
        self.server.register_function(self.track_agent)
        self.server.register_function(self.get_agent_track)


    def start_service(self):
        self.server.serve_forever()

    ###########################################################################
    # client side api
    ###########################################################################
    def get_agent_list(self):
        self.stub.lock.acquire()
        l = [i for i in self.stub.agents]
        self.stub.lock.release()
        return l

    def handshake(self):
        return True

    def track_agent(self, agent_id, on_off):
        self.stub.lock.acquire()
        if agent_id not in self.stub.agents:
            self.stub.lock.release()
            return False
        agent = self.stub.agents[agent_id]
        agent.tracked = on_off
        self.stub.lock.release()
        return True

    def get_agent_track(self, agent_id):
        self.stub.lock.acquire()
        if agent_id not in self.stub.agents:
            self.stub.lock.release()
            return False
        agent = self.stub.agents[agent_id]
        ret = agent.history
        agent.history = []
        self.stub.lock.release()
        return ret

    def set_breakpoint(self, agent_id, location_info):
        pass

    def remove_breakpoint(self, agent_id, location_info):
        pass

    def continue_agent(self, agent_id):
        pass
开发者ID:spacejump163,项目名称:ai2,代码行数:56,代码来源:debug_stub.py


示例20: start

def start():
    #NOTE: bots directory should always be on PYTHONPATH - otherwise it will not start.
    #***command line arguments**************************
    usage = '''
    This is "%(name)s" version %(version)s, part of Bots open source edi translator (http://bots.sourceforge.net).
    Server program that ensures only a single bots-engine runs at any time, and no engine run requests are
    lost/discarded. Each request goes to a queue and is run in sequence when the previous run completes.
    Use of the job queue is optional and must be configured in bots.ini (jobqueue section, enabled = True).
    Usage:
        %(name)s  -c<directory>
    Options:
        -c<directory>   directory for configuration files (default: config).

    ''' % {'name': os.path.basename(sys.argv[0]), 'version': 3.3}
    configdir = 'config'
    for arg in sys.argv[1:]:
        if arg.startswith('-c'):
            configdir = arg[2:]
            if not configdir:
                print('Error: configuration directory indicated, but no directory name.')
                sys.exit(1)
        else:
            print(usage)
            sys.exit(0)
    #***end handling command line arguments**************************
    #~ botsinit.generalinit(configdir)     #find locating of bots, configfiles, init paths etc.
    #~ if not botsglobal.ini.getboolean('jobqueue','enabled',False):
        #~ print('Error: bots jobqueue cannot start; not enabled in %s/bots.ini'%(configdir))
        #~ sys.exit(1)
    nr_threads = 2  # botsglobal.ini.getint('jobqueue','nr_threads')
    process_name = 'jobqueue'
    #~ logger = botsinit.initserverlogging(process_name)
    #~ logger.log(25,'Bots %(process_name)s started.',{'process_name':process_name})
    #~ logger.log(25,'Bots %(process_name)s configdir: "%(configdir)s".',{'process_name':process_name,'configdir':botsglobal.ini.get('directories','config')})
    port = 28082  # botsglobal.ini.getint('jobqueue','port',28082)
    #~ logger.log(25,'Bots %(process_name)s listens for xmlrpc at port: "%(port)s".',{'process_name':process_name,'port':port})

    #start launcher thread
    lauchfrequency = 5  # botsglobal.ini.getint('jobqueue','lauchfrequency',5)
    maxruntime = 60  # botsglobal.ini.getint('settings','maxruntime',60)
    for thread in range(nr_threads)
        launcher_thread = threading.Thread(name='launcher', target=launcher,
                                           args=(logger, queue, lauchfrequency, maxruntime))
        launcher_thread.start()
        #~ logger.info('Jobqueue launcher started.')

    #the main thread is the xmlrpc server: all adding, getting etc for jobqueue is done via xmlrpc.
    #~ logger.info('Jobqueue server started.')
    server = SimpleXMLRPCServer(('localhost', port), logRequests=False)
    server.register_instance(Jobqueue(logger))
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass

    sys.exit(0)
开发者ID:WouterVH,项目名称:bots,代码行数:56,代码来源:jobqueueserver.py



注:本文中的xmlrpc.server.SimpleXMLRPCServer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python xmlrpc_test.assert_attr_equal函数代码示例发布时间:2022-05-26
下一篇:
Python client.ServerProxy类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap