• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python httpclient.HTTPClient类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tornado.httpclient.HTTPClient的典型用法代码示例。如果您正苦于以下问题:Python HTTPClient类的具体用法?Python HTTPClient怎么用?Python HTTPClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了HTTPClient类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: do_post_with_cert

 def do_post_with_cert(url, params={}, headers={}, client_key=None, client_cert=None):
     body = params if isinstance(params, str) else urllib.urlencode(params)
     http_request = HTTPRequest(url, 'POST', body=body, headers=headers, validate_cert=False, client_key=client_key,
                                client_cert=client_cert)
     http_client = HTTPClient()
     fetch_result = http_client.fetch(http_request)
     return fetch_result.body
开发者ID:onlyfu,项目名称:T3,代码行数:7,代码来源:httputils.py


示例2: get_current_user

    def get_current_user(self):
        # return self.get_secure_cookie("session_token")

        session_token = self.get_secure_cookie("session_token")
        logging.info("got session_token %r", session_token)
        expires_at = self.get_secure_cookie("expires_at")
        if expires_at is None or expires_at == "":
            expires_at = 0
        refresh_token = self.get_secure_cookie("refresh_token")

        _timestamp = int(time.time())
        if _timestamp > int(expires_at):
            return session_token
        else:
            url = "http://" + AUTH_HOST + "/auth/refresh-token"
            http_client = HTTPClient()
            response = http_client.fetch(url, method="GET", headers={"Authorization":"Bearer "+refresh_token})
            logging.info("got refresh-token response %r", response.body)

            token = json_decode(response.body)
            expires_at = _timestamp + token['expires_in']
            session_token = token['access_token']
            self.set_secure_cookie("session_token", session_token)
            self.set_secure_cookie("expires_at", str(expires_at))
            self.set_secure_cookie("refresh_token", token['refresh_token'])
            self.set_secure_cookie("account_id", token['account_id'])

            return session_token
开发者ID:ThomasZh,项目名称:backbone,代码行数:28,代码来源:comm.py


示例3: get_search

def get_search(content):
    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.start()
    blog_id = []
    try:
        html = None

        rc = '<h2><a href="/blog/(.+?)/" target="_blank">.*' + str(content) + ".*?</a></h2>"
        ra = re.compile(rc, re.IGNORECASE)
        # url = "http://{0}/catalog".format(config()["pyworm_blog"]["url"])
        url = "http://www.pyworm.com/catalog/"
        http_client = HTTPClient()
        response = None
        try:
            response = http_client.fetch(url, request_timeout=5)
        except Exception as e:
            error_log.error(e)
        if response and response.code == 200:
            html = response.body.decode('utf-8')
        try:
            blog_id = re.findall(ra, html)
        except Exception as e:
            error_log.error(e)
    except Exception as e:
        error_log.error(e)
    finally:
        io_loop.stop()
    return {"blog_id": blog_id}
开发者ID:mazhenpy,项目名称:pywormsite,代码行数:28,代码来源:search_blog.py


示例4: post

    def post(self):
        logging.info(self.request)
        phone = self.get_argument("registerPhone", "")
        md5pwd = self.get_argument("registerPwd", "")
        logging.info("phone %r", phone)

        try:
            url = "http://" + AUTH_HOST + "/auth/account"
            body_data = {"appid":APPID, "app_secret":APP_SECRET,
                    "login":phone, "pwd":md5pwd}
            logging.info("post body %r", body_data)
            _json = json_encode(body_data)
            http_client = HTTPClient()
            response = http_client.fetch(url, method="POST", body=_json)
            logging.info("got token response %r", response.body)

            _err_msg = _("You have already register an account, please login.")
            self.render('auth/login.html', err_msg=_err_msg)
        except:
            err_title = str( sys.exc_info()[0] );
            err_detail = str( sys.exc_info()[1] );
            logging.error("error: %r info: %r", err_title, err_detail)
            if err_detail == 'HTTP 409: Conflict':
                _err_msg = _("This phone already exist, please enter a new one.")
                self.render('auth/register.html', err_msg=_err_msg)
                return
            else:
                _err_msg = _(err_detail)
                self.render('auth/register.html', err_msg=_err_msg)
                return
开发者ID:ThomasZh,项目名称:backbone,代码行数:30,代码来源:auth_account.py


示例5: call

    def call(self, method, params, okay=None, fail=None):
        """Make an asynchronous JSON-RPC method call. """
        body = tornado.escape.json_encode({
            'jsonrpc': '2.0',
            'method': method,
            'params': params,
            'id': uuid.uuid4().hex,
        });

        logging.info("JSON-RPC: call '%s' method on %s" % (method, self.url))

        headers = HTTPHeaders({'Content-Type': 'application/json'})
        request = HTTPRequest(self.url, method='POST', body=body,
            headers=headers, request_timeout=0)

        if okay is None and fail is None:
            client = HTTPClient()
            response = client.fetch(request)

            if response.code != 200 or not response.body:
                return None

            try:
                data = tornado.escape.json_decode(response.body)
            except ValueError:
                return None
            else:
                return data
        else:
            client = AsyncHTTPClient()
            client.fetch(request, functools.partial(self._on_response, okay, fail))
开发者ID:Computing-works,项目名称:OnlineWork,代码行数:31,代码来源:jsonrpc.py


示例6: SyncHTTPClientTest

class SyncHTTPClientTest(unittest.TestCase):
    def setUp(self):
        self.server_ioloop = IOLoop()
        event = threading.Event()

        @gen.coroutine
        def init_server():
            sock, self.port = bind_unused_port()
            app = Application([("/", HelloWorldHandler)])
            self.server = HTTPServer(app)
            self.server.add_socket(sock)
            event.set()

        def start():
            self.server_ioloop.run_sync(init_server)
            self.server_ioloop.start()

        self.server_thread = threading.Thread(target=start)
        self.server_thread.start()
        event.wait()

        self.http_client = HTTPClient()

    def tearDown(self):
        def stop_server():
            self.server.stop()
            # Delay the shutdown of the IOLoop by several iterations because
            # the server may still have some cleanup work left when
            # the client finishes with the response (this is noticeable
            # with http/2, which leaves a Future with an unexamined
            # StreamClosedError on the loop).

            @gen.coroutine
            def slow_stop():
                # The number of iterations is difficult to predict. Typically,
                # one is sufficient, although sometimes it needs more.
                for i in range(5):
                    yield
                self.server_ioloop.stop()

            self.server_ioloop.add_callback(slow_stop)

        self.server_ioloop.add_callback(stop_server)
        self.server_thread.join()
        self.http_client.close()
        self.server_ioloop.close(all_fds=True)

    def get_url(self, path):
        return "http://127.0.0.1:%d%s" % (self.port, path)

    def test_sync_client(self):
        response = self.http_client.fetch(self.get_url("/"))
        self.assertEqual(b"Hello world!", response.body)

    def test_sync_client_error(self):
        # Synchronous HTTPClient raises errors directly; no need for
        # response.rethrow()
        with self.assertRaises(HTTPError) as assertion:
            self.http_client.fetch(self.get_url("/notfound"))
        self.assertEqual(assertion.exception.code, 404)
开发者ID:rgbkrk,项目名称:tornado,代码行数:60,代码来源:httpclient_test.py


示例7: submit_batch

def submit_batch(base_url, batch):
    print("Submitting a batch")
    http = HTTPClient()
    url = base_url + '_bulk'
    body = '\n'.join(json.dumps(doc) for doc in batch)
    resp = http.fetch(url, method = 'POST', body = body)
    resp.rethrow()
开发者ID:adicu,项目名称:data.adicu.com,代码行数:7,代码来源:pg_es_import.py


示例8: get

    def get(self):
        code = self.get_argument("code", False)
        if not code:
            raise web.HTTPError(400, "oauth callback made without a token") 
            

        http_client = HTTPClient()

        authorization = "Basic %s" % str(
                base64.b64encode(bytes(APP_Key + ":" + APP_Secret, encoding='utf-8')), 
                encoding='utf-8')

        req = HTTPRequest("https://open.hs.net/oauth2/oauth2/token",
                method="POST",
                headers={
                    "Authorization":authorization,
                    "Content-Type":"application/x-www-form-urlencoded"
                },
                body=urllib.parse.urlencode({
                    "grant_type":"authorization_code",
                    "code":code
                }) 
            )

        resp = http_client.fetch(req)
        resp_json = json.loads(resp.body.decode('utf8', 'replace'))
        
        print('===============================')
        print(resp_json)
开发者ID:jzm17173,项目名称:Learn,代码行数:29,代码来源:fly.py


示例9: get_schoolnum_name

 def get_schoolnum_name(self, number):
     try:
         CURR_URL = 'http://xk.urp.seu.edu.cn/jw_service/service/stuCurriculum.action'
         term = "16-17-1"
         params = urllib.urlencode({
             'queryStudentId': number,
             'queryAcademicYear': term})
         client = HTTPClient()
         request = HTTPRequest(
                 CURR_URL,
                 method='POST',
                 body=params,
                 request_timeout=TIME_OUT)
         response = client.fetch(request)
         body = response.body
         if not body:
             return "-1"
         else:
             soup = BeautifulSoup(body)
             number = soup.findAll('td', align='left')[2].text[3:]
             name = soup.findAll('td', align='left')[4].text[3:]
             cardnum = soup.findAll('td', align='left')[3].text[5:]
             return name, number, cardnum
     except Exception,e:
         return "-1"
开发者ID:HeraldStudio,项目名称:webservice-py,代码行数:25,代码来源:handler.py


示例10: put

    def put(self, article_id):
        logging.info(self.request)
        logging.info("got article_id %r from uri", article_id)

        access_token = None
        try:
            access_token = self.request.headers['Authorization']
            access_token = access_token.replace('Bearer ','')
        except:
            logging.info("got access_token null")
            self.set_status(400) # Bad Request
            self.write('Bad Request')
            self.finish()
            return
        logging.info("got access_token %r", access_token)

        url = "http://"+AUTH_HOST+"/blog/articles/"+article_id+"/pub"
        http_client = HTTPClient()
        body_data = {'article_id':article_id}
        logging.info("post body %r", body_data)
        _json = json_encode(body_data)
        response = http_client.fetch(url, method="PUT", body=_json, headers={"Authorization":"Bearer "+access_token})
        logging.info("got response %r", response.body)

        self.finish()
开发者ID:ThomasZh,项目名称:backbone,代码行数:25,代码来源:ajax_article.py


示例11: get

    def get(self, account_id):
        logging.info(self.request)
        logging.info("got account_id %r from uri", account_id)

        _timestamp = self.get_argument("last", 0) # datetime as timestamp
        logging.info("got last %r", _timestamp)
        _timestamp = int(_timestamp)
        if _timestamp == 0:
            _timestamp = int(time.time())
        logging.info("got _timestamp %r", _timestamp)

        try:
            params = {"before":_timestamp, "limit":2, "status":"all"}
            url = url_concat("http://"+AUTH_HOST+"/blog/accounts/"+account_id+"/articles", params)
            http_client = HTTPClient()
            response = http_client.fetch(url, method="GET")
            logging.info("got response %r", response.body)
            _articles = json_decode(response.body)

            for _article in _articles:
                # publish_time 转换成友好阅读方式(例如:10分钟前),保留此值为分页使用
                _article["timestamp"] = _article["publish_time"]
                _article["publish_time"] = time_span(_article["publish_time"])

            self.finish(JSON.dumps(_articles))
        except:
            err_title = str( sys.exc_info()[0] );
            err_detail = str( sys.exc_info()[1] );
            logging.error("error: %r info: %r", err_title, err_detail)
            if err_detail == 'HTTP 404: Not Found':
                self.finish()
开发者ID:ThomasZh,项目名称:backbone,代码行数:31,代码来源:ajax_article.py


示例12: get_steam_user

def get_steam_user(db, steamid):
    user = None
    key = yield Op(db['server'].find_one, {'key': 'apikey'})
    url = url_concat('http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/',
                     {'key': key['value'], 'steamids': steamid})
    client = HTTPClient()
    try:
        response = client.fetch(url)
        get_user = json_decode(response.body)['response']['players'][0]
        user = {'steamid': get_user['steamid'],
                'steamid32': converter(steamid),
                'personaname': get_user['personaname'],
                'profileurl': get_user['profileurl'],
                'avatar': get_user['avatarfull'],
                'registration': datetime.now(),
                'bookmarks': [],
                'favorites': [],
                'update': datetime.now() + timedelta(minutes=1),
                'dota_count': 0}
        if 'realname' in get_user.keys():
            user['realname'] = get_user['realname']
        else:
            user['realname'] = None
    except HTTPError as e:
        logging.error('Error: %s' % e)
    client.close()
    return user
开发者ID:Lardjo,项目名称:stml1,代码行数:27,代码来源:getuser.py


示例13: get_motion_detection

def get_motion_detection(camera_id):
    thread_id = camera_id_to_thread_id(camera_id)
    if thread_id is None:
        return logging.error('could not find thread id for camera with id %s' % camera_id)
    
    url = 'http://127.0.0.1:7999/%(id)s/detection/status' % {'id': thread_id}
    
    request = HTTPRequest(url, connect_timeout=5, request_timeout=5)
    http_client = HTTPClient()
    try:
        response = http_client.fetch(request)
        if response.error:
            raise response.error 
    
    except Exception as e:
        logging.error('failed to get motion detection status for camera with id %(id)s: %(msg)s' % {
                'id': camera_id,
                'msg': unicode(e)})

        return None
    
    enabled = bool(response.body.lower().count('active'))
    
    logging.debug('motion detection is %(what)s for camera with id %(id)s' % {
            'what': ['disabled', 'enabled'][enabled],
            'id': camera_id})
    
    return enabled
开发者ID:schmeitz,项目名称:motioneye,代码行数:28,代码来源:motionctl.py


示例14: get_api_return

def get_api_return(api_name, user, data={}, timeout=TIME_OUT):
    ret = {}
    client = HTTPClient()
    data['uuid'] = user.uuid
    params = urllib.urlencode(data)
    request = HTTPRequest(SERVICE + api_name, method='POST',
                          body=params, request_timeout=timeout)
    try:
        response = client.fetch(request)
        ret = json.loads(response.body)
        if 200<=ret['code']< 300:
            return ret
        elif ret['code'] == 401:
            ret['content'] = error_map[401] % (LOCAL, user.openid)
        else:
            ret['content'] = error_map[ret['code']]
    except HTTPError as e:
        ret['code'] = e.code
        if ret['code'] == 401:
            ret['content'] = error_map[401] % (LOCAL, user.openid)
        else:
            ret['content'] = error_map[ret['code']]
    except Exception,e:
        with open('api_error.log','a+') as f:
            f.write(strftime('%Y%m%d %H:%M:%S in [get_api_return]', localtime(time()))+'\n'+str(e)+'\n['+api_name+']\t'+str(user.cardnum)+'\nString:'+str(ret)+'\n\n')
        ret['code'] = 500
        ret['content'] = u'=。= 服务器未能及时回应请求,不如再试试'
开发者ID:HeraldStudio,项目名称:wechat,代码行数:27,代码来源:get_api_return.py


示例15: getAccessToken

def getAccessToken(appId, appSecret, code):
    url = "https://api.weixin.qq.com/sns/oauth2/access_token?appid="+appId+"&secret="+appSecret+"&code="+code+"&grant_type=authorization_code"
    http_client = HTTPClient()
    response = http_client.fetch(url, method="GET")
    logging.info("got response %r", response.body)
    accessToken = json_decode(response.body)
    return accessToken
开发者ID:Kevin-ZYF,项目名称:VoteWithPython,代码行数:7,代码来源:wechat.py


示例16: post

    def post(self, article_id):
        logging.info(self.request)
        logging.info("got article_id %r from uri", article_id)
        paragraphs = self.get_argument("paragraphs", "")
        logging.info("got paragraphs %r", paragraphs)

        # 使用 html2text 将网页内容转换为 Markdown 格式
        h = html2text.HTML2Text()
        h.ignore_links = False
        paragraphs = h.handle(paragraphs)
        logging.info("got paragraphs %r", paragraphs)

        random = random_x(8)
        logging.info("got random %r", random)

        session_token = self.get_secure_cookie("session_token")
        logging.info("got session_token %r from cookie", session_token)

        # 修改文章段落内容
        url = "http://" + AUTH_HOST + "/blog/articles/" + article_id + "/paragraphs"
        body_data = {'paragraphs':paragraphs}
        logging.info("put body %r", body_data)
        _json = json_encode(body_data)
        http_client = HTTPClient()
        response = http_client.fetch(url, method="PUT", body=_json, headers={"Authorization":"Bearer "+session_token})
        logging.info("got token response %r", response.body)

        self.redirect('/blog/articles/mine?random=' + random)
开发者ID:ThomasZh,项目名称:backbone,代码行数:28,代码来源:blog2_article.py


示例17: getUserInfo

def getUserInfo(token, openid):
    url = "https://api.weixin.qq.com/sns/userinfo?access_token="+token+"&openid="+openid+"&lang=zh_CN"
    http_client = HTTPClient()
    response = http_client.fetch(url, method="GET")
    logging.info("got response %r", response.body)
    userInfo = json_decode(response.body)
    return userInfo
开发者ID:Kevin-ZYF,项目名称:VoteWithPython,代码行数:7,代码来源:wechat.py


示例18: run

    def run(self):
        while True:
            try:
                maxid = self.db.news_list()[0]['id']
            except:
                maxid = 1
            print(maxid)
            client = HTTPClient()
            response = client.fetch('http://cs.hust.edu.cn/rss')
            result = response.body.decode("utf-8",errors='ignore')
            soup = BeautifulStoneSoup(result)
            
            items = soup.find_all('item')
            for item in items:
                title = item.title.text
                link = item.link.text
                desc = item.description.text
                linkid = self.link_id(link)
                if linkid > maxid:
                    result = self.db.add_news(linkid,title,desc,link)
                    if result:
                        result = self.get_article(link)
                else:
                    break

            time.sleep(3600)
开发者ID:Monk-Liu,项目名称:CS,代码行数:26,代码来源:productor.py


示例19: gen_msg_token

def gen_msg_token(phone):
	s = DBSession()
	code = "".join(random.sample("123456789",4))
	flag = False

	url = "http://106.ihuyi.cn/webservice/sms.php?method=Submit&account={account}&password={password}&mobile={phone}&content={content}".format(account=account,password=password,phone=phone,content=url_escape(content.format(code=code)))
	h = HTTPClient()
	try:
		res = h.fetch(url,connect_timeout = 5.0)
	except:
		flag,msg =  sendTemplateSMS(phone,{code},32417)
		if flag:
			update_code(phone,code)
			return True
		else:
			return msg
	h.close()
	root = ElementTree.fromstring(res.body.decode())
	if not root[0].text == '2':
		# print("[VerifyMsg]Send error:",root[0].text,root[1].text)
		#如果发送失败,则改用云通讯发送
		flag,msg =  sendTemplateSMS(phone,{code},32417)
		if flag:
			update_code(phone,code)
			return True
		else:
			return msg
	else:
		update_code(phone,code)
		return True
开发者ID:krizex,项目名称:senguoSmh,代码行数:30,代码来源:msgverify.py


示例20: post_api

def post_api(path):
    "POST command to remote API"
    http = HTTPClient()
    resp = http.fetch(get_url(path),
                      method="POST",
                      body='')
    return json_decode(resp.body)
开发者ID:dmitriko,项目名称:sman,代码行数:7,代码来源:smanctl.py



注:本文中的tornado.httpclient.HTTPClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python httpclient.HTTPError类代码示例发布时间:2022-05-27
下一篇:
Python httpclient.AsyncHTTPClient类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap