本文整理汇总了Python中tornado.httpclient.HTTPClient类的典型用法代码示例。如果您正苦于以下问题:Python HTTPClient类的具体用法?Python HTTPClient怎么用?Python HTTPClient使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HTTPClient类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: do_post_with_cert
def do_post_with_cert(url, params=None, headers=None, client_key=None, client_cert=None):
    """POST ``params`` to ``url``, optionally presenting a TLS client certificate.

    Args:
        url: Target URL.
        params: Either an already-encoded request body (``str``) or a mapping
            that is form-encoded with ``urllib.urlencode``. ``None`` behaves
            like an empty mapping.
        headers: Optional request headers; ``None`` means no extra headers.
        client_key: Forwarded unchanged to ``HTTPRequest(client_key=...)``.
        client_cert: Forwarded unchanged to ``HTTPRequest(client_cert=...)``.

    Returns:
        The raw response body.

    Raises:
        Whatever ``HTTPClient.fetch`` raises for a failed request.
    """
    # Build fresh containers per call. With ``{}`` as a default argument the
    # same dict object is handed to Tornado on every call, so anything the
    # client adds to the header mapping would leak into later requests.
    if params is None:
        params = {}
    if headers is None:
        headers = {}

    body = params if isinstance(params, str) else urllib.urlencode(params)
    # NOTE(review): validate_cert=False disables server certificate
    # verification. Kept as-is because callers may depend on it; confirm that
    # this is intentional.
    http_request = HTTPRequest(url, 'POST', body=body, headers=headers,
                               validate_cert=False, client_key=client_key,
                               client_cert=client_cert)
    http_client = HTTPClient()
    try:
        return http_client.fetch(http_request).body
    finally:
        # Release the client's private IOLoop even when fetch() raises.
        http_client.close()
开发者ID:onlyfu,项目名称:T3,代码行数:7,代码来源:httputils.py
示例2: get_current_user
def get_current_user(self):
    """Return the session token from the secure cookies, refreshing it if expired.

    While the stored ``expires_at`` timestamp is still in the future the
    cookie's ``session_token`` is returned unchanged. Once it has passed, the
    ``refresh_token`` cookie is exchanged at ``/auth/refresh-token`` on
    ``AUTH_HOST`` and the ``session_token``, ``expires_at``, ``refresh_token``
    and ``account_id`` cookies are rewritten from the response.

    Returns:
        The current (possibly refreshed) session token, or whatever the
        ``session_token`` cookie holds (``None`` when absent) if no refresh
        is possible.

    Raises:
        Whatever ``HTTPClient.fetch`` raises when the refresh request fails.
    """
    session_token = self.get_secure_cookie("session_token")
    logging.info("got session_token %r", session_token)

    # A missing or empty cookie is treated as "expired at the epoch".
    expires_at = self.get_secure_cookie("expires_at")
    if not expires_at:
        expires_at = 0
    refresh_token = self.get_secure_cookie("refresh_token")
    _timestamp = int(time.time())

    # The original test was inverted: it returned the stale token once it had
    # expired and hit the refresh endpoint on every request while it was still
    # valid. Refresh only after expiry, and only when there is a refresh token
    # to send (anonymous visitors have none).
    if not refresh_token or _timestamp < int(expires_at):
        return session_token

    url = "http://" + AUTH_HOST + "/auth/refresh-token"
    http_client = HTTPClient()
    try:
        response = http_client.fetch(url, method="GET", headers={"Authorization":"Bearer "+refresh_token})
    finally:
        http_client.close()
    logging.info("got refresh-token response %r", response.body)

    token = json_decode(response.body)
    expires_at = _timestamp + token['expires_in']
    session_token = token['access_token']
    self.set_secure_cookie("session_token", session_token)
    self.set_secure_cookie("expires_at", str(expires_at))
    self.set_secure_cookie("refresh_token", token['refresh_token'])
    self.set_secure_cookie("account_id", token['account_id'])
    return session_token
开发者ID:ThomasZh,项目名称:backbone,代码行数:28,代码来源:comm.py
示例3: get_search
def get_search(content):
    """Find blog ids on the pyworm catalog page whose title contains ``content``.

    The catalog page is fetched synchronously and scanned with a
    case-insensitive regular expression built around ``content``.

    Args:
        content: Search text; converted with ``str()`` and matched literally.

    Returns:
        ``{"blog_id": [...]}`` with the captured ``/blog/<id>/`` values. The
        list is empty when the fetch fails, the status is not 200, or nothing
        matches. Errors are logged to ``error_log`` and never raised.
    """
    # The original called IOLoop.current().start() here and stop() in a
    # ``finally``. start() blocks until something else stops the loop (or
    # raises if it is already running), and stop() would halt the caller's
    # loop. The synchronous HTTPClient drives its own private loop, so
    # neither call is needed.
    blog_id = []
    http_client = None
    try:
        # Escape the search text so characters such as "+" or "(" are matched
        # literally instead of being interpreted as regex syntax.
        rc = '<h2><a href="/blog/(.+?)/" target="_blank">.*' + re.escape(str(content)) + ".*?</a></h2>"
        ra = re.compile(rc, re.IGNORECASE)
        # url = "http://{0}/catalog".format(config()["pyworm_blog"]["url"])
        url = "http://www.pyworm.com/catalog/"
        http_client = HTTPClient()
        response = http_client.fetch(url, request_timeout=5)
        if response.code == 200:
            blog_id = ra.findall(response.body.decode('utf-8'))
    except Exception as e:
        # Best-effort search: any failure is logged and yields an empty result.
        error_log.error(e)
    finally:
        if http_client is not None:
            http_client.close()
    return {"blog_id": blog_id}
开发者ID:mazhenpy,项目名称:pywormsite,代码行数:28,代码来源:search_blog.py
示例4: post
def post(self):
    """Handle the registration form by creating an account on the auth service.

    Reads ``registerPhone`` and ``registerPwd`` from the request, POSTs them
    with the application credentials to ``/auth/account`` on ``AUTH_HOST``,
    and renders ``auth/login.html`` on success. On failure
    ``auth/register.html`` is rendered with an error message; an HTTP 409
    from the service is reported as "phone already exists".
    """
    logging.info(self.request)
    phone = self.get_argument("registerPhone", "")
    md5pwd = self.get_argument("registerPwd", "")
    logging.info("phone %r", phone)

    url = "http://" + AUTH_HOST + "/auth/account"
    body_data = {"appid":APPID, "app_secret":APP_SECRET,
                 "login":phone, "pwd":md5pwd}
    # Log only the non-sensitive fields: the application secret and the
    # user's password hash must not end up in log files.
    logging.info("post body %r", {"appid": APPID, "login": phone})
    _json = json_encode(body_data)

    http_client = HTTPClient()
    try:
        response = http_client.fetch(url, method="POST", body=_json)
    except Exception as err:
        # ``except Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not turned into a rendered page.
        err_title = str(type(err))
        err_detail = str(err)
        logging.error("error: %r info: %r", err_title, err_detail)
        # Detect the conflict by status code; the textual form of the error
        # depends on the reason phrase the server happens to send.
        if getattr(err, "code", None) == 409:
            _err_msg = _("This phone already exist, please enter a new one.")
        else:
            _err_msg = _(err_detail)
        self.render('auth/register.html', err_msg=_err_msg)
        return
    finally:
        http_client.close()

    logging.info("got token response %r", response.body)
    _err_msg = _("You have already register an account, please login.")
    self.render('auth/login.html', err_msg=_err_msg)
开发者ID:ThomasZh,项目名称:backbone,代码行数:30,代码来源:auth_account.py
示例5: call
def call(self, method, params, okay=None, fail=None):
    """Invoke a JSON-RPC 2.0 method on ``self.url``.

    When neither ``okay`` nor ``fail`` is supplied the request is performed
    synchronously and the decoded JSON reply is returned (``None`` for a
    non-200 status, an empty body, or undecodable JSON). Otherwise the
    request is dispatched through ``AsyncHTTPClient`` and the outcome is
    delivered to ``self._on_response(okay, fail, response)``; nothing is
    returned in that case.
    """
    payload = {
        'jsonrpc': '2.0',
        'method': method,
        'params': params,
        'id': uuid.uuid4().hex,
    }
    body = tornado.escape.json_encode(payload)

    logging.info("JSON-RPC: call '%s' method on %s" % (method, self.url))

    request = HTTPRequest(
        self.url,
        method='POST',
        body=body,
        headers=HTTPHeaders({'Content-Type': 'application/json'}),
        request_timeout=0,
    )

    # Callback style: hand the response to _on_response and return at once.
    if okay is not None or fail is not None:
        AsyncHTTPClient().fetch(
            request, functools.partial(self._on_response, okay, fail))
        return

    # Blocking style: decode and return the reply directly.
    response = HTTPClient().fetch(request)
    if response.code != 200 or not response.body:
        return None
    try:
        return tornado.escape.json_decode(response.body)
    except ValueError:
        return None
开发者ID:Computing-works,项目名称:OnlineWork,代码行数:31,代码来源:jsonrpc.py
示例6: SyncHTTPClientTest
class SyncHTTPClientTest(unittest.TestCase):
    """Exercise the blocking ``HTTPClient`` against a server on a background thread."""

    def setUp(self):
        """Start an HTTPServer on its own IOLoop/thread and create the client."""
        # The server gets a dedicated IOLoop that is driven by server_thread.
        self.server_ioloop = IOLoop()
        # Set by init_server once the listening socket is attached.
        event = threading.Event()

        @gen.coroutine
        def init_server():
            sock, self.port = bind_unused_port()
            app = Application([("/", HelloWorldHandler)])
            self.server = HTTPServer(app)
            self.server.add_socket(sock)
            event.set()

        def start():
            # Thread target: finish the server setup, then keep serving until
            # the loop is stopped from tearDown.
            self.server_ioloop.run_sync(init_server)
            self.server_ioloop.start()

        self.server_thread = threading.Thread(target=start)
        self.server_thread.start()
        # Do not continue until init_server has signalled that the server
        # socket is in place.
        event.wait()

        self.http_client = HTTPClient()

    def tearDown(self):
        """Stop the server loop from its own thread, then release all resources."""
        def stop_server():
            self.server.stop()
            # Delay the shutdown of the IOLoop by several iterations because
            # the server may still have some cleanup work left when
            # the client finishes with the response (this is noticeable
            # with http/2, which leaves a Future with an unexamined
            # StreamClosedError on the loop).

            @gen.coroutine
            def slow_stop():
                # The number of iterations is difficult to predict. Typically,
                # one is sufficient, although sometimes it needs more.
                for i in range(5):
                    yield
                self.server_ioloop.stop()

            self.server_ioloop.add_callback(slow_stop)

        # stop_server is scheduled via add_callback so that it runs on the
        # server loop's own thread.
        self.server_ioloop.add_callback(stop_server)
        # Wait for the server thread to leave IOLoop.start() before closing.
        self.server_thread.join()
        self.http_client.close()
        self.server_ioloop.close(all_fds=True)

    def get_url(self, path):
        """Return the absolute URL for ``path`` on the test server."""
        return "http://127.0.0.1:%d%s" % (self.port, path)

    def test_sync_client(self):
        """A plain fetch of "/" returns the handler's body."""
        response = self.http_client.fetch(self.get_url("/"))
        self.assertEqual(b"Hello world!", response.body)

    def test_sync_client_error(self):
        """A request for an unknown path raises HTTPError with code 404."""
        # Synchronous HTTPClient raises errors directly; no need for
        # response.rethrow()
        with self.assertRaises(HTTPError) as assertion:
            self.http_client.fetch(self.get_url("/notfound"))
        self.assertEqual(assertion.exception.code, 404)
开发者ID:rgbkrk,项目名称:tornado,代码行数:60,代码来源:httpclient_test.py
示例7: submit_batch
def submit_batch(base_url, batch):
    """POST ``batch`` to the ``_bulk`` endpoint below ``base_url``.

    Each element of ``batch`` is serialised with ``json.dumps`` onto its own
    line (newline-delimited JSON).

    Args:
        base_url: URL prefix; ``'_bulk'`` is appended verbatim, so it is
            expected to end with ``/``.
        batch: Iterable of JSON-serialisable documents.

    Returns:
        None. An empty batch is skipped without contacting the server.

    Raises:
        Whatever ``HTTPClient.fetch`` / ``HTTPResponse.rethrow`` raise when
        the request fails.
    """
    lines = [json.dumps(doc) for doc in batch]
    if not lines:
        # Nothing to index; an empty bulk request would only produce an error.
        return

    print("Submitting a batch")
    # The bulk format requires every line, including the last one, to be
    # terminated by a newline; a plain '\n'.join() leaves the final line
    # unterminated.
    body = '\n'.join(lines) + '\n'

    http = HTTPClient()
    try:
        resp = http.fetch(base_url + '_bulk', method='POST', body=body)
        resp.rethrow()
    finally:
        http.close()
开发者ID:adicu,项目名称:data.adicu.com,代码行数:7,代码来源:pg_es_import.py
示例8: get
def get(self):
    """OAuth callback: trade the ``code`` query argument for a token.

    Responds with HTTP 400 when no ``code`` was supplied. Otherwise the code
    is posted, together with HTTP Basic credentials built from ``APP_Key``
    and ``APP_Secret``, to the open.hs.net token endpoint and the decoded
    JSON reply is printed.
    """
    code = self.get_argument("code", False)
    if not code:
        raise web.HTTPError(400, "oauth callback made without a token")

    http_client = HTTPClient()

    # HTTP Basic credentials: base64("<key>:<secret>").
    credentials = (APP_Key + ":" + APP_Secret).encode('utf-8')
    authorization = "Basic %s" % base64.b64encode(credentials).decode('utf-8')

    request_headers = {
        "Authorization": authorization,
        "Content-Type": "application/x-www-form-urlencoded",
    }
    form_body = urllib.parse.urlencode({
        "grant_type": "authorization_code",
        "code": code,
    })
    token_request = HTTPRequest(
        "https://open.hs.net/oauth2/oauth2/token",
        method="POST",
        headers=request_headers,
        body=form_body,
    )

    resp = http_client.fetch(token_request)
    resp_json = json.loads(resp.body.decode('utf8', 'replace'))
    print('===============================')
    print(resp_json)
开发者ID:jzm17173,项目名称:Learn,代码行数:29,代码来源:fly.py
示例9: get_schoolnum_name
def get_schoolnum_name(self, number, term="16-17-1"):
    """Look up a student's name, student number and card number.

    Posts ``number`` and ``term`` to the curriculum query page and scrapes
    the left-aligned table cells of the returned HTML.

    Args:
        number: Value sent as ``queryStudentId``.
        term: Value sent as ``queryAcademicYear``. Defaults to the term that
            was previously hard-coded.

    Returns:
        ``(name, number, cardnum)`` on success, or the string ``"-1"`` when
        the response is empty or anything goes wrong.
    """
    CURR_URL = 'http://xk.urp.seu.edu.cn/jw_service/service/stuCurriculum.action'
    client = None
    try:
        params = urllib.urlencode({
            'queryStudentId': number,
            'queryAcademicYear': term})
        request = HTTPRequest(
            CURR_URL,
            method='POST',
            body=params,
            request_timeout=TIME_OUT)
        client = HTTPClient()
        body = client.fetch(request).body
        if not body:
            return "-1"

        soup = BeautifulSoup(body)
        # Query the document once instead of once per field.
        cells = soup.findAll('td', align='left')
        # presumably the slices strip a fixed-width label in front of each
        # value; verify against the page markup
        number = cells[2].text[3:]
        name = cells[4].text[3:]
        cardnum = cells[3].text[5:]
        return name, number, cardnum
    except Exception:
        # Deliberately best-effort: callers only distinguish "-1" from a tuple.
        return "-1"
    finally:
        if client is not None:
            client.close()
开发者ID:HeraldStudio,项目名称:webservice-py,代码行数:25,代码来源:handler.py
示例10: put
def put(self, article_id):
    """Forward a PUT for ``article_id`` to ``/blog/articles/<id>/pub`` on ``AUTH_HOST``.

    The caller's bearer token is taken from the ``Authorization`` request
    header and passed on. When the header is missing the request is answered
    with ``400 Bad Request``.

    Raises:
        Whatever ``HTTPClient.fetch`` raises when the upstream call fails.
    """
    logging.info(self.request)
    logging.info("got article_id %r from uri", article_id)

    try:
        access_token = self.request.headers['Authorization']
    except KeyError:
        # Only a missing header is expected here; a bare ``except`` would
        # also hide unrelated failures.
        logging.info("got access_token null")
        self.set_status(400) # Bad Request
        self.write('Bad Request')
        self.finish()
        return
    access_token = access_token.replace('Bearer ','')
    logging.info("got access_token %r", access_token)

    url = "http://"+AUTH_HOST+"/blog/articles/"+article_id+"/pub"
    body_data = {'article_id':article_id}
    logging.info("post body %r", body_data)
    _json = json_encode(body_data)

    http_client = HTTPClient()
    try:
        response = http_client.fetch(url, method="PUT", body=_json, headers={"Authorization":"Bearer "+access_token})
    finally:
        http_client.close()
    logging.info("got response %r", response.body)

    self.finish()
开发者ID:ThomasZh,项目名称:backbone,代码行数:25,代码来源:ajax_article.py
示例11: get
def get(self, account_id):
    """Return a page of an account's articles as JSON.

    The optional ``last`` query argument is a Unix timestamp; articles before
    that moment are requested (``0`` or absent means "now"). Each article
    gets a ``timestamp`` field holding the raw ``publish_time`` and its
    ``publish_time`` is replaced by the output of ``time_span``.

    Errors are logged. An upstream 404 finishes the response with an empty
    body; other errors are only logged.
    """
    logging.info(self.request)
    logging.info("got account_id %r from uri", account_id)

    _timestamp = self.get_argument("last", 0) # datetime as timestamp
    logging.info("got last %r", _timestamp)
    _timestamp = int(_timestamp)
    if _timestamp == 0:
        _timestamp = int(time.time())
    logging.info("got _timestamp %r", _timestamp)

    try:
        params = {"before":_timestamp, "limit":2, "status":"all"}
        url = url_concat("http://"+AUTH_HOST+"/blog/accounts/"+account_id+"/articles", params)
        http_client = HTTPClient()
        try:
            response = http_client.fetch(url, method="GET")
        finally:
            http_client.close()
        logging.info("got response %r", response.body)
        _articles = json_decode(response.body)

        for _article in _articles:
            # Convert publish_time to a reader-friendly form (e.g. "10 minutes
            # ago"); keep the raw value in "timestamp" for pagination.
            _article["timestamp"] = _article["publish_time"]
            _article["publish_time"] = time_span(_article["publish_time"])

        self.finish(JSON.dumps(_articles))
    except Exception as err:
        # ``except Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt/SystemExit still propagate.
        err_title = str(type(err))
        err_detail = str(err)
        logging.error("error: %r info: %r", err_title, err_detail)
        # Match on the status code; the error's text depends on the reason
        # phrase sent by the upstream server.
        if getattr(err, "code", None) == 404:
            self.finish()
        # NOTE(review): other errors are only logged, as before; confirm that
        # the handler is auto-finished in that case.
开发者ID:ThomasZh,项目名称:backbone,代码行数:31,代码来源:ajax_article.py
示例12: get_steam_user
def get_steam_user(db, steamid):
    """Build a new user record from the Steam ``GetPlayerSummaries`` API.

    This is a generator: it yields once to obtain the API key document from
    ``db['server']`` (via ``Op``) and then performs a blocking HTTP request.

    Args:
        db: Database handle; ``db['server']`` must hold a document with
            ``key == 'apikey'`` whose ``value`` is the Steam API key.
        steamid: Steam id to look up.

    Returns:
        A dict describing the user, or ``None`` when the request fails or
        Steam returns no player for ``steamid``.
    """
    user = None
    key = yield Op(db['server'].find_one, {'key': 'apikey'})
    url = url_concat('http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/',
                     {'key': key['value'], 'steamids': steamid})
    client = HTTPClient()
    try:
        response = client.fetch(url)
        players = json_decode(response.body)['response']['players']
        if not players:
            # An empty list would otherwise surface as an IndexError.
            logging.error('Error: no player summary returned for steamid %s' % steamid)
        else:
            get_user = players[0]
            # Take the clock once so both timestamps share the same base.
            now = datetime.now()
            user = {'steamid': get_user['steamid'],
                    'steamid32': converter(steamid),
                    'personaname': get_user['personaname'],
                    'profileurl': get_user['profileurl'],
                    'avatar': get_user['avatarfull'],
                    'registration': now,
                    'bookmarks': [],
                    'favorites': [],
                    'update': now + timedelta(minutes=1),
                    'dota_count': 0,
                    # 'realname' is optional in the reply; default to None.
                    'realname': get_user.get('realname')}
    except HTTPError as e:
        logging.error('Error: %s' % e)
    finally:
        # Close the client on every path, including unexpected exceptions.
        client.close()
    return user
开发者ID:Lardjo,项目名称:stml1,代码行数:27,代码来源:getuser.py
示例13: get_motion_detection
def get_motion_detection(camera_id):
    """Report whether motion detection is active for ``camera_id``.

    The camera id is mapped to a thread id and the local status endpoint on
    port 7999 is queried. The result is ``True`` when the reply contains
    "active" (case-insensitive) and ``False`` otherwise. ``None`` is returned
    when the thread id is unknown or the request fails; both cases are
    logged.
    """
    thread_id = camera_id_to_thread_id(camera_id)
    if thread_id is None:
        return logging.error('could not find thread id for camera with id %s' % camera_id)

    status_url = 'http://127.0.0.1:7999/%(id)s/detection/status' % {'id': thread_id}
    request = HTTPRequest(status_url, connect_timeout=5, request_timeout=5)
    http_client = HTTPClient()

    try:
        response = http_client.fetch(request)
        if response.error:
            raise response.error
    except Exception as e:
        failure = {
            'id': camera_id,
            'msg': unicode(e)}
        logging.error('failed to get motion detection status for camera with id %(id)s: %(msg)s' % failure)
        return None

    enabled = response.body.lower().count('active') > 0
    state = 'enabled' if enabled else 'disabled'
    logging.debug('motion detection is %(what)s for camera with id %(id)s' % {
        'what': state,
        'id': camera_id})

    return enabled
开发者ID:schmeitz,项目名称:motioneye,代码行数:28,代码来源:motionctl.py
示例14: get_api_return
def get_api_return(api_name, user, data=None, timeout=TIME_OUT):
    """Call ``SERVICE + api_name`` on behalf of ``user`` and return the reply.

    Args:
        api_name: Path appended to ``SERVICE``.
        user: Object providing ``uuid``, ``openid`` and ``cardnum``.
        data: Optional mapping of extra form fields. It is copied, so neither
            the caller's mapping nor a shared default is modified.
        timeout: Request timeout passed to ``HTTPRequest``.

    Returns:
        The decoded JSON reply as a dict. For non-2xx results a ``content``
        entry with a user-facing message is filled in from ``error_map``;
        for unexpected failures ``code`` is 500 and ``content`` is a generic
        retry message.
    """
    # ``data={}`` as a default combined with the assignment below mutated one
    # shared dict across all calls; work on a private copy instead.
    data = data.copy() if data is not None else {}
    data['uuid'] = user.uuid
    params = urllib.urlencode(data)
    request = HTTPRequest(SERVICE + api_name, method='POST',
                          body=params, request_timeout=timeout)

    ret = {}
    client = HTTPClient()
    try:
        response = client.fetch(request)
        ret = json.loads(response.body)
        if 200<=ret['code']< 300:
            return ret
        elif ret['code'] == 401:
            ret['content'] = error_map[401] % (LOCAL, user.openid)
        else:
            ret['content'] = error_map[ret['code']]
    except HTTPError as e:
        ret['code'] = e.code
        if ret['code'] == 401:
            ret['content'] = error_map[401] % (LOCAL, user.openid)
        else:
            # NOTE(review): raises KeyError if error_map has no entry for
            # this status; confirm error_map covers every code the service
            # and client can produce.
            ret['content'] = error_map[ret['code']]
    except Exception as e:
        with open('api_error.log','a+') as f:
            f.write(strftime('%Y%m%d %H:%M:%S in [get_api_return]', localtime(time()))+'\n'+str(e)+'\n['+api_name+']\t'+str(user.cardnum)+'\nString:'+str(ret)+'\n\n')
        ret['code'] = 500
        ret['content'] = u'=。= 服务器未能及时回应请求,不如再试试'
    finally:
        client.close()
    # The error branches above build ``ret`` for the caller; without this
    # return they silently produced None.
    return ret
开发者ID:HeraldStudio,项目名称:wechat,代码行数:27,代码来源:get_api_return.py
示例15: getAccessToken
def getAccessToken(appId, appSecret, code):
    """Exchange a WeChat OAuth ``code`` for an access-token payload.

    Args:
        appId: Application id, sent as ``appid``.
        appSecret: Application secret, sent as ``secret``.
        code: Authorization code received on the OAuth callback.

    Returns:
        The decoded JSON reply of the ``sns/oauth2/access_token`` endpoint.

    Raises:
        Whatever ``HTTPClient.fetch`` raises when the request fails.
    """
    # Local import keeps this block self-contained.
    from tornado.httputil import url_concat

    # Build the query with proper URL-encoding: ``code`` arrives from the
    # browser, and plain string concatenation would let characters such as
    # "&" or "#" alter the request. A list of pairs keeps the original
    # parameter order.
    url = url_concat("https://api.weixin.qq.com/sns/oauth2/access_token", [
        ("appid", appId),
        ("secret", appSecret),
        ("code", code),
        ("grant_type", "authorization_code"),
    ])
    http_client = HTTPClient()
    try:
        response = http_client.fetch(url, method="GET")
    finally:
        http_client.close()
    logging.info("got response %r", response.body)
    accessToken = json_decode(response.body)
    return accessToken
开发者ID:Kevin-ZYF,项目名称:VoteWithPython,代码行数:7,代码来源:wechat.py
示例16: post
def post(self, article_id):
    """Save edited paragraphs for ``article_id`` and redirect to "my articles".

    The submitted ``paragraphs`` HTML is converted to Markdown with
    html2text, sent via PUT to ``/blog/articles/<id>/paragraphs`` on
    ``AUTH_HOST`` using the session token from the secure cookie, and the
    browser is redirected to ``/blog/articles/mine`` with a random query
    suffix.
    """
    logging.info(self.request)
    logging.info("got article_id %r from uri", article_id)

    paragraphs = self.get_argument("paragraphs", "")
    logging.info("got paragraphs %r", paragraphs)

    # Use html2text to convert the page content to Markdown (links kept).
    converter = html2text.HTML2Text()
    converter.ignore_links = False
    paragraphs = converter.handle(paragraphs)
    logging.info("got paragraphs %r", paragraphs)

    random_suffix = random_x(8)
    logging.info("got random %r", random_suffix)

    session_token = self.get_secure_cookie("session_token")
    logging.info("got session_token %r from cookie", session_token)

    # Update the article's paragraph content.
    url = "http://" + AUTH_HOST + "/blog/articles/" + article_id + "/paragraphs"
    body_data = {'paragraphs': paragraphs}
    logging.info("put body %r", body_data)
    encoded_body = json_encode(body_data)
    http_client = HTTPClient()
    response = http_client.fetch(
        url,
        method="PUT",
        body=encoded_body,
        headers={"Authorization": "Bearer " + session_token},
    )
    logging.info("got token response %r", response.body)

    self.redirect('/blog/articles/mine?random=' + random_suffix)
开发者ID:ThomasZh,项目名称:backbone,代码行数:28,代码来源:blog2_article.py
示例17: getUserInfo
def getUserInfo(token, openid):
    """Fetch a WeChat user's profile.

    Args:
        token: Access token, sent as ``access_token``.
        openid: The user's open id.

    Returns:
        The decoded JSON reply of the ``sns/userinfo`` endpoint (requested
        with ``lang=zh_CN``).

    Raises:
        Whatever ``HTTPClient.fetch`` raises when the request fails.
    """
    # Local import keeps this block self-contained.
    from tornado.httputil import url_concat

    # URL-encode the values instead of concatenating them, so reserved
    # characters in a token or open id cannot alter the query string. A list
    # of pairs keeps the original parameter order.
    url = url_concat("https://api.weixin.qq.com/sns/userinfo", [
        ("access_token", token),
        ("openid", openid),
        ("lang", "zh_CN"),
    ])
    http_client = HTTPClient()
    try:
        response = http_client.fetch(url, method="GET")
    finally:
        http_client.close()
    logging.info("got response %r", response.body)
    userInfo = json_decode(response.body)
    return userInfo
开发者ID:Kevin-ZYF,项目名称:VoteWithPython,代码行数:7,代码来源:wechat.py
示例18: run
def run(self):
    """Poll the cs.hust.edu.cn RSS feed once an hour, forever.

    On each pass the id of the first entry of ``self.db.news_list()`` is used
    as a high-water mark; feed items whose link id is greater are stored via
    ``self.db.add_news`` and, when that succeeds, passed to
    ``self.get_article``. This method never returns.
    """
    while True:
        # High-water mark: falls back to 1 when the lookup fails for any
        # reason (e.g. the list is empty).
        try:
            maxid = self.db.news_list()[0]['id']
        except:
            maxid = 1
        print(maxid)
        client = HTTPClient()
        response = client.fetch('http://cs.hust.edu.cn/rss')
        # Undecodable bytes are dropped rather than raising.
        result = response.body.decode("utf-8",errors='ignore')
        soup = BeautifulStoneSoup(result)
        items = soup.find_all('item')
        for item in items:
            title = item.title.text
            link = item.link.text
            desc = item.description.text
            linkid = self.link_id(link)
            if linkid > maxid:
                result = self.db.add_news(linkid,title,desc,link)
                if result:
                    result = self.get_article(link)
            # NOTE(review): the source lost its indentation; this ``else`` is
            # assumed to pair with ``if linkid > maxid`` (stop at the first
            # already-known item, presumably because the feed is
            # newest-first). Verify against the original project.
            else:
                break
        # Wait one hour before polling again.
        time.sleep(3600)
开发者ID:Monk-Liu,项目名称:CS,代码行数:26,代码来源:productor.py
示例19: gen_msg_token
def gen_msg_token(phone):
    """Generate a 4-digit verification code and send it to ``phone`` by SMS.

    The code is first sent through the ihuyi HTTP gateway. If that request
    fails, or the gateway's reply does not carry result code ``'2'``, the
    code is sent through ``sendTemplateSMS`` instead. Whenever a send
    succeeds the code is stored with ``update_code``.

    Returns:
        ``True`` when the code was sent and stored; otherwise the message
        returned by ``sendTemplateSMS``.
    """
    # NOTE(review): the session is never used in this function. The call is
    # kept because creating it may have side effects; confirm and remove.
    s = DBSession()

    # Verification codes gate account access, so draw them from the OS
    # entropy source rather than the default, predictable PRNG. The alphabet
    # and "no repeated digit" behaviour of sample() are unchanged.
    code = "".join(random.SystemRandom().sample("123456789",4))

    def _send_via_fallback():
        """Send ``code`` with sendTemplateSMS; True on success, else its message."""
        flag,msg = sendTemplateSMS(phone,{code},32417)
        if flag:
            update_code(phone,code)
            return True
        return msg

    url = "http://106.ihuyi.cn/webservice/sms.php?method=Submit&account={account}&password={password}&mobile={phone}&content={content}".format(account=account,password=password,phone=phone,content=url_escape(content.format(code=code)))
    h = HTTPClient()
    try:
        res = h.fetch(url,connect_timeout = 5.0)
    except Exception:
        # Gateway unreachable or returned an HTTP error.
        return _send_via_fallback()
    finally:
        # Previously the client was only closed on the success path.
        h.close()

    root = ElementTree.fromstring(res.body.decode())
    if root[0].text != '2':
        # print("[VerifyMsg]Send error:",root[0].text,root[1].text)
        # If sending failed, fall back to the second SMS provider.
        return _send_via_fallback()

    update_code(phone,code)
    return True
开发者ID:krizex,项目名称:senguoSmh,代码行数:30,代码来源:msgverify.py
示例20: post_api
def post_api(path):
    """Send an empty-bodied POST to the remote API at ``path``.

    The target URL is resolved with ``get_url(path)``; the reply body is
    decoded from JSON and returned.
    """
    client = HTTPClient()
    target = get_url(path)
    reply = client.fetch(target, method="POST", body='')
    return json_decode(reply.body)
开发者ID:dmitriko,项目名称:sman,代码行数:7,代码来源:smanctl.py
注:本文中的tornado.httpclient.HTTPClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论