本文整理汇总了Python中requests.Session类的典型用法代码示例。如果您正苦于以下问题：Python Session类的具体用法？Python Session怎么用？Python Session使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Session类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _upload_file
def _upload_file(self, params, prepped, api_session):
    """Upload a local file to S3, then register the upload through the API.

    The method asks the ``getS3url`` endpoint for an upload URL, PUTs the
    file content to that URL and finally sends the upload metadata using
    the caller's prepared request.

    Args:
        params: Mapping of request parameters. Keys read here are
            ``fileName``, ``fileLocation``, ``tenant``, ``sourcePlatform``,
            ``colDelim``, ``rowDelim``, ``headerFields`` and ``fileType``.
        prepped: Prepared request for the actual upload call; its body is
            replaced with the serialized upload parameters.
        api_session: Session used to send ``prepped``. It belongs to the
            caller and is not closed here.

    Returns:
        The JSON body of the upload response, re-serialized as a string.
    """
    file_location = params.get('fileLocation')
    has_local_file = bool(file_location) and os.path.isfile(file_location)

    # header for upload
    upload_prepped = Request('POST', self._endpoint_url + "getS3url").prepare()
    self._encode_headers(upload_prepped.headers)
    upload_prepped.headers['Content-Type'] = 'application/json'
    upload_prepped.headers['User-Agent'] = self._build_user_agent_header()
    # NOTE(review): the request is signed before its body is attached
    # (same order as before) -- confirm the signer does not cover the body.
    self._signer.sign(upload_prepped)

    # prepare params for s3 url
    url_parameters = {'fileName': '', 'tenant': ''}
    if params.get('fileName'):
        # The key that was checked is the key that is read; the previous
        # code read 'file_name' here and raised KeyError.
        url_parameters['fileName'] = params['fileName']
    elif has_local_file:
        url_parameters['fileName'] = os.path.basename(file_location)
    if params.get('tenant'):
        url_parameters['tenant'] = params['tenant']

    # prepare the body and fetch the S3 upload URL
    serial_obj = Serializer().serialize_to_request(url_parameters, None)
    upload_prepped.prepare_body(serial_obj['body'], None)
    with Session() as s3_session:
        s3_info = s3_session.send(upload_prepped).json()

    # upload file to S3 bucket; binary mode so the bytes are sent unchanged
    # and the handle is always closed
    if 'url' in s3_info and file_location:
        with open(file_location, 'rb') as source:
            put(s3_info['url'], data=source.read())

    # build upload parameters
    upload_params = {'rowDelim': '', 'colDelim': '', 'headerFields': [],
                     'tenant': '', 'fileType': 0}
    if 'fileLocation' in params:
        upload_params['fileLocation'] = file_location
    if has_local_file:
        upload_params['fileName'] = os.path.basename(file_location)
    # An explicit 'fileName' in params overrides the one derived from the
    # file location, because it is copied afterwards.
    for key in ('tenant', 'fileName', 'sourcePlatform', 'colDelim',
                'rowDelim', 'headerFields', 'fileType'):
        if params.get(key):
            upload_params[key] = params[key]

    # now do actual upload
    serial_obj = Serializer().serialize_to_request(upload_params, None)
    prepped.prepare_body(serial_obj['body'], None)
    resp = api_session.send(prepped)
    return json.dumps(resp.json())
开发者ID:cloudera,项目名称:hue,代码行数:60,代码来源:api_lib.py
示例2: doRequest
def doRequest(self, method, url, params=None, parse=True, data=None):
    """Send an authenticated HTTP request and return the response body.

    Args:
        method: HTTP method name passed to ``Request``.
        url: Target URL.
        params: Optional query-string parameters.
        parse: When true the body is decoded as JSON, otherwise the raw
            text is returned.
        data: Optional form data; copied into a new ``dict`` before use.

    Returns:
        The decoded JSON value, or the response text when ``parse`` is false.

    Raises:
        ValueError: If ``self.bitbucket.auth`` is not set.
        exceptions.DispatchError: If the status code is outside 200-299.
    """
    if not self.bitbucket.auth:
        raise ValueError("No auth credentials.")

    payload = dict(data) if data else {}
    request = Request(
        method=method,
        url=url,
        auth=self.bitbucket.auth,
        params=params,
        data=payload,
    )
    # The session is only needed for this one call; close it so its
    # connection pool is released.
    with Session() as session:
        resp = session.send(request.prepare())

    text = resp.text
    if 200 <= resp.status_code < 300:
        return json.loads(text) if parse else text
    raise exceptions.DispatchError(text, url, resp.reason, resp.status_code)
开发者ID:VRGhost,项目名称:BitBucket-api,代码行数:32,代码来源:dispatch.py
示例3: r_next_page
def r_next_page(url, plot):
    """List one more page of stations and add a "Next Page" entry.

    Posts ``plot`` as the ``scrollOffset`` to ``url``, extracts station
    link, cover image and title from the returned HTML, and adds one
    directory item per station followed by a "Next Page>>" item that
    carries ``plot + 1``.

    Args:
        url: Page URL to post to; also reused for the "Next Page" item.
        plot: Current scroll offset.
    """
    next_url = url
    with Session() as session:
        session.head('http://www.radionomy.com')
        response = session.post(
            url=url,
            data={'scrollOffset': plot},
            headers=headers)
        page = str(response.text)
    plot = plot + 1

    # Built once per call instead of once per station.
    station_re = re.compile('href="(.+?)" rel="internal"><img class="radioCover" src="(.+?)" alt="(.+?)" ')
    unescape = HTMLParser.HTMLParser().unescape

    # Distinct loop names so the ``url`` parameter is not shadowed.
    for station_url, image, title in station_re.findall(page):
        station_url = str(station_url).replace('/en/radio', 'http://listen.radionomy.com').replace('/index', '.m3u')
        try:
            title = unescape(title)
        except UnicodeDecodeError:
            continue
        image = image.replace('s67.jpg', 's400.jpg')
        try:
            add_directory3(title, station_url, 140, defaultfanart, image, plot='')
        except KeyError:
            continue

    xbmcplugin.setContent(pluginhandle, 'songs')
    add_directory2('Next Page>>', next_url, 133, defaultfanart, defaultimage, plot)
    xbmc.executebuiltin("Container.SetViewMode("+str(confluence_views[6])+")")
    xbmcplugin.endOfDirectory(addon_handle)
开发者ID:MetalChris,项目名称:repository.metalchris,代码行数:25,代码来源:addon.py
示例4: recognize_captcha
def recognize_captcha(self, str_image_path, str_api_key=""):
    """Return the top label the Cloud Vision API assigns to an image.

    Args:
        str_image_path: Path of the image file to analyse.
        str_api_key: API key appended to the request URL. Defaults to the
            empty string, which is the value that used to be hard-coded.

    Returns:
        The ``description`` of the first label annotation, or the string
        ``"error"`` when the HTTP status is not 200 or the response has
        no label annotations.
    """
    # Read the image; the context manager closes the file handle.
    with open(str_image_path, 'rb') as image_file:
        bin_captcha = image_file.read()
    # Base64-encode the image for the JSON payload.
    str_encode_file = base64.b64encode(bin_captcha).decode()

    str_url = "https://vision.googleapis.com/v1/images:annotate?key="
    str_headers = {'Content-Type': 'application/json'}
    # The payload requests LABEL_DETECTION and the result is read from
    # 'labelAnnotations' below. (The earlier comment mentioned
    # TEXT_DETECTION, which did not match the code.)
    str_json_data = {
        'requests': [
            {
                'image': {
                    'content': str_encode_file
                },
                'features': [
                    {
                        'type': "LABEL_DETECTION",
                        'maxResults': 10
                    }
                ]
            }
        ]
    }

    # Send the request; the session is closed once the response is read.
    with Session() as obj_session:
        obj_request = Request("POST",
                              str_url + str_api_key,
                              data=json.dumps(str_json_data),
                              headers=str_headers)
        obj_prepped = obj_session.prepare_request(obj_request)
        obj_response = obj_session.send(obj_prepped,
                                        verify=True,
                                        timeout=60)

    if obj_response.status_code != 200:
        return "error"

    # Log the raw analysis result.
    logging.basicConfig(filename='example.log', level=logging.DEBUG)
    logging.debug(obj_response.text)

    # Only the first label is used.
    keywords = json.loads(obj_response.text)
    try:
        keyword = keywords['responses'][0]['labelAnnotations'][0]['description']
    except (KeyError, IndexError):
        # No labels in the response: report it the same way as an HTTP error.
        return "error"
    print("Tag:" + keyword)
    return keyword
开发者ID:jun-oka,项目名称:speed-nikki,代码行数:60,代码来源:GetTagFromImage.py
示例5: getReferer
def getReferer(url, referer):
    """GET ``url`` with the given referer and a fixed Firefox user-agent.

    Returns the response object produced by the session.
    """
    browser_headers = {
        "referer": referer,
        "user-agent": "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.1.14) Gecko/20080418 Ubuntu/7.10 (gutsy) Firefox/2.0.0.14",
    }
    client = Session()
    client.headers.update(browser_headers)
    response = client.get(url)
    return response
开发者ID:HoffmannP,项目名称:BASH-scripts-an-stuffalike,代码行数:7,代码来源:wallpaper.py
示例6: main
def main():
    """Provision Grafana with the configured datasource and dashboards.

    For every login payload a session is opened and logged in. If Grafana
    reports that it is already initialized the function returns at once.
    Otherwise the datasource and every ``*.json`` dashboard found in
    ``DASHBOARDS_DIR`` are posted, and the user is logged out.

    Raises:
        requests.HTTPError: Via ``raise_for_status`` when a datasource or
            dashboard request fails.
    """
    datasource_url = '{url}/api/datasources'.format(url=GRAFANA_URL)
    dashboard_url = '{url}/api/dashboards/db'.format(url=GRAFANA_URL)
    logout_url = '{url}/logout'.format(url=GRAFANA_URL)
    dashboard_glob = '{dir}/*.json'.format(dir=DASHBOARDS_DIR)

    for user in create_login_payload():
        logging.info('Opening a Grafana session...')
        # The context manager closes the session on every exit path,
        # including the early return and a failed request.
        with Session() as session:
            login(session, user)

            if check_initialized(session):
                logging.info('Grafana has already been initialized, skipping!')
                return

            logging.info('Attempting to add configured datasource...')
            r = session.post(datasource_url, json=create_datasource_payload())
            logging.debug('Response: %r', r.json())
            r.raise_for_status()

            for path in sorted(glob.glob(dashboard_glob)):
                logging.info('Creating dashboard from file: %s', path)
                r = session.post(dashboard_url,
                                 json=create_dashboard_payload(path))
                logging.debug('Response: %r', r.json())
                r.raise_for_status()

            logging.info('Ending %r session...', user.get('user'))
            session.get(logout_url)

    logging.info('Finished successfully.')
开发者ID:hpcloud-mon,项目名称:monasca-docker,代码行数:27,代码来源:grafana.py
示例7: __init__
class Site:
    """Logged-in HTTP client for the collection pages served by ``HOST``.

    The session is created lazily: it stays ``None`` until ``login`` or
    ``get_session`` is called.
    """

    def __init__(self, username, password):
        """Store the credentials; no network traffic happens here."""
        self.username = username
        self.password = password
        self.session = None

    def url(self):
        """Return the collection URL, which is also used as the login form URL."""
        return "http://{}/collection/all".format(HOST)

    def login(self):
        """Open a new session, submit the login form and return the session.

        NOTE(review): the POST result is not inspected, so the
        "logged in" message is printed even when the login is rejected.
        """
        self.session = Session()
        # drupal requires that you first GET the form
        self.session.get(self.url())
        # then POST to it
        self.session.post(
            self.url(), data={
                'name': self.username, 'pass': self.password,
                'form_id': 'user_login',
                'op': 'Log in',
            },
            headers={
                'referer': self.url(),
            }
        )
        print("=== logged in ===")
        return self.session

    def get_session(self):
        """Return the current session, logging in first if there is none."""
        if self.session is None:
            self.login()
        return self.session

    def get_collection_page(self, page):
        """Build a ``CollectionPage`` for ``page`` using a logged-in session."""
        # Go through get_session() so the page never receives a None session.
        return CollectionPage(self.get_session(), page)
开发者ID:ccnmtl,项目名称:digitaltibet,代码行数:35,代码来源:scrape.py
示例8: update_test
def update_test(is_crash=0):
    """Post the current test-case status to the ``tbd_server`` project endpoint.

    Args:
        is_crash: Value sent as the ``is_crash`` form field.

    The JSON reply is expected to carry ``code`` and ``result``; a truthy
    ``code`` is logged as a failure. Nothing is returned and parse errors
    are only logged.
    """
    config = merge_config()
    sess = Session()
    url = "{}/project/td".format(config['tbd_server'])
    # NOTE(review): the key is spelled 'build_verion' -- presumably the
    # server expects this exact spelling; confirm before changing it.
    post_params = dict([
        ('host_name', socket.gethostname()),
        ('ip_addr', IP_ADDR),
        ('tc_name', config['tc_name']),
        ('test_client', config['monitor_client']),
        ('build_verion', config['build_version']),
        ('is_crash', is_crash),
        ('ta_name', None),
        ('tc_result', None),
        ('ta_result', None),
    ])
    resp = sess.post(url, data=post_params)
    log.debug("Post to {} with parameters {}".format(url, post_params))
    try:
        result = resp.json()
        if not result['code']:
            result = result['result']
            log.debug("update test data successfully: {}".format(result))
        else:
            log.error("update test data failed: {}!".format(result['result']))
    except Exception as err:
        log.error("Failed to parse json result: {}!".format(err))
开发者ID:brucepei,项目名称:plat,代码行数:26,代码来源:TC_base.py
示例9: main
def main():
    """Fetch the order book and the rates, then print a summary report.

    NOTE(review): the "Asks" section prints the ``get_Bid*`` values and the
    "Bid" section prints the ``get_Ask*`` values -- confirm whether that
    swap is intended.
    """
    session = Session()
    marketReq = showOrderbookCompactRequest()
    ratesReq = showRatesRequest()
    market = Market(session.send(marketReq).json())
    rates = session.send(ratesReq).json()
    course = Course(rates)

    separator = "---------------------------------------------"
    # (title, rows); each row is (format, object, getter name). Getters are
    # looked up and called only when their line is printed.
    sections = (
        ("\t[*] Asks [*]", (
            ("\tvolume: %f", market, "get_BidVolume"),
            ("\tmedian price: %f", market, "get_BidMedian"),
            ("\tavg price: %f", market, "get_BidAverage"),
            ("\tcheapest: %f", market, "get_BidLowest"),
            ("\tmost expensive: %f", market, "get_BidHighest"),
        )),
        ("\t[*] Bid [*]", (
            ("\tvolume: %f", market, "get_AskVolume"),
            ("\tmedian price: %f", market, "get_AskMedian"),
            ("\tavg price: %f", market, "get_AskAverage"),
            ("\thighest bid: %f", market, "get_AskHighest"),
            ("\tlowest bid: %f", market, "get_AskLowest"),
        )),
        ("\t[*] courses [*]", (
            ("\texact: %f", course, "getCurrentRate"),
            ("\t12 Hours: %f", course, "get12hWeighted"),
            ("\t3 hours: %f", course, "get3hWeighted"),
        )),
    )
    for title, rows in sections:
        print(separator)
        print(title)
        for template, source, getter_name in rows:
            print(template % getattr(source, getter_name)())
开发者ID:blanky0230,项目名称:python_bitcoin.de_client,代码行数:27,代码来源:btcde.py
示例10: login
def login(username, passwd, url=None):
    """Log in and return the authenticated session.

    Posts the credentials to ``Logon.do?method=logon`` and then calls
    ``Logon.do?method=logonBySSO`` on the host template picked from
    ``LOGIN_HOST`` by ``int(username) % 3``. ``url`` is accepted but not
    used.

    Returns the session when the second response is OK, ``WRONG_USENAME``
    when it is not, and ``CONECTION_ERROR`` when a request fails.

    >>> login('6102114000', '000000') # doctest: +ELLIPSIS
    <requests.sessions.Session object at 0x...>
    """
    http = Session()
    form = {}
    for field, value in (('USERNAME', username), ('PASSWORD', passwd),
                         ('useDogCode', ''), ('x', 37), ('y', 8)):
        form[field] = value
    host_template = LOGIN_HOST[int(username) % 3]
    try:
        http.post(host_template % ('Logon.do?method=logon'), data=form, timeout=TIME_OUT)
        sso_reply = http.post(host_template % ('Logon.do?method=logonBySSO'), timeout=TIME_OUT)
        if not sso_reply.ok:
            return WRONG_USENAME
        return http
    except RequestException:
        return CONECTION_ERROR
开发者ID:mrzhangboss,项目名称:test,代码行数:25,代码来源:main.py
示例11: get_url_page
def get_url_page(self, url="http://www.optimalstackfacts.org/"):
    """Fetch ``url`` through a fixed proxy, trying up to three times.

    Args:
        url: Page to download.

    Returns:
        The response body when an attempt answers with status 200 or 301,
        otherwise ``None`` after the third failed attempt.
    """
    proxies_url = "http://82.209.49.200:8080"
    proxies = {
        "http": proxies_url,
        "https": proxies_url
    }
    for _attempt in range(3):
        try:
            # A fresh session per attempt, closed when the attempt ends.
            with Session() as session:
                r = session.get(url, proxies=proxies, headers=self.headers, timeout=10)
                print(r.status_code)
                if r.status_code in (200, 301):
                    return r.content
        except Exception:
            # Best effort: any request failure just moves on to the next
            # attempt. Unlike a bare ``except`` this lets KeyboardInterrupt
            # and SystemExit through.
            continue
    return None
开发者ID:codedis213,项目名称:avv_work,代码行数:32,代码来源:code_optimalstackfacts.py
示例12: validate
def validate(self):
    """Run validation using HTTP requests against the validation host.

    For each rule provided by the spec a request is sent to the validation
    host and the response is checked against the rule. Each rule yields a
    :py:class:`ValidationPass` when it matches, or a
    :py:class:`ValidationFail` when the request cannot be completed or a
    ``ValidationError`` is raised.
    """
    http = Session()
    if not self.verify and hasattr(urllib3, 'disable_warnings'):
        urllib3.disable_warnings()

    for rule in self.spec.get_rules():
        request = rule.get_request(self.host, self.port)
        if self.debug:
            pprint.pprint(request.__dict__)
        try:
            response = http.send(
                request.prepare(),
                allow_redirects=False,
                verify=self.verify,
            )
            if self.debug:
                pprint.pprint(response.__dict__)
            if rule.matches(response):
                yield ValidationPass(rule=rule, request=request,
                                     response=response)
        except (ConnectionError, SSLError) as failure:
            # No response yet
            yield ValidationFail(rule=rule, request=request, response=None,
                                 error=failure)
        except ValidationError as failure:
            # Response received, validation error
            yield ValidationFail(rule=rule, request=request,
                                 response=response, error=failure)
开发者ID:agjohnson,项目名称:validatehttp,代码行数:30,代码来源:validate.py
示例13: get_http_client
def get_http_client():
    """Return a new session with ``trust_env`` switched off.

    See https://github.com/kennethreitz/requests/issues/2066 for the reason
    the environment is not trusted.
    """
    session = Session()
    session.trust_env = False
    return session
开发者ID:BforBen,项目名称:city-analytics-dashboard-setup,代码行数:7,代码来源:__init__.py
示例14: AppStatsClient
# Accumulates named counters in memory and posts them as JSON to ``url``.
# NOTE(review): this snippet ends at a dangling ``else:`` in ``submit`` -- the
# success branch is not visible here, so the reset of the accumulated state
# (if any) cannot be confirmed from this view.
class AppStatsClient(object):
# add() triggers submit() once ``_req_count`` reaches this value.
limit = 100 # records
# add() triggers submit() once this many seconds passed since ``_last_sent``.
interval = 60 # seconds
# Passed to the Session constructor below.
timeout = 1 # timeout in seconds to submit data
# url: endpoint the data is posted to; app_id: top-level key of the posted JSON.
def __init__(self, url, app_id):
self.url = url
self.app_id = app_id
# Maps a name to a Counter of its accumulated values.
self._acc = defaultdict(Counter)
self._last_sent = time()
# NOTE(review): current requests.Session() accepts no constructor
# arguments -- presumably written against an older requests release; confirm.
self._session = Session(
headers = {'Content-Type': 'application/json'},
timeout = self.timeout,
)
self._req_count = 0
# Adds one occurrence of ``name`` ('NUMBER') plus every counter in ``counts``,
# then submits when the interval or the record limit is reached.
# ``lock`` is not defined in this snippet -- presumably a module-level lock.
def add(self, name, counts):
with lock:
self._acc[name]['NUMBER'] += 1
for counter in counts:
self._acc[name][counter] += counts[counter]
self._req_count += 1
elapsed = time() - self._last_sent
if elapsed >= self.interval or self._req_count >= self.limit:
self.submit()
# Serializes {app_id: accumulated counters} and posts it; request errors are
# logged at debug level instead of being raised.
def submit(self):
data = json.dumps({self.app_id: self._acc})
try:
self._session.post(self.url, data=data)
# Python 2 ``except X, e`` syntax.
except RequestException, e:
log.debug('Error during data submission: %s' % e)
else:
开发者ID:uvNikita,项目名称:appstats,代码行数:34,代码来源:appstats_client.py
示例15: main4
def main4():
    """Authenticate, replay the received cookie on a second request, save the reply.

    Approach taken from
    http://stackoverflow.com/questions/10247054/http-post-and-get-with-cookies-for-authentication-in-python

    NOTE(review): ``auth_url3`` and ``response_url`` are not defined inside
    this function -- presumably module-level values; confirm.
    """
    auth_session = requests.Session()
    auth_reply = auth_session.get(auth_url3)
    print(auth_reply.headers)

    # Forward the cookie from the auth reply together with the company guid.
    cookie_value = auth_reply.headers["set-cookie"] + " ; company_guid=6f65b34d-b6f4-4abd-b14a-408b8a11029b"
    headers = {"Cookie": cookie_value}

    data_session = Session()
    request = Request("GET", response_url, headers=headers)
    resp = data_session.send(request.prepare())

    WriteToFile("/home/rtandon/Downloads/Work/importdata/TCCC_Response/Response_Time_Json_Sample3_main4.txt", resp.text)
    print("done")
开发者ID:rtandon,项目名称:apps,代码行数:26,代码来源:http_call.py
示例16: __send_request
def __send_request(self, url, data=None, method='GET', isKeyRequired=True):
    """Send a JSON request to the API and return the response.

    Args:
        url: Path or URL joined onto ``self._base_url``.
        data: Request body. A ``str`` is sent as is; anything else
            (including ``None``) is serialized with ``json.dumps``.
        method: HTTP method name.
        isKeyRequired: When true the API key must be set and is sent in
            the ``Authorization`` header.

    Returns:
        The response object when ``resp.ok`` is true.

    Raises:
        Exception: When the key is required but missing, or when the
            response is not OK. In the latter case the arguments are
            ``(status_code, reason, message)``, where ``message`` is the
            API's human readable message if the body provides one and the
            raw response text otherwise.
    """
    if isKeyRequired and not self._key:
        raise Exception('API Key is required. Get the API key from http://rimuhosting.com/cp/apikeys.jsp. Then export RIMUHOSTING_APIKEY=xxxx (the digits only) or add RIMUHOSTING_APIKEY=xxxx to a ~/.rimuhosting file.')

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    if isKeyRequired:
        headers['Authorization'] = "rimuhosting apikey=%s" % self._key

    url = urllib.parse.urljoin(self._base_url, url)
    data = data if isinstance(data, str) else json.dumps(data)

    # One-shot session: close it as soon as the response has been read.
    with Session() as s:
        prepped = s.prepare_request(
            Request(method, url, data=data, headers=headers))
        resp = s.send(prepped, timeout=3600)

    if resp.ok:
        return resp

    message = resp.text
    try:
        j2 = resp.json()
        for val in j2:
            if "error_info" in j2[val] and "human_readable_message" in j2[val]["error_info"]:
                message = j2[val]["error_info"]["human_readable_message"]
                break
    except (ValueError, TypeError, KeyError, IndexError, AttributeError):
        # The body is not JSON or not in the expected shape: keep the raw
        # text. The previous ``finally: raise`` also replaced unrelated
        # exceptions such as KeyboardInterrupt.
        pass
    raise Exception(resp.status_code, resp.reason, message)
开发者ID:pbkwee,项目名称:RimuHostingAPI,代码行数:33,代码来源:rimuapi.py
示例17: get_response
def get_response(url, **kwargs):
    """GET ``url`` with a default user-agent and timeout, retrying on timeouts.

    Args:
        url: Address to fetch.
        **kwargs: ``retries`` (number of retries after the first attempt,
            default 3) and ``sess`` (session to reuse; a new one is created
            when absent) are consumed here. Everything else is passed to
            ``Session.get``. ``headers`` defaults to a Chrome user-agent
            and ``timeout`` to 10 seconds unless the caller supplies them.

    Returns:
        The response, or ``None`` when the last attempt timed out too.
    """
    header_info = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/44.0.2403.157 Safari/537.36'
    }
    retries = kwargs.pop('retries', 3)
    sess = kwargs.pop('sess', None)
    if sess is None:
        sess = Session()
    # Defaults now apply on every attempt, not only when 'retries' was passed.
    kwargs.setdefault('headers', header_info)
    kwargs.setdefault('timeout', 10)

    try:
        return sess.get(url, **kwargs)
    except Timeout as e:
        if retries > 0:
            # The url must be forwarded; the previous retry call left it out
            # and failed with a TypeError.
            return get_response(url, retries=retries - 1, sess=sess, **kwargs)
        print(e)
        return None
开发者ID:yangwe1,项目名称:movie_spider,代码行数:26,代码来源:retry.py
示例18: get_request
def get_request(url, header):
    """Send a GET request to ``url`` with the given headers.

    Args:
        url: Target URL.
        header: Mapping of request headers.

    Returns:
        The response returned by the session.
    """
    req_prepared = Request(method="GET", url=url, headers=header).prepare()
    # The throwaway ``Response()`` placeholder is gone, and the session is
    # closed once the response has been read.
    with Session() as s:
        return s.send(req_prepared)
开发者ID:shwetams,项目名称:python_azure_tables,代码行数:7,代码来源:base_rest_calls.py
示例19: _get_response
def _get_response(self, content_type, url, headers, file_descriptor):
    """Send one request and return the response, raising on a non-OK status.

    Args:
        content_type: Passed as the first argument of ``Request``, i.e. it
            is used as the HTTP method despite its name.
        url: Target URL.
        headers: Request headers; also attached to the raised exception.
        file_descriptor: Request body, passed as ``data``.

    Returns:
        The response when its status code equals ``requests.codes.ok``.

    Raises:
        BackblazeException: For any other status code. The message is the
            ``message`` field of the JSON body, or the raw text when the
            body is not JSON.
        requests.exceptions.RequestException: Propagated unchanged from the
            send call.
    """
    prepared = Request(content_type, url, headers=headers,
                       data=file_descriptor).prepare()
    # The former ``except ...: raise`` clauses only re-raised, so request
    # errors simply propagate. The session is closed after the call.
    with Session() as s:
        response = s.send(prepared)

    if response.status_code != requests.codes.ok:
        try:
            message = response.json()['message']
        except ValueError:
            message = response.text
        raise BackblazeException(response.status_code,
                                 message,
                                 response,
                                 headers)
    return response
开发者ID:th3architect,项目名称:oio-sds,代码行数:25,代码来源:backblaze_http.py
示例20: handle
def handle(self, *args, **options):
    """Replay a recorded request against a host and print the response body.

    ``args[0]`` is the ``host:port`` to send to and ``args[1]`` is a
    base64-encoded JSON document with the keys ``method``, ``path``,
    ``get``, ``post`` and ``cookies``. With fewer than two arguments a
    usage line is printed and the process exits with status 1.
    """
    if len(args) < 2:
        print("Usage: replay_error <host:port> <portable request>")
        sys.exit(1)

    portable = json.loads(base64.b64decode(args[1]))
    method = portable['method']
    target = 'http://{host}{path}'.format(
        host=args[0],
        path=portable['path'],
    )
    prepared = Request(
        method,
        target,
        params=portable['get'],
        data=portable['post'],
        cookies=portable['cookies'],
    ).prepare()

    client = Session()
    reply = client.send(prepared)
    print(reply.content)
开发者ID:DjangoAdminHackers,项目名称:django-db-log,代码行数:29,代码来源:replay_error.py
注:本文中的requests.Session类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论