• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python xmlrpclib.ServerProxy类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中xmlrpclib.ServerProxy的典型用法代码示例。如果您正苦于以下问题：Python ServerProxy类的具体用法？Python ServerProxy怎么用？Python ServerProxy使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ServerProxy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: Client

class Client(wx.App):
    """
    主Client类,用于设定GUI,启动为文件服务的Node。
    """
    def __inif__(self, url, dirname, urlfile):
        """
        创建一个随机的密码,使用这个密码实例化Node。利用Node的_start方法(确保Thread是个无交互的后台程序,
        这样他会随着程序退出而退出)启动Thread,读取URL文件中的所有URL,并且将Node介绍给这些URL。
        """
        super(Client, self).__init__()
        self.secret = randomString(SECRET_LENGTH)
        n = Node(url, dirname, self.secret)
        t = Thread(target=n._start)
        t.setDaemon(1)
        t.start()

        #让服务器先启动。
        sleep(HEAD_START)
        self.server = ServerProxy(url)
        for line in open(urlfile):
            line = line.strip()
            self.server.hello(line)

    def OnInit(self):
        """
        设置GUI。创建窗体、文本框和按钮,并且进行布局。将提交按钮绑定到self.fetchHandler上。
        """
        win = wx.Frame(None, title="File Sharing Client", size=(400,45))

        bkg = wx.Panel(win)

        self.input = input = wx.TextCtrl(bkg)

        submit = wx.Button(bkg, label="Fetch", size=(80, 25))
        submit.Bind(wx.EVT_BUTTON, self.fetchHandler)

        hbox = wx.BoxSizer()
        hbox.Add(input, proportion=1, flag=wx.ALL|wx.EXPAND, border=10)
        hbox.Add(submit, flag=wx.TOP|wx.BOTTON|wx.RIGHT, border=10)

        vbox = wx.BoxSizer(wx.VERTICAL)
        vbox.Add(hbox, proportion=0, flag=wx.EXPAND)

        bkg.SetSizer(vbox)

        win.Show()

        return True

    def fetchHandler(self, event):
        """
        在用户点击‘Fetch’按钮时调用,读取文本框中的查询,调用服务器Node的fetch方法。
        如果查询没有被处理则打印错误信息。
        """
        query = self.input.getValue()
        try:
            self.server.fetch(query, self.secret)
        except Fault, f:
            if f.faultCode != UNHANDLED: raise
            print "Couldn't find the file", query
开发者ID:kosmos-zhang,项目名称:PythonLearning,代码行数:60,代码来源:simple_guiclient.py


示例2: execute

def execute(args, parser):
    """
    Build a package for a PyPI project selected on the command line.

    Stores the upload setting and an XML-RPC client for `args.pypi_url` in
    the module globals, asks PyPI for the releases of `args.pypi_name[0]`
    and calls build_package with either the requested release or, for
    'latest', the first release PyPI reports. Exits with an error message
    when PyPI reports no releases for the package.
    """
    global binstar_upload
    global client

    binstar_upload = args.binstar_upload
    client = ServerProxy(args.pypi_url)

    package = args.pypi_name[0]
    # Anything other than the literal 'latest' names an explicit release.
    all_versions = args.release != 'latest'
    version = args.release[0] if all_versions else None

    releases = client.package_releases(package, all_versions)
    if not releases:
        sys.exit("Error:  PyPI does not have a package named %s" % package)

    if all_versions:
        # An unknown version only triggers a warning; the build is still attempted.
        if version not in releases:
            print(releases)
            print("Warning:  PyPI does not have version %s of package %s" % (version, package))
    else:
        version = releases[0]

    build_package(package, version)
开发者ID:ezralanglois,项目名称:conda-build,代码行数:27,代码来源:main_pipbuild.py


示例3: Client

class Client(Cmd):
    '''
    A text-based client interface.
    '''
    prompt = '>'
    
    def __init__(self, url, dirname, urlfile):
        '''
        Initialize the node and start the Node server in a seperate thread.
        '''
        Cmd.__init__(self)
        self.secret = randomString(SECRET_LENGTH)
        n = Node(url, dirname, self.secret)
        t = Thread(target=n._start)
        t.setDaemon(1)
        t.start()
        
        sleep(HEAD_START)
        self.server = ServerProxy(url)
        for line in open(urlfile):
            self.server.hello(line.strip())

    def do_fetch(self, arg):
        '''
        Fetch the specified file.
        '''
        try:
            self.server.fetch(arg, self.secret)
        except Fault, f:
            if f.faultCode !=UNHANDLED:
                raise
            print 'Could not find ', arg
开发者ID:chptcleo,项目名称:PythonPractice,代码行数:32,代码来源:client.py


示例4: RPC

class RPC(object):
    """
    Singleton holding an XML-RPC proxy to the RPC server.

    Every call to RPC(host) returns the same object; note that __init__
    still runs on each call and replaces the proxy.
    """
    # The shared instance, or None while no instance exists.
    _instance = None

    def __new__(cls, host):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            # object.__new__ must not receive the constructor arguments:
            # passing them is deprecated on Python 2.6+ and a TypeError on
            # Python 3.
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, host):
        """Remember `host` and open a proxy to the server on RPC_PORT."""
        self.host = host
        # NOTE(review): the proxy always targets localhost and ignores the
        # requested host - confirm whether that is intentional.
        host = "localhost"
        self.server = ServerProxy("http://%s:%u" % (host, RPC_PORT))

    def _close(self):
        """Ask the server to stop and drop the proxy."""
        print("stop")
        self.server.stop()
        del self.server

    @classmethod
    def close(cls):
        """Close and forget the shared instance, if there is one."""
        print("close %s" % (cls._instance,))
        if not cls._instance:
            return
        cls._instance._close()
        cls._instance = None
开发者ID:regit,项目名称:nufw,代码行数:25,代码来源:rpc_client.py


示例5: test_listen_port

    def test_listen_port(self):
        """
        Test the server created by the NodeSamplerServer class.

        The server is started in a daemon thread, three placeholder
        services are sent to its ``sample`` method through an XML-RPC
        proxy on localhost:8000, and the call is expected to return 1.
        :return:
        """
        server_thread = Thread(target=self.node_sampler_server.start)
        server_thread.setDaemon(True)
        server_thread.start()

        proxy = ServerProxy("http://localhost:8000")
        fake_services = [
            Service(name=name, check_methods=TEST_METHOD, ip=TEST_IP)
            for name in ("$$service_a$$", "$$service_b$$", "$$service_c$$")
        ]
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(
            proxy.sample(fake_services), 1,
            "NodeSamplerServer cannot create object.")
开发者ID:Claire-Sun,项目名称:Lab_python,代码行数:28,代码来源:test_sampler.py


示例6: TriggerServer

class TriggerServer():
    """
    XML-RPC server that republishes received data on a ROS trigger topic.

    Exposes two RPC functions: 'get', which publishes its argument on
    '/<robot_name>/trigger', and 'ping', which returns 'OK'.
    """

    def __init__(self, ip, port, robot_name):
        """Create the RPC server, a proxy to itself and the ROS publisher."""
        rpc_server = Server((ip, port), allow_none=True)
        rpc_server.register_function(self.get, 'get')
        rpc_server.register_function(lambda: 'OK', 'ping')
        self._server = rpc_server

        # Proxy pointing back at this server; serve() uses it for a final ping.
        self._sp  = ServerProxy("http://%s:%d"%(ip,port))

        topic = '/%s/trigger'%robot_name
        self._ros_publisher = rospy.Publisher(topic, String, queue_size=10)

        self._stop = False

    # RPC METHOD
    def get(self, data):
        """Publish `data` as a String message on the trigger topic."""
        message = String(data=data)
        self._ros_publisher.publish(message)

    def serve(self):
        """Handle requests in a background thread until rospy.spin() returns."""
        thread.start_new_thread(self._serve, ())
        rospy.spin()
        # Raise the stop flag, then send one request so the loop in _serve,
        # which is waiting inside handle_request(), gets to see the flag.
        self._stop = True
        self._sp.ping()

    def _serve(self):
        """Handle one request at a time until the stop flag is set."""
        while True:
            if self._stop:
                break
            self._server.handle_request()
        rospy.loginfo("Shutting down TriggerServer")
开发者ID:tue-robotics,项目名称:multirobot_communication,代码行数:26,代码来源:trigger_server.py


示例7: execute

def execute(args, parser):
    """
    Build a package for a PyPI project selected on the command line.

    Stores the upload setting and an XML-RPC client for `args.pypi_url` in
    the module globals, resolves the package name case-insensitively via
    the PyPI search call, then asks for its releases and calls
    build_package with either the requested release or, for 'latest', the
    first release PyPI reports. Exits with an error message when PyPI
    reports no releases for the package.
    """
    global binstar_upload
    global client

    binstar_upload = args.binstar_upload
    client = ServerProxy(args.pypi_url)

    package = args.pypi_name[0]
    # Anything other than the literal 'latest' names an explicit release.
    all_versions = args.release != 'latest'
    version = args.release[0] if all_versions else None

    hits = client.search({'name': package})
    if hits:
        # Adopt the spelling PyPI uses when a hit differs only by case.
        exact = [hit for hit in hits
                 if 'name' in hit and package.lower() == hit['name'].lower()]
        if exact:
            print('Package search: %s' % exact[0])
            package = exact[0]['name']

    releases = client.package_releases(package, all_versions)
    if not releases:
        sys.exit("Error:  PyPI does not have a package named %s" % package)

    if all_versions:
        # An unknown version only triggers a warning; the build is still attempted.
        if version not in releases:
            print(releases)
            print("Warning:  PyPI does not have version %s of package %s" %
                  (version, package))
    else:
        version = releases[0]

    build_package(package, version)
开发者ID:hargup,项目名称:conda-build,代码行数:35,代码来源:main_pipbuild.py


示例8: requestTopic

 def requestTopic(self, topic):
     code, statusMessage, publishers = self.master.registerSubscriber(
         self.callerId, topic, lookupTopicType(topic)[0], self.callerApi
     )
     assert code == 1, (code, statusMessage)
     assert len(publishers) == 1, (topic, publishers)  # i.e. fails if publisher is not ready now
     print publishers
     publisher = ServerProxy(publishers[0])
     code, statusMessage, protocolParams = publisher.requestTopic(self.callerId, topic, [["TCPROS"]])
     assert code == 1, (code, statusMessage)
     assert len(protocolParams) == 3, protocolParams
     print code, statusMessage, protocolParams
     hostPortPair = (protocolParams[1], protocolParams[2])
     soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # TCP
     soc.connect(hostPortPair)
     soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
     soc.setblocking(0)
     soc.settimeout(SOCKET_TIMEOUT)
     header = prefix4BytesLen(
         prefix4BytesLen("callerid=" + self.callerId)
         + prefix4BytesLen("topic=" + topic)
         + prefix4BytesLen("type=" + lookupTopicType(topic)[0])
         + prefix4BytesLen("md5sum=" + lookupTopicType(topic)[1])
         + ""
     )
     soc.send(header)
     return soc
开发者ID:robotika,项目名称:husky,代码行数:27,代码来源:node.py


示例9: Client

class Client(Cmd):
    prompt = '>'
    def __init__(self, url, dirname, urlfile):
        Cmd.__init__(self)
        self.secret = randomString(SECRET_LENGTH)
        n = Node(url, dirname, self.secret)
        t = Thread(target=n._start)
        t.setDaemon(1)
        t.start()
        sleep(HEAD_START)
        self.server = ServerProxy(url)
        for line in open(urlfile):
            line = line.strip()
            self.server.hello(line)

    def do_fetch(self, arg):
        try:
            self.server.fetch(arg, self.secret)
        except Fault as f:
            if f.faultCode != UNHANDLED:
                raise
            print "Couldn't find the file", arg, f

    def do_exit(self, arg):
        print
        sys.exit()

    do_EOF = do_exit
开发者ID:ruanima,项目名称:TenTinyPythonProject,代码行数:28,代码来源:client.py


示例10: Hella

class Hella(FooApp):
	"""
	App that talks to a hellanzb server over XML-RPC.

	Numeric messages are enqueued on the server; new values found in the
	server's log entries are reported through self.recv.
	"""
	name = 'hella'
	config_opts = {
		'password': 'The password defined as Hellanzb.XMLRPC_PASSWORD in hellanzb.conf',
		'server': 'The IP address or hostname running hellanzb',
		'port': 'The port hellanzb is running on. The default is 8760',
	}

	def __init__(self, server=None):
		"""Connect to hellanzb and load the cache of already reported values."""
		FooApp.__init__(self, server)
		# NOTE(review): self.data is read here before it is replaced by the
		# FileStore below; presumably FooApp.__init__ provides the
		# configuration values - confirm.
		# The URL carries the credentials as http://user:password@host:port/.
		self.hellaserver = ServerProxy('http://hellanzb:%s@%s:%s/' % (self.data['password'], self.data['server'], self.data['port']))
		self.data = FileStore('/tmp/apps/hella')
		try:
			self.cache = json.loads(self.data['cache'])
		except Exception:
			# Best effort: start with an empty cache when none can be loaded.
			self.cache = []

	def send(self, msg):
		"""Enqueue the id given as integer text in msg['text']."""
		self.hellaserver.enqueuenewzbin(int(msg['text']))

	def run(self):
		"""Poll the server status every 10 seconds and report new log values."""
		while True:
			status = self.hellaserver.status()
			for entry in status['log_entries']:
				for value in entry.values():
					if value not in self.cache:
						self.recv('%s: %s' % (self.name, value))
						self.cache.append(value)
			# Persist the cache after every poll.
			self.data['cache'] = json.dumps(self.cache)
			sleep(10)
开发者ID:JeremyGrosser,项目名称:foobox,代码行数:31,代码来源:hella.py


示例11: Graph

class Graph( object ):
	"""
	Client-side wrapper around an XML-RPC proxy to a pyWebGraph server.

	`num_nodes`, `current_node` and `node_tos` are implemented locally on
	top of the proxy; any other attribute is looked up on the proxy itself.
	"""

	# The default pyWebGraph XML-RPC server address: `http://127.0.0.1:8000/`
	ADDRESS = "http://127.0.0.1:8000/"

	def __init__( self, address = None ):
		"""Connect to `address`, or to Graph.ADDRESS when none is given."""
		if not address: address = Graph.ADDRESS
		self.__proxy = ServerProxy( address )
		self.__wrapped = [ 'current_node', 'num_nodes', 'node_tos' ]

	def __getattr__( self, name ):
		"""
		Delegate unknown attributes to the proxy.

		This hook only runs after normal lookup has failed. For a wrapped
		name that means its property raised AttributeError, and for a
		private name that the instance is not initialised yet; retrying the
		lookup on self in either case would recurse without bound, so
		AttributeError is raised instead.
		"""
		if name.startswith( '_Graph__' ) or name in self.__wrapped:
			raise AttributeError( name )
		return getattr( self.__proxy, name )

	def get_num_nodes( self ):
		"""Return the result of the server's get_num_nodes call."""
		return self.__proxy.get_num_nodes()

	num_nodes = property( get_num_nodes )

	def get_current_node( self ):
		"""Return the result of the server's get_current_node call."""
		return self.__proxy.get_current_node()

	def set_current_node( self, node ):
		"""Pass `node` to the server's set_current_node call."""
		return self.__proxy.set_current_node( node )

	current_node = property( get_current_node, set_current_node )

	def node_tos( self, node ):
		"""Return the server's node_tos result for `node`, encoded as UTF-8."""
		return self.__proxy.node_tos( node ).encode( 'utf8' )
开发者ID:aftab90,项目名称:py-web-graph,代码行数:31,代码来源:client.py


示例12: attk_server

class attk_server(object):
    """XML-RPC client for the attack server at `url`, authenticated by password."""

    def __init__(self, password, url):
        """Store the credentials and open a proxy to the server's RPC2 endpoint."""
        self.id = None
        self.password = password
        self.url = url
        self.xml = ServerProxy(self._make_url())

    def _make_url(self):
        """Return self.url with 'x:<password>@' credentials added and 'RPC2' appended to the path."""
        scheme, netloc, path, query, fragment = urlsplit(self.url)
        netloc = 'x:' + self.password + '@' + netloc
        path = path + 'RPC2'
        return urlunsplit((scheme, netloc, path, query, fragment))

    def ping(self):
        """Return True when the server answers the ping call with 'pong'."""
        debug('client pinging server')
        reply = self.xml.ping()
        return reply == 'pong'

    def finish_attack(self, attack_id, status):
        """Report `status` for `attack_id` on behalf of this client."""
        debug('client finishing attack with server')
        self.xml.finishAttack(attack_id, config.client_id, status)

    def register(self):
        """Register this client with the server and remember the returned id."""
        debug('client registering with server')
        retval = self.xml.registerClient(
            self.password,
            config.client_id,
            config.url,
            config.password,
        )
        # A return value of exactly False leaves the current id untouched.
        if retval is False:
            return
        self.id = retval
开发者ID:adambregenzer,项目名称:hydra,代码行数:31,代码来源:attk_client.py


示例13: on_task_output

    def on_task_output(self, task, config):
        """
        Send every accepted entry of `task` to nzbget via its appendurl call.

        `config` supplies 'url', 'category', 'priority' and 'top'. An entry
        carrying its own 'category' overrides the configured one for that
        entry only. With the test option set, entries are only logged.
        Entries whose RPC call fails are marked as failed.
        """
        from xmlrpclib import ServerProxy

        params = dict(config)

        server = ServerProxy(params["url"])

        for entry in task.accepted:
            if task.options.test:
                log.info('Would add into nzbget: %s' % entry['title'])
                continue

            try:
                # Resolve the category per entry instead of writing the
                # override into params, where it would leak into every
                # later entry that has no category of its own.
                if 'category' in entry:
                    category = entry['category']
                else:
                    category = params['category']

                server.appendurl(entry["title"] + '.nzb',
                                 category,
                                 params["priority"],
                                 params["top"],
                                 entry["url"])
                log.info("Added `%s` to nzbget" % entry["title"])
            except Exception:
                # Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit are no longer swallowed.
                log.critical("rpc call to nzbget failed")
                entry.fail("could not call appendurl via RPC")
开发者ID:Arhodes-CGRB-OSU,项目名称:Flexget,代码行数:26,代码来源:nzbget.py


示例14: Client

class Client(Cmd):
    """
    A simple text-based interface to the Node class.
    """

    prompt = '> '


    def __init__(self, url, dirname, urlfile):
        """
        Sets the url, dirname, and urlfile, and starts the Node
        Server in a separate thread.
        """
        Cmd.__init__(self)
        self.secret = randomString(SECRET_LENGTH)
        n = Node(url, dirname, self.secret)
        t = Thread(target=n._start)
        t.setDaemon(1)
        t.start()
        # Give the server a head start:
        sleep(HEAD_START)
        self.server = ServerProxy(url)
        for line in open(urlfile):
            line = line.strip()
            self.server.hello(line)

    def do_fetch(self, arg):
        "Call the fetch method of the Server."
        try:
            self.server.fetch(arg, self.secret)
        except Fault, f:
            if f.faultCode != UNHANDLED: raise
            print "Couldn't find the file", arg
开发者ID:kysnail,项目名称:linuxcode,代码行数:33,代码来源:client.py


示例15: editDevice

def editDevice(cluster, host):
  """
  Push the settings of one device to Zenoss via manage_editDevice.

  cluster -- used as the device's system path ("/<cluster>" when it
             contains at least one word character)
  host    -- device name; passed to http() to obtain the device URL

  Nothing is sent when the value returned by http() for the host does not
  contain "http".
  """

  Zenoss = {'deviceName': '', 'devicePath': '', 'tag': '', 'serialNumber': '', 'zSnmpCommunity': 'cci-ro',
            'zSnmpPort': '161', 'zSnmpVer': 'v2c', 'rackSlot': '0', 'productionState': '1000', 'comments': '',
            'hwManufacturer': '', 'hwProductName': '', 'osManufacturer': '', 'osProductName': '',
            'locationPath': '', 'groupPaths': '', 'systemPaths': '', 'statusMonitors': '',
            'performanceMonitor': '', 'discoverProto': 'snmp', 'priority': '3'}

  Zenoss['deviceName'] = host
  Zenoss['systemPaths'] = cluster
  url = http(Zenoss['deviceName'])
  if not re.search("http", url):
    return

  snmp_out = http(url+snmpurl)
  Zenoss['zSnmpCommunity'] = snmp_out[0]
  Zenoss['zSnmpVer'] = snmp_out[1]
  # Test the cluster value itself; the former check searched the literal
  # text "cluster" and therefore always matched.
  if cluster and re.search(r"\w+", cluster):
    Zenoss['systemPaths'] = "/%s" % (cluster,)
  # groups_return = http(groupsUrl)
  # Zenoss['groupPaths'] = group_final(groups_return, Zenoss['systemPaths'])
  serv = ServerProxy (url,allow_none=1)
  serv.manage_editDevice(Zenoss['tag'], Zenoss['serialNumber'],
      Zenoss['zSnmpCommunity'], Zenoss['zSnmpPort'], Zenoss['zSnmpVer'], Zenoss['rackSlot'],
      Zenoss['productionState'], Zenoss['comments'], Zenoss['hwManufacturer'], Zenoss['hwProductName'],
      Zenoss['osManufacturer'], Zenoss['osProductName'], Zenoss['locationPath'] , Zenoss['groupPaths'],
      Zenoss['systemPaths'])
  if verbose:
    print("%s\n%s\n%s\n%s\n%s\n%s\n" % (url,Zenoss['zSnmpCommunity'],Zenoss['zSnmpVer'],Zenoss['comments'], Zenoss['systemPaths'],Zenoss['groupPaths']))
开发者ID:linuxdynasty,项目名称:Linuxdynasty,代码行数:29,代码来源:zenoss_netscaler_snmp.py


示例16: editDevice

def editDevice(info):
  """
  Push the settings of every device in `info` to Zenoss via manage_editDevice.

  info -- iterable of rows; item 0 is used as the device name, items 2-6
          fill the comment text and item 8 is used as the system path.

  Rows for which http() returns a value without "http" are skipped.
  """

  defaults = {'deviceName': '', 'devicePath': '', 'tag': '', 'serialNumber': '', 'zSnmpCommunity': 'monitor',
              'zSnmpPort': '161', 'zSnmpVer': 'v1', 'rackSlot': '0', 'productionState': '1000', 'comments': '',
              'hwManufacturer': '', 'hwProductName': '', 'osManufacturer': '', 'osProductName': '',
              'locationPath': '', 'groupPaths': '', 'systemPaths': '', 'statusMonitors': '',
              'performanceMonitor': '', 'discoverProto': 'snmp', 'priority': '3'}

  for row in info:
    # Start from fresh defaults so that no value of a previous device
    # carries over to this one.
    Zenoss = dict(defaults)
    Zenoss['deviceName'] = row[0]
    url = http(Zenoss['deviceName'])
    if not re.search("http", url):
      continue

    snmp_out = http(url+snmpurl)
    Zenoss['zSnmpCommunity'] = snmp_out[0]
    Zenoss['zSnmpVer'] = snmp_out[1]
    Zenoss['comments'] = "Switch %s\n Hardware %s\n Console %s\n Power %s\n Build Profile %s" % (row[2],row[3],row[4],row[5],row[6])
    # Test the row's system value itself; the former check searched the
    # literal text "info[x][8]" and therefore always matched.
    system = row[8]
    if system and re.search(r"\w+", system):
      Zenoss['systemPaths'] = "/%s" % (system,)
    groups_return = http(groupsUrl)
    Zenoss['groupPaths'] = group_final(groups_return, Zenoss['systemPaths'])
    serv = ServerProxy (url,allow_none=1)
    serv.manage_editDevice(Zenoss['tag'], Zenoss['serialNumber'],
        Zenoss['zSnmpCommunity'], Zenoss['zSnmpPort'], Zenoss['zSnmpVer'], Zenoss['rackSlot'],
        Zenoss['productionState'], Zenoss['comments'], Zenoss['hwManufacturer'], Zenoss['hwProductName'],
        Zenoss['osManufacturer'], Zenoss['osProductName'], Zenoss['locationPath'] , Zenoss['groupPaths'], Zenoss['systemPaths'])
    if verbose:
      print("%s\n%s\n%s\n%s\n%s\n%s\n" % (url,Zenoss['zSnmpCommunity'],Zenoss['zSnmpVer'],Zenoss['comments'], Zenoss['systemPaths'],Zenoss['groupPaths']))
开发者ID:Alex-Bautista,项目名称:Linuxdynasty,代码行数:31,代码来源:zenoss_mysql.py


示例17: main

def main():
    """
    Git post-receive hook: report pushed branch changes to the bot.

    Reads "<old> <new> <refname>" lines from stdin. For every ref under
    refs/heads/ it collects one message per new commit, a notice for a new
    branch, or a notice for a deleted branch, and sends all messages to
    the bot in a single notify call. Other refs are ignored.
    """
    bot = ServerProxy('http://%s:%i/%s/xmlrpc' % (XMLRPC_ADDR + (PROJECT_TOKEN, )))

    repo = os.path.basename(os.getcwd())
    suffix = '.git'
    if repo.endswith(suffix):
        repo = repo[:-len(suffix)]

    heads = 'refs/heads/'
    messages = []
    for line in sys.stdin:
        old, new, refname = line.split()
        if not refname.startswith(heads):
            continue
        branch = refname[len(heads):]

        # A revision consisting only of zeros strips to the empty string.
        if not new.strip('0'):
            messages.append(
                'Branch %s/%s deleted (was: %s)' % (repo, branch, old))
        elif not old.strip('0'):
            messages.append('New branch: %s/%s' % (repo, branch))
            messages.append(format_commit_message(repo, branch, new))
        else:
            revisions = get_output('git', 'rev-list',
                                   '%s..%s' % (old, new)).splitlines()
            # rev-list output is walked in reverse order.
            messages.extend(
                format_commit_message(repo, branch, revision)
                for revision in reversed(revisions))

    if messages:
        bot.notify("\n".join(messages))
开发者ID:RonnyPfannschmidt,项目名称:trompet,代码行数:28,代码来源:git-post-receive-hook.py


示例18: sync

 def sync(self, tracker):
     """
     Synchronise the issues of `tracker` from its XML-RPC server.

     Asks the server at `tracker.config` for the ids of issues filtered by
     the tracker's last update time, copies each issue's fields onto the
     matching Issue object, saves it and sends post_issue_sync. Afterwards
     the tracker's last_update is set to one day before now, the tracker
     is saved and post_tracker_sync is sent. Returns True.
     """
     server = ServerProxy(tracker.config, allow_none=True,
                          use_datetime=datetime)
     last_update = DateTime(time.mktime(tracker.last_update.timetuple()))
     users = self._get_users(server)
     activity_filter = {'activity': str(last_update)}
     ids = map(int, server.filter('issue', None, activity_filter))

     # Properties requested from the server for every issue.
     wanted = ('title', 'creation', 'creator', 'assignedto', 'activity',
               'messages', 'status')
     for issue_id in ids:
         data = server.display('issue%d' % issue_id, *wanted)

         issue = Issue.by_tracker_id(tracker.id, issue_id)
         issue.no = issue_id
         issue.set_title(data.get('title', ''))
         description = self._get_description(server, data.get('messages', []))
         issue.set_description(description)
         issue.reporter = users[int(data['creator'])]
         issue.owner = users[int(data['assignedto'])]
         issue.last_change = _roundup_date_to_datetime(data.get('activity'))
         # A missing status is treated as -1.
         issue.active = int(data.get('status', -1)) in ACTIVE_STATUS
         issue.tracker = tracker

         # Only issues without an id get a creation time.
         if not issue.id:
             issue.created = datetime.now()
         issue.updated = datetime.now()
         issue.save()

         post_issue_sync.send(sender=self, issue=issue)
         self._update_user_data(server, data, issue, users)

     tracker.last_update = datetime.now() - timedelta(days=1)
     tracker.save()
     post_tracker_sync.send(sender=self, tracker=tracker)
     return True
开发者ID:hkage,项目名称:django-issue-synchronisation,代码行数:32,代码来源:roundup.py


示例19: _broadcast

	def _broadcast(self,query,starturl,history):
		"""
		Forward a query to all other known nodes.

		Nodes already listed in history are skipped. Returns (code, data)
		from the first node that answers with SUCCESS; if no node does, the
		method ends without an explicit return value. A node that cannot be
		reached (socket error) is removed from self.known.
		"""
		mylogger.info('-'*10)
		mylogger.info('[broadcast]:')
		mylogger.info("knows: {0}".format(self.known))
		mylogger.info("history: {0}".format(history))
		# Walk a copy, because unreachable nodes are removed from self.known.
		for peer in self.known.copy():
			mylogger.info('[broadcast]: other is {0}'.format(peer))
			if peer in history:
				continue
			proxy = ServerProxy(peer)
			mylogger.info('[broadcast]: Connecting from {0} to {1}'.format(self.url,peer))
			mylogger.info('*'*80)
			try:
				code,data = proxy.query(query,starturl,history)
				mylogger.info('[broadcast]: query return code {0}'.format(code))
				if code == SUCCESS:
					mylogger.info('[broadcast]: query SUCCESS!!!')
					return code,data
				# Every code other than SUCCESS and NOT_EXIST is logged as ACCESS_DENIED.
				if code == NOT_EXIST:
					verdict = '[broadcast]: query NOT_EXIST!!!'
				else:
					verdict = '[broadcast]: query ACCESS_DENIED!!!'
				mylogger.info(verdict)
			except Fault as f: # connected to server,but method does not exist(Never happen in this example)
				mylogger.warn(f)
				mylogger.warn("[broadcast]:except fault")
			except socket.error as e:
				mylogger.warn("[broadcast]:except socket error")
				mylogger.error('[broadcast]: {0} for {1}'.format(e,peer))
				# added by kzl
				self.known.remove(peer)
开发者ID:freekid,项目名称:p2p,代码行数:33,代码来源:p2p_server.py


示例20: Client

class Client(Cmd):
    """
    Node类的简单的基于文本的界面。
    """
    prompt = '>'

    def __init__(self, url, dirname, urlfile):
        """
        设定url、dirname和urlfile,并且在单独的线程中启动Node服务器。
        """
        Cmd.__init__(self)
        self.secret = randomString(SECRET_LENGTH)
        n = Node(url, dirname, self.secret)
        t = Thread(target = n._start)
        t.setDaemon(1)
        t.start()

        #让服务器先启动。
        sleep(HEAD_START)
        self.server = ServerProxy(url)
        for line in open(urlfile):
            line = line.strip()
            self.server.hello(line)

    def do_fetch(self, arg):
        "调用服务器的fetch方法"
        try:
            self.server.fetch(arg, self.secret)
        except Fault, f:
            if f.faultCode != UNHANDLED: raise
            print "Counldn't find the file", arg
开发者ID:kosmos-zhang,项目名称:PythonLearning,代码行数:31,代码来源:client.py



注:本文中的xmlrpclib.ServerProxy类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python xmlrpclib.Transport类代码示例发布时间:2022-05-26
下一篇:
Python xmlrpclib.Fault类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap