• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python client.ServerProxy类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中xmlrpc.client.ServerProxy的典型用法代码示例。如果您正苦于以下问题:Python ServerProxy类的具体用法?Python ServerProxy怎么用?Python ServerProxy使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ServerProxy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: MySdkTests

class MySdkTests(unittest.TestCase):
    """Call the remote ``sdk`` XML-RPC namespace served at RPC_SERVER_URI."""

    def setUp(self):
        """Create a fresh proxy and keep its ``sdk`` namespace for the test."""
        proxy = ServerProxy(RPC_SERVER_URI)
        self.sdk = proxy.sdk

    def tearDown(self):
        """Nothing to release after a test."""

    def test_add(self, a=123, b=456):
        """``sdk.add`` on the default operands is expected to give 579."""
        total = self.sdk.add(a, b)
        report = "\nsdk.add(%d, %d) = %d" % (a, b, total)
        print(report)
        self.assertEqual(total, 579)

    def test_subtract(self, a=2015, b=1999):
        """``sdk.subtract`` on the default operands is expected to give 16."""
        difference = self.sdk.subtract(a, b)
        report = "\nsdk.subtract(%d, %d) = %d" % (a, b, difference)
        print(report)
        self.assertEqual(difference, 16)

    def test_httpGet(self, ip="203.208.48.146"):
        """``sdk.httpGet`` must return something other than None."""
        reply = self.sdk.httpGet(ip)
        report = "\nsdk.httpGet(%s) is %s" % (ip, reply)
        print(report)
        self.assertIsNotNone(reply)

    def test_getDeviceInfo(self):
        """``sdk.getDeviceInfo`` must return something other than None."""
        info = self.sdk.getDeviceInfo()
        report = "\nsdk.getDeviceInfo() is:\n%s" % (info)
        print(report)
        self.assertIsNotNone(info)
开发者ID:oscarli,项目名称:xml-rpc-demo,代码行数:26,代码来源:test_mysdk.py


示例2: NZBConnection

def NZBConnection(url):
    """Method to connect to NZBget client.

    Sends a test ``writelog`` call through an XML-RPC proxy and logs the
    outcome.

    :param url: nzb url to connect
    :return: True if the call completed (even when NZBget reported that it
        could not write the message), False when a protocol, XML-RPC or
        socket error was raised
    """
    nzbGetRPC = ServerProxy(url)
    try:
        if nzbGetRPC.writelog('INFO', 'Medusa connected to test connection.'):
            log.debug('Successfully connected to NZBget')
        else:
            log.warning('Successfully connected to NZBget but unable to'
                        ' send a message')
        return True

    except ProtocolError as error:
        if error.errmsg == 'Unauthorized':
            log.warning('NZBget username or password is incorrect.')
        else:
            log.error('Protocol Error: {msg}', {'msg': error.errmsg})
        return False

    except Error as error:
        # Only ProtocolError carries ``errmsg``; other xmlrpc errors such as
        # Fault do not, so fall back to the exception itself instead of
        # raising AttributeError from inside the handler.
        log.warning('Please check your NZBget host and port (if it is running).'
                    ' NZBget is not responding to this combination.'
                    ' Error: {msg}', {'msg': getattr(error, 'errmsg', error)})
        return False

    except socket.error as error:
        log.warning('Please check your NZBget host and port (if it is running).'
                    ' NZBget is not responding to this combination.'
                    ' Socket Error: {msg}', {'msg': error})
        return False
开发者ID:pymedusa,项目名称:SickRage,代码行数:33,代码来源:nzbget.py


示例3: _do_update

    def _do_update(self, hwids):
        """Rebuild ``self.cache`` from the XML-RPC driver database at ``self.url``.

        The server is queried once for all ``hwids``. The cache is always
        reset; it is only repopulated when the server answers with protocol
        version "20080407". Driver entries without a "driver_type" key are
        skipped.
        """

        logging.debug("Querying XML-RPC driver database %s...", self.url)
        proxy = ServerProxy(self.url)
        # TODO: error handling; pass kernel_version, architecture
        system_info = {
            "os_name": OSLib.inst.os_vendor,
            "os_version": OSLib.inst.os_version,
            "system_vendor": self.sys_vendor,
            "system_product": self.sys_product,
            "components": ["%s:%s" % (h.type, h.id) for h in hwids],
        }
        reply = proxy.query("20080407", "0", system_info)
        (proto_ver, proto_subver, drivers) = reply
        logging.debug("  -> protocol: %s/%s, drivers: %s", proto_ver, proto_subver, str(drivers))

        self.cache = {}
        if proto_ver != "20080407":
            logging.warning("   unknown protocol version, not updating")
            return

        for hwid in hwids:
            component = "%s:%s" % (hwid.type, hwid.id)
            for drv in drivers.get(component, []):
                if "driver_type" in drv:
                    self.cache.setdefault(hwid, []).append(DriverID(**drv))
开发者ID:Fabioamd87,项目名称:nfs-manager,代码行数:29,代码来源:detection.py


示例4: __sort_inputs_by_humanoid

    def __sort_inputs_by_humanoid(self, possible_events):
        """Reorder ``possible_events`` by the ranking the Humanoid service returns.

        The history of view trees and events plus the candidate events are
        sent as JSON to the XML-RPC ``predict`` method at
        ``self.device.humanoid``. The reply supplies ``indices`` (the new
        order) and ``text`` (assigned to every SetTextEvent in the result).

        :param possible_events: candidate events, indexed by the reply
        :return: list of events from ``possible_events`` in the new order
        """
        # Compare the version tuple rather than a prefix of the version string.
        if sys.version_info[0] >= 3:
            from xmlrpc.client import ServerProxy
        else:
            from xmlrpclib import ServerProxy
        proxy = ServerProxy("http://%s/" % self.device.humanoid)
        request_json = {
            "history_view_trees": self.humanoid_view_trees,
            "history_events": [x.__dict__ for x in self.humanoid_events],
            "possible_events": [x.__dict__ for x in possible_events],
            "screen_res": [self.device.display_info["width"],
                           self.device.display_info["height"]]
        }
        result = json.loads(proxy.predict(json.dumps(request_json)))
        new_idx = result["indices"]
        text = result["text"]
        new_events = []

        # get rid of infinite recursive by randomizing first event.
        # Skipped when no indices came back: randint(0, -1) raises ValueError.
        if not self.utg.is_state_reached(self.current_state) and new_idx:
            new_first = random.randint(0, len(new_idx) - 1)
            new_idx[0], new_idx[new_first] = new_idx[new_first], new_idx[0]

        for idx in new_idx:
            event = possible_events[idx]
            if isinstance(event, SetTextEvent):
                event.text = text
            new_events.append(event)
        return new_events
开发者ID:chubbymaggie,项目名称:droidbot,代码行数:28,代码来源:input_policy.py


示例5: _register_with_server

    def _register_with_server(self):
        """
        Register child node with server, then forward local print-log
        messages and process events to that server.

        Raises RLException when the registration call fails or the server
        answers with a code other than 1.
        """
        name = self.name
        self.logger.info("attempting to register with roslaunch parent [%s]"%self.server_uri)
        try:
            server = ServerProxy(self.server_uri)
            code, msg, _ = server.register(name, self.uri)
            if code != 1:
                raise RLException("unable to register with roslaunch server: %s"%msg)
        except Exception:
            # the same text is logged locally and carried by the exception
            failure = "Exception while registering with roslaunch parent [%s]: %s"%(self.server_uri, traceback.format_exc())
            self.logger.error(failure)
            # fail
            raise RLException(failure)

        self.logger.debug("child registered with server")

        # register printlog handler so messages are funneled to remote
        def forward_info(msg):
            server.log(name, Log.INFO, msg)

        def forward_error(msg):
            server.log(name, Log.ERROR, msg)

        add_printlog_handler(forward_info)
        add_printerrlog_handler(forward_error)

        # register process listener to forward process death events to main server
        listener = _ProcessListenerForwarder(server)
        self.pm.add_process_listener(listener)
开发者ID:Aand1,项目名称:ROSCH,代码行数:28,代码来源:server.py


示例6: _do_update

    def _do_update(self, hwids):
        '''Rebuild ``self.cache`` from the XML-RPC driver database at ``self.url``.

        One query covers all ``hwids``. The cache is emptied in every case and
        only refilled when the reply uses protocol version '20080407'; driver
        entries lacking a 'driver_type' key are ignored.
        '''

        logging.debug('Querying XML-RPC driver database %s...', self.url)
        client = ServerProxy(self.url)
        # TODO: error handling; pass kernel_version, architecture
        query_args = {
            'os_name': OSLib.inst.os_vendor,
            'os_version': OSLib.inst.os_version,
            'system_vendor': self.sys_vendor,
            'system_product': self.sys_product,
            'components': ['%s:%s' % (h.type, h.id) for h in hwids],
        }
        answer = client.query('20080407', '0', query_args)
        (res_proto_ver, res_proto_subver, drivers) = answer
        logging.debug('  -> protocol: %s/%s, drivers: %s', res_proto_ver,
                      res_proto_subver, str(drivers))

        self.cache = {}
        if res_proto_ver != '20080407':
            logging.warning('   unknown protocol version, not updating')
            return

        for hwid in hwids:
            lookup_key = '%s:%s' % (hwid.type, hwid.id)
            usable = [d for d in drivers.get(lookup_key, []) if 'driver_type' in d]
            for drv in usable:
                self.cache.setdefault(hwid, []).append(DriverID(**drv))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:27,代码来源:detection.py


示例7: kill_nodes

def kill_nodes(node_names):
    """
    Call shutdown on the specified nodes

    @return: list of nodes that shutdown was called on successfully and list of failures
    @rtype: ([str], [str])
    @raise ROSNodeIOException: if a socket error occurs while looking a node up on the master
    """
    master = rosgraph.Master(ID)

    success = []
    fail = []
    tocall = []
    # lookup all nodes keeping track of lookup failures for return value
    for n in node_names:
        try:
            uri = master.lookupNode(n)
        except socket.error:
            # A socket error is a communication failure, not a per-node lookup
            # failure. The previous bare ``except`` swallowed it, which made
            # this error path unreachable.
            raise ROSNodeIOException("Unable to communicate with master!")
        except Exception:
            fail.append(n)
        else:
            tocall.append([n, uri])

    for n, uri in tocall:
        # the shutdown call can sometimes fail to succeed if the node
        # tears down during the request handling, so we assume success
        try:
            p = ServerProxy(uri)
            _succeed(p.shutdown(ID, 'user request'))
        except Exception:
            # deliberate best-effort; KeyboardInterrupt/SystemExit still propagate
            pass
        success.append(n)

    return success, fail
开发者ID:Nishida-Lab,项目名称:RCR,代码行数:34,代码来源:__init__.py


示例8: replicate

    def replicate(self, item):
        """Copy one chunk to an additional chunk server when it has a single live copy.

        ``item`` is indexed as ``(path, cs_list)``. Nothing happens for
        ``None``, when no listed server is alive, or when at least two are
        alive. Otherwise the first live server is asked over XML-RPC to
        replicate the chunk to a newly selected server, and the new location
        is appended to the owning file's chunk list. Failures are printed,
        never raised.
        """
        if item is None:
            return

        path, cs_list = item[0], item[1]
        alive = [cs for cs in cs_list if self.ns._is_alive_cs(cs)]
        live_count = len(alive)

        if live_count == 0:
            print('There no live cs for chunk', path)
            return

        if live_count >= 2:
            print('File', path, 'is already replicated to more than 2 nodes')
            return

        new_cs = self.ns._select_available_cs(alive)
        if new_cs is None:
            print("Can't find available cs for replication", path)
            return

        try:
            source = ServerProxy(alive[0])
            source.replicate_chunk(path, new_cs)
            print("File", path, "replicated to", new_cs)

            owner = self.ns.root.find_file_by_chunk(path)
            if owner is None or path not in owner.chunks:
                print("Cant find file for chunk", path, "after replication")
            else:
                owner.chunks[path].append(new_cs)
        except Exception as e:
            print('Error during replicatiion', path, 'to', new_cs, ':', e)
开发者ID:osteotek,项目名称:yadfs,代码行数:33,代码来源:replicator.py


示例9: register

    def register(self):
        """Tries to register with the Meta-Server

        The attempt is announced in the log first. Errors raised during the
        attempt itself are logged (socket errors as warnings, anything else
        as errors with a traceback) and not propagated; errors raised while
        logging those are dropped silently."""

        foamLogger().info(
            "Trying to register as IP:%s PID:%d Port:%d"
            % (self._answerer.ip(), self._answerer.pid(), self._port))
        try:
            try:
                meta_ip = config().get("Metaserver", "ip")
                meta_port = config().getint("Metaserver", "port")
                meta = ServerProxy("http://%s:%d" % (meta_ip, meta_port))
                response = meta.registerServer(
                    self._answerer.ip(), self._answerer.pid(), self._port)
                self.isRegistered = True
                foamLogger().info("Registered with server. Response "
                                  + str(response))
            except socket.error:
                reason = sys.exc_info()[1]  # compatible with 2.x and 3.x
                foamLogger().warning("Can't connect to meta-server - SocketError: "+str(reason))
            except:
                foamLogger().error("Can't connect to meta-server - Unknown Error: "+str(sys.exc_info()[0]))
                foamLogger().error(str(sys.exc_info()[1]))
                foamLogger().error("Traceback: "+str(extract_tb(sys.exc_info()[2])))
        except:
            # print "Error during registering (no socket module?)"
            pass
开发者ID:martinep,项目名称:foam-extend-svn,代码行数:26,代码来源:FoamServer.py


示例10: list_processes

def list_processes(roslaunch_uris=None):
    """
    Ask each roslaunch XML-RPC server for its processes. A server that
    cannot be queried contributes nothing.

    @param roslaunch_uris: (optional) list of XML-RPCS. If none
        are provided, will look up URIs dynamically
    @type  roslaunch_uris: [str]
    @return: list of roslaunch processes
    @rtype: [L{NetProcess}]
    """
    uris = roslaunch_uris or get_roslaunch_uris()
    if not uris:
        return []

    procs = []
    for uri in uris:
        try:
            code, msg, val = ServerProxy(uri).list_processes()
            if code != 1:
                continue
            active, dead = val
            for entries, is_active in ((active, True), (dead, False)):
                procs.extend([NetProcess(e[0], e[1], is_active, uri) for e in entries])
        except:
            # don't have a mechanism for reporting these errors upwards yet
            pass
    return procs
开发者ID:Aand1,项目名称:ROSCH,代码行数:28,代码来源:netapi.py


示例11: test_server

def test_server(host='localhost', port=4966):
    """Test for a Larch server on host and port

    Arguments
      host (str): host name ['localhost']
      port (int): port number [4966]

    Returns
      integer status number:
          0    Not in use.
          1    Connected, valid Larch server
          2    In use, but not a valid Larch server
    """
    server = ServerProxy('http://%s:%d' % (host, port))
    try:
        methods = server.system.listMethods()
    except socket.error:
        return NOT_IN_USE
    except Exception:
        # Something answered but did not speak XML-RPC (protocol or parse
        # error, or a Fault): the port is in use by a non-Larch service.
        return NOT_LARCHSERVER

    # verify that this is a valid larch server
    if len(methods) < 5 or 'larch' not in methods:
        return NOT_LARCHSERVER
    try:
        ret = server.get_rawdata('_sys.config.user_larchdir')
    except Exception:
        return NOT_LARCHSERVER
    if len(ret) < 1:
        return NOT_LARCHSERVER

    return CONNECTED
开发者ID:xraypy,项目名称:xraylarch,代码行数:31,代码来源:xmlrpc_server.py


示例12: get_next_server

    def get_next_server(self):
        """
        Get the next server online

        Walks ``self.list_online`` starting at ``self.index_list_online``,
        rescanning when the index runs past the end, and probes each
        candidate with ``still_alive()`` under a 10 second default socket
        timeout. Exits the process when no server is listed.

        return rpc server object
        """

        #if index is out of list range, scan network again
        if self.index_list_online >= len(self.list_online):
            self.scan_online_servers()

        if not self.list_online:
            sys.exit("Couldn't find any server. Verify your connection.")

        #get the next server of the list of servers online
        self.ip_server = self.list_online[self.index_list_online]
        self.index_list_online += 1
        #make the structure to connect rpc_server
        server_addr = 'http://{}:{}'.format(self.ip_server, self.config.sync_port)

        try:
            #set configurations of connection
            connection = ServerProxy(server_addr)
            socket.setdefaulttimeout(10)
            try:
                #verify if server is online and fine
                connection.still_alive()
            finally:
                # Reset the process-wide timeout on failure too; before, a
                # failed probe left every later socket with a 10 s timeout.
                socket.setdefaulttimeout(None)
        except Exception:
            #try next connection
            return self.get_next_server()
        #return the object server_rpc
        return connection
开发者ID:gspd,项目名称:FlexA,代码行数:33,代码来源:RPC.py


示例13: test_get_last_N_minutes

 def test_get_last_N_minutes(self):
     """The reply must hold equally long 'ReceptionTime' and 'PH_EXT' series."""
     proxy = ServerProxy('http://127.0.0.1:8000/')
     r = proxy.get_last_N_minutes('node-021', 'PH_EXT', 10)
     logging.debug(r)
     # assertIn/assertEqual report the offending values on failure,
     # unlike assertTrue on a precomputed boolean.
     self.assertIn('ReceptionTime', r)
     self.assertIn('PH_EXT', r)
     self.assertEqual(len(r['ReceptionTime']), len(r['PH_EXT']))
开发者ID:stanleylio,项目名称:cm1app,代码行数:7,代码来源:test_s1.py


示例14: Client

class Client(Cmd):
    """Command shell that starts a local Node and talks to it over XML-RPC."""

    intro = ('Welcome to the p2p file share shell.\n'
             'Type help or ? to list commands.')
    prompt = '>'

    def __init__(self, url, dirname, urlfile):
        """Start a Node in a daemon thread and introduce it to known peers.

        After waiting HEAD_START seconds for the node to come up, every line
        of ``urlfile`` (stripped) is passed to the node's ``hello`` method.
        """
        Cmd.__init__(self)
        self.secret = random_string(SECRET_LENGTH)
        n = Node(url, dirname, self.secret)
        t = Thread(target=n.serve_forever)
        # attribute form; Thread.setDaemon() is deprecated
        t.daemon = True
        t.start()
        time.sleep(HEAD_START)
        self.server = ServerProxy(url)
        # close the URL file deterministically instead of leaking the handle
        with open(urlfile) as known_urls:
            for line in known_urls:
                line = line.strip()
                self.server.hello(line)

    def do_fetch(self, arg):
        """Ask the node to fetch ``arg``; report when nobody has the file.

        Faults with a code other than UNHANDLED are re-raised.
        """
        try:
            self.server.fetch(arg, self.secret)
        except Fault as f:
            if f.faultCode != UNHANDLED:
                raise
            print("Couldn't find the file ", arg)

    def do_exit(self, arg):
        """Leave the shell by exiting the process."""
        sys.exit()

    # end-of-file on input behaves like the exit command
    do_EOF = do_exit
开发者ID:dust8,项目名称:bpfntp,代码行数:30,代码来源:client.py


示例15: Client

class Client(Cmd):
    """Command shell that starts a local Node and talks to it over XML-RPC."""

    prompt = '> '

    def __init__(self, url, dirname, urlfile):
        """Start a Node in a daemon thread and introduce it to known peers.

        ``urlfile`` is resolved relative to ``dirname``. After waiting
        HEAD_START seconds, each stripped line of it is passed to the node's
        ``hello`` method.
        """
        Cmd.__init__(self)
        self.secret = randomString(SECRET_LENGTH)
        n = Node(url, dirname, self.secret)
        t = Thread(target=n._start)
        # attribute form; Thread.setDaemon() is deprecated
        t.daemon = True
        t.start()

        sleep(HEAD_START)
        self.server = ServerProxy(url)
        urlfile = path.join(dirname, urlfile)
        # close the URL file deterministically instead of leaking the handle
        with open(urlfile) as known_urls:
            for line in known_urls:
                line = line.strip()
                self.server.hello(line)

    def do_fetch(self, arg):
        """Ask the node to fetch ``arg``; report when nobody has the file.

        Faults with a code other than UNHANDLED are re-raised.
        """
        try:
            self.server.fetch(arg, self.secret)
        except Fault as f:
            if f.faultCode != UNHANDLED:
                raise
            print("Couldn't find the file", arg)

    def do_exit(self, arg):
        """Print a newline and exit the process."""
        print()
        sys.exit()

    # end-of-file on input behaves like the exit command
    do_EOF = do_exit
开发者ID:panda0881,项目名称:Beginning-Python,代码行数:30,代码来源:client.py


示例16: on_task_output

    def on_task_output(self, task, config):
        """Send every accepted entry to nzbget through its ``appendurl`` RPC.

        In test mode the entry is only logged. An entry carrying its own
        'category' uses it for that call only; all other values come from
        ``config``. A failing call is logged and the entry marked as failed.
        """
        from xmlrpc.client import ServerProxy

        params = dict(config)

        server = ServerProxy(params["url"])

        for entry in task.accepted:
            if task.options.test:
                log.info('Would add into nzbget: %s', entry['title'])
                continue

            # allow overriding the category. The override is read per entry
            # instead of being stored in ``params``, which is shared by the
            # whole loop and previously leaked it into later entries.
            category_source = entry if 'category' in entry else params

            try:
                server.appendurl(
                    entry['title'] + '.nzb',
                    category_source['category'],
                    params['priority'],
                    params['top'],
                    entry['url'],
                )
                log.info('Added `%s` to nzbget', entry['title'])
            except Exception as e:
                log.critical('rpc call to nzbget failed: %s', e)
                entry.fail('could not call appendurl via RPC')
开发者ID:Flexget,项目名称:Flexget,代码行数:28,代码来源:nzbget.py


示例17: PageStorageClient

class PageStorageClient(object):
    """XML-RPC client whose methods forward to a page storage server."""

    def __init__(self, server_address=("127.0.0.1", 9966)):
        """Build the proxy for the ``(host, port)`` pair in ``server_address``."""
        self.server_address = server_address
        endpoint = "http://%s:%s/" % self.server_address
        self.server_proxy = XMLRPCServerProxy(endpoint, allow_none=True)

    def get_page(self, target_url):
        """Return the decoded reply of the remote ``get_page`` call."""
        encoded = self.server_proxy.get_page(target_url)
        return decode_from_binary(encoded)

    def get_page_by_id(self, page_id):
        """Return the decoded reply of the remote ``get_page_by_id`` call."""
        encoded = self.server_proxy.get_page_by_id(page_id)
        return decode_from_binary(encoded)

    def set_page(self, page_info):
        """Encode ``page_info`` and pass it to the remote ``set_page``."""
        payload = encode_to_binary(page_info)
        return self.server_proxy.set_page(payload)

    def remove_page(self, target):
        """Encode ``target`` and pass it to the remote ``remove_page``."""
        payload = encode_to_binary(target)
        return self.server_proxy.remove_page(payload)

    def __contains__(self, key):
        """Return the reply of the remote ``has_key`` call for ``key``."""
        return self.server_proxy.has_key(key)

    def has_key(self, key):
        """Method-call spelling of ``__contains__``."""
        return self.__contains__(key)
开发者ID:sotoshigoto,项目名称:LXMLSearchEngine,代码行数:30,代码来源:PageStorageServer.py


示例18: test_unpublish

    def test_unpublish(self):
        """Publishing then unregistering must leave no publication behind."""
        node_proxy = ServerProxy(rospy.get_node_uri())

        _, _, pubs = node_proxy.getPublications('/foo')
        pubs = [p for p in pubs if p[0] != '/rosout']
        self.assertFalse(pubs, pubs)

        print("Publishing ", PUBTOPIC)
        pub = rospy.Publisher(PUBTOPIC, String)
        topic = rospy.resolve_name(PUBTOPIC)
        _, _, pubs = node_proxy.getPublications('/foo')
        pubs = [p for p in pubs if p[0] != '/rosout']
        self.assertEqual([[topic, String._type]], pubs, "Pubs were %s"%pubs)

        # publish about 10 messages for fun
        for i in range(10):
            pub.publish(String("hi [%s]"%i))
            time.sleep(0.1)

        # begin actual test by unsubscribing
        pub.unregister()

        # make sure no new messages are received in the next 2 seconds.
        # The comparison used to be inverted, so the loop never ran and the
        # check below happened immediately.
        timeout_t = time.time() + 2.0
        while time.time() < timeout_t:
            time.sleep(1.0)
        self.assertIsNone(_last_callback)

        # verify that close cleaned up master and node state
        _, _, pubs = node_proxy.getPublications('/foo')
        pubs = [p for p in pubs if p[0] != '/rosout']
        self.assertFalse(pubs, "Node still has pubs: %s"%pubs)
        n = rospy.get_caller_id()
        self.assertFalse(rostest.is_publisher(topic, n), "publication is still active on master")
开发者ID:Aand1,项目名称:ROSCH,代码行数:34,代码来源:test_deregister.py


示例19: check_pypi

    def check_pypi(self):
        """
        If the requirement is frozen to pypi, check for a new version.

        Installed distributions are attached to their requirement first.
        Requirements with a url are skipped and kept in ``self.reqs``; every
        other requirement is removed from ``self.reqs`` once handled, and a
        status line is printed for it unless it is up to date.
        """
        for installed in get_installed_distributions():
            project = installed.project_name
            if project in self.reqs:
                self.reqs[project]["dist"] = installed

        pypi = ServerProxy("https://pypi.python.org/pypi")
        for name, req in list(self.reqs.items()):
            if req["url"]:
                continue  # skipping github packages.

            if "dist" not in req:
                msg = "not installed"
                pkg_info = name
            else:
                dist = req["dist"]
                dist_version = LooseVersion(dist.version)
                # second lookup retries with '-' replaced by '_' in the name
                available = (
                    pypi.package_releases(req["pip_req"].name, True)
                    or pypi.package_releases(req["pip_req"].name.replace('-', '_'), True)
                )
                available_version = self._available_version(dist_version, available)

                if not available_version:
                    msg = self.style.WARN("release is not on pypi (check capitalization and/or --extra-index-url)")
                elif self.options['show_newer'] and dist_version > available_version:
                    msg = self.style.INFO("{0} available (newer installed)".format(available_version))
                elif available_version > dist_version:
                    msg = self.style.INFO("{0} available".format(available_version))
                else:
                    # up to date: drop the requirement without printing it
                    msg = "up to date"
                    del self.reqs[name]
                    continue
                pkg_info = self.style.BOLD("{dist.project_name} {dist.version}".format(dist=dist))

            print("{pkg_info:40} {msg}".format(pkg_info=pkg_info, msg=msg))
            del self.reqs[name]
开发者ID:marcosptf,项目名称:fedora,代码行数:35,代码来源:pipchecker.py


示例20: test_ROSLaunchNode

    def test_ROSLaunchNode(self):
        """Start a ROSLaunchNode and call its ping and get_pid XML-RPC APIs."""
        #exercise the basic ROSLaunchNode API
        from roslaunch.server import ROSLaunchNode

        # - create a node with a handler
        handler = TestHandler(self.pmon)
        node = ROSLaunchNode(handler)

        # - start the node
        node.start()
        try:
            self.assertTrue(node.uri)

            # - call the ping API that we added
            s = ServerProxy(node.uri)
            test_val = 'test-%s'%time.time()
            s.ping(test_val)
            self.assertEqual(handler.pinged, test_val)

            # - call the pid API
            code, msg, pid = s.get_pid()
            self.assertEqual(1, code)
            self.assertIsInstance(msg, str)
            self.assertEqual(os.getpid(), pid)
        finally:
            # - shut it down, even when an assertion above failed, so the
            #   started node does not outlive the test
            node.shutdown('test done')
开发者ID:Aand1,项目名称:ROSCH,代码行数:26,代码来源:test_roslaunch_server.py



注:本文中的xmlrpc.client.ServerProxy类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python server.SimpleXMLRPCServer类代码示例发布时间:2022-05-26
下一篇:
Python client.loads函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap