本文整理汇总了Python中requests.session函数的典型用法代码示例。如果您正苦于以下问题:Python session函数的具体用法?Python session怎么用?Python session使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了session函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: loadCitiesFromOsm
def loadCitiesFromOsm():
"""
Load all cities from OpenStreetMap, use bounding box for position
:return: rendered template
"""
global URL
map_details = Map.getMaps()[0].getMapBox(tilepath=current_app.config.get('PATH_TILES'))
SEARCHSTRING = 'rel[boundary=administrative](%s,%s,%s,%s);out;' % (map_details['min_latdeg'], map_details['min_lngdeg'], map_details['max_latdeg'], map_details['max_lngdeg']) # search all cities in bounding box
requests.session()
r = requests.post(URL, data={'data': SEARCHSTRING})
xmldoc = minidom.parseString(r.content)
relations = xmldoc.getElementsByTagName('relation')
osmids = [c.osmid for c in City.getCities()]
cities = []
for relation in relations:
for tag in relation.childNodes:
if tag.nodeName == "tag" and tag.attributes['k'].value == 'name':
cities.append([relation.attributes['id'].value, tag.attributes['v'].value, int(relation.attributes['id'].value) in osmids])
cities.sort(lambda x, y: cmp(x[1], y[1]))
return render_template('admin.streets.city_osm.html', cities=cities)
开发者ID:FFFeldkirchenWesterham,项目名称:eMonitor,代码行数:25,代码来源:city_utils.py
示例2: __init__
def __init__(self, url, username=None, password=None, params=None, headers=None,
cookies=None, timeout=None, session=None):
self._url = url
self._auth = ()
if username is not None:
self._auth = SimpleAuth(username=username)
if password is not None:
self._auth = SimpleAuth(username=username, password=password)
self._timeout = timeout
self._params = default(params, {})
self._headers = default(headers, {})
self._cookies = default(cookies, {})
if session is None:
# create request session object and try to auto detect proper ssl version
self._session = requests.session()
if self._url.startswith('https://'):
SSLVer = None
for SSLVer in SSLVers:
try:
self._session = requests.session()
self._session.mount('https://', SSLAdapter(SSLVer))
self._session.get(self._url)
break
except requests.exceptions.SSLError:
continue
logger.debug('Detected SSL Version: %s' % SSLVersStr[SSLVer])
else:
self._session = session
开发者ID:sedlacek,项目名称:jenkinsapi-light,代码行数:30,代码来源:requester.py
示例3: __init__
def __init__(self, *args, **kwargs):
super(Carrier, self).__init__(*args, **kwargs)
# Get configuration
self.config = Config(os.getcwd())
self.config.from_object(defaults)
if "CARRIER_CONF" in os.environ:
self.config.from_envvar("CARRIER_CONF")
# Initalize app
logging.config.dictConfig(self.config["LOGGING"])
store = redis.StrictRedis(**dict([(k.lower(), v) for k, v in self.config["REDIS"].items()]))
wsession = requests.session(
auth=(
self.config["WAREHOUSE_AUTH"]["USERNAME"],
self.config["WAREHOUSE_AUTH"]["PASSWORD"],
),
headers={"User-Agent": user_agent()},
)
warehouse = forklift.Forklift(session=wsession)
warehouse.url = self.config["WAREHOUSE_URI"]
psession = requests.session(verify=self.config["PYPI_SSL_VERIFY"], headers={"User-Agent": user_agent()})
ptransports = [xmlrpc2.client.HTTPTransport(session=psession), xmlrpc2.client.HTTPSTransport(session=psession)]
pypi = xmlrpc2.client.Client(self.config["PYPI_URI"], transports=ptransports)
self.processor = Processor(warehouse, pypi, store)
开发者ID:crateio,项目名称:carrier,代码行数:30,代码来源:core.py
示例4: test_login_api
def test_login_api ():
# create account with fb
with session() as c:
d = { "id" : '123456',
"first_name" : 'Frëd',
"last_name" : 'Low',
"email" : '[email protected]',
"location" : 'Mountain View, CA' }
headers = {'content-type': 'application/json; charset=utf-8json'}
request = c.post('%s/signup/fb' % HOST, data=json.dumps(d), headers=headers)
assert request.content == '{"status": "new"}'
with session() as c:
d = { "id" : "123456",
"first_name" : "Frëd",
"last_name" : "Low",
"email" : "[email protected]",
"location" : "Mountain View, CA"}
headers = {'content-type': '; charset=utf-8'}
request = c.post('%s/signup/fb' % HOST, data=json.dumps(d), headers=headers)
assert request.content == '{"status": "new"}'
with session() as c:
d = { "id" : "123",
"first_name" : "Frëd",
"last_name" : "Low",
"email" : "[email protected]",
"location" : "Mountain View, CA"}
headers = {'content-type': '; charset=utf-8json'}
request = c.post('%s/signup/fb' % HOST, data=json.dumps(d), headers=headers)
assert request.content == '{"status": "new"}'
开发者ID:elaineo,项目名称:barnacle-gae,代码行数:31,代码来源:test_main.py
示例5: interfaceTest
def interfaceTest(num, api_purpose, api_host, request_method,request_data_type, request_data, check_point, s=None):
headers = {'Content-Type' : 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With' : 'XMLHttpRequest',
'Connection' : 'keep-alive',
'Referer' : 'http://' + api_host,
'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.110 Safari/537.36'
}
if s == None:
s = requests.session()
if request_method == 'POST':
if request_url != '/login' :
r = s.post(url='http://'+api_host+request_url, data = json.loads(request_data), headers = headers) #由于此处数据没有经过加密,所以需要把Json格式字符串解码转换成**Python对象**
elif request_url == '/login' :
s = requests.session()
r = s.post(url='http://'+api_host+request_url, data = request_data, headers = headers) #由于登录密码不能明文传输,采用MD5加密,在之前的代码中已经进行过json.loads()转换,所以此处不需要解码
else:
logging.error(num + ' ' + api_purpose + ' HTTP请求方法错误,请确认[Request Method]字段是否正确!!!')
s = None
return 400, resp, s
status = r.status_code
resp = r.text
print resp
if status == 200 :
if re.search(check_point, str(r.text)):
logging.info(num + ' ' + api_purpose + ' 成功,' + str(status) + ', ' + str(r.text))
return status, resp, s
else:
logging.error(num + ' ' + api_purpose + ' 失败!!!,[' + str(status) + '], ' + str(r.text))
return 200, resp , None
else:
logging.error(num + ' ' + api_purpose + ' 失败!!!,[' + str(status) + '],' + str(r.text))
return status, resp.decode('utf-8'), None
开发者ID:huyq119,项目名称:inferfacetesting,代码行数:33,代码来源:emailconfig.py
示例6: __init__
def __init__(self, api_key=None, api_secret=None, oauth_token=None, \
oauth_token_secret=None, callback_url='', headers=None):
self.api_key = api_key and u'%s' % api_key
self.api_secret = api_secret and u'%s' % api_secret
self.oauth_token = oauth_token and u'%s' % oauth_token
self.oauth_token_secret = oauth_token_secret and u'%s' % oauth_token_secret
self.callback_url = callback_url
self.request_token_url = 'http://api.netflix.com/oauth/request_token'
self.access_token_url = 'http://api.netflix.com/oauth/access_token'
self.authorize_url = 'https://api-user.netflix.com/oauth/login'
self.old_api_base = 'http://api.netflix.com/'
self.api_base = 'http://api-public.netflix.com/'
default_headers = {'User-agent': 'Python-Netflix v%s' % __version__}
self.headers = default_headers.update(headers or {})
self.client = requests.session(headers=self.headers)
self.auth = None
if self.api_key is not None and self.api_secret is not None:
self.auth = OAuth1(self.api_key, self.api_secret,
signature_type='auth_header')
if self.oauth_token is not None and self.oauth_token_secret is not None:
self.auth = OAuth1(self.api_key, self.api_secret,
self.oauth_token, self.oauth_token_secret,
signature_type='auth_header')
if self.auth is not None:
self.client = requests.session(headers=self.headers, auth=self.auth)
开发者ID:funkatron,项目名称:python-netflix,代码行数:33,代码来源:netflix.py
示例7: fetch_production
def fetch_production(country_code='BO', session=None):
#Define actual and last day (for midnight data)
now = arrow.now(tz=tz_bo)
formatted_date = now.format('YYYY-MM-DD')
past_formatted_date = arrow.get(formatted_date, 'YYYY-MM-DD').shift(days=-1).format('YYYY-MM-DD')
#Define output frame
actual_hour = now.hour
data = [dict() for h in range(actual_hour+1)]
#initial path for url to request
url_init = 'http://www.cndc.bo/media/archivos/graf/gene_hora/despacho_diario.php?fechag='
#Start with data for midnight
url = url_init + past_formatted_date
#Request and rearange in DF
r = session or requests.session()
response = r.get(url)
obj = webparser(response)
data_temp = fetch_hourly_production(country_code, obj, 0, formatted_date)
data[0] = data_temp
#Fill data for the other hours until actual hour
if actual_hour>1:
url = url_init + formatted_date
#Request and rearange in DF
r = session or requests.session()
response = r.get(url)
obj = webparser(response)
for h in range(1, actual_hour+1):
data_temp = fetch_hourly_production(country_code, obj, h, formatted_date)
data[h] = data_temp
return data
开发者ID:alfurey,项目名称:electricitymap,代码行数:34,代码来源:BO.py
示例8: test_three_legged_auth
def test_three_legged_auth(self):
yes_or_no = raw_input("Do you want to skip Imgur three legged auth test? (y/n):")
if yes_or_no.lower() in ['y', 'yes']:
return
for header_auth in (True, False):
# Step 1: Obtaining a request token
imgur_oauth_hook = OAuthHook(
consumer_key=IMGUR_CONSUMER_KEY,
consumer_secret=IMGUR_CONSUMER_SECRET,
header_auth=header_auth
)
client = requests.session(hooks={'pre_request': imgur_oauth_hook})
response = client.post('http://api.imgur.com/oauth/request_token')
qs = parse_qs(response.text)
oauth_token = qs['oauth_token'][0]
oauth_secret = qs['oauth_token_secret'][0]
# Step 2: Redirecting the user
print "Go to http://api.imgur.com/oauth/authorize?oauth_token=%s and sign in into the application, then enter your PIN" % oauth_token
oauth_verifier = raw_input('Please enter your PIN:')
# Step 3: Authenticate
new_imgur_oauth_hook = OAuthHook(oauth_token, oauth_secret, IMGUR_CONSUMER_KEY, IMGUR_CONSUMER_SECRET, header_auth)
new_client = requests.session(
hooks={'pre_request': new_imgur_oauth_hook}
)
response = new_client.post('http://api.imgur.com/oauth/access_token', {'oauth_verifier': oauth_verifier})
response = parse_qs(response.content)
token = response['oauth_token'][0]
token_secret = response['oauth_token_secret'][0]
self.assertTrue(token)
self.assertTrue(token_secret)
开发者ID:XuChen-Repair,项目名称:Trendspedia2,代码行数:34,代码来源:tests.py
示例9: interfaceTest
def interfaceTest(num, api_purpose, api_host, request_method, request_data_type, request_data, check_point, s=None):
headers = {'content-type': 'application/json',''
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko',
'Accept':'application/x-ms-application, image/jpeg, application/xaml+xml, image/gif, image/pjpeg, application/x-ms-xbap, */*',
'Accept-Language':'zh-CN'}
if s == None:
s = requests.session()
if request_method == 'POST':
if request_url != '/login' :
r = s.post(url='http://'+api_host+request_url, data = json.loads(request_data), headers = headers) #由于此处数据没有经过加密,所以需要把Json格式字符串解码转换成**Python对象**
elif request_url == '/login' :
s = requests.session()
r = s.post(url='http://'+api_host+request_url, data = request_data, headers = headers) #由于登录密码不能明文传输,采用MD5加密,在之前的代码中已经进行过json.loads()转换,所以此处不需要解码
else:
logging.error(num + ' ' + api_purpose + ' HTTP请求方法错误,请确认[Request Method]字段是否正确!!!')
s = None
return 400
status = r.status_code
resp = r.text
print resp
if status == 200 :
if re.search(check_point, str(r.text)):
logging.info(num + ' ' + api_purpose + ' 成功,' + str(status) + ', ' + str(r.text))
return status, resp, s
else:
logging.error(num + ' ' + api_purpose + ' 失败!!!,[' + str(status) + '], ' + str(r.text))
return 200, resp , None
else:
logging.error(num + ' ' + api_purpose + ' 失败!!!,[' + str(status) + '],' + str(r.text))
return status, resp.decode('utf-8'), None
开发者ID:Tester-smj,项目名称:selenium,代码行数:30,代码来源:testcase.py
示例10: interfaceTest
def interfaceTest(num, api_purpose, api_host, request_method, request_data_type, request_data, check_point, s=None):
headers = {'Content-Type' : 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With' : 'XMLHttpRequest',
'Connection' : 'keep-alive',
'Referer' : 'http://' + api_host,
'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64) \ppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.110 Safari/537.36'
}
if s == None:
s = requests.session()
if request_method == 'POST':
if request_url != '/login' :
r = s.post(url='http://'+api_host+request_url, \
data = json.loads(request_data), headers = headers) #data has no encryption,so need decode Json charactor change to python objects.
elif request_url == '/login' :
s = requests.session()
r = s.post(url='http://'+api_host+request_url, \
data = request_data, headers = headers) #used MD5 encryption,the before codes have been changed to json.loads(),so no need to encryption.
else:
logging.error(num + ' ' + api_purpose + ' HTTP请求方法错误,请确认[Request Method]字段是否正确!!!')
s = None
return 400, resp, s
status = r.status_code
resp = r.text
print resp
if status == 200 :
if re.search(check_point, str(r.text)):
logging.info(num + ' ' + api_purpose + ' 成功,' + str(status) + ', ' + str(r.text))
return status, resp, s
elif:
logging.error(num + ' ' + api_purpose + ' 失败!!!,[' + str(status) + '], ' + str(r.text))
return 200, resp , None
else:
logging.error(num + ' ' + api_purpose + ' 失败!!!,[' + str(status) + '],' + str(r.text))
return status, resp.decode('utf-8'), None
开发者ID:Birkhoff001,项目名称:app_NotAlone,代码行数:35,代码来源:http_interface.py
示例11: login
def login():
status_code = 0
username = raw_input('Enter username:')
password = raw_input('Enter password:')
client = requests.session()
client.get('http://' + base + '/file_demo/login/')
csrf = client.cookies['csrftoken']
credentials = {'username': username, 'password': password}
header = {'X-CSRFToken': csrf}
web = client.post('http://' + base + '/file_demo/login/', data=credentials, headers=header)
status_code = web.status_code
secure_cookie = web.cookies
while status_code !=200:
print "incorrect combo!"
username = raw_input('Enter username:')
password = raw_input('Enter password:')
client = requests.session()
client.get('http://'+ base +'/file_demo/login/')
csrf = client.cookies['csrftoken']
credentials = {'username': username, 'password': password}
header = {'X-CSRFToken': csrf}
web = client.post('http://'+ base +'/file_demo/login/', data=credentials, headers=header)
status_code = web.status_code
secure_cookie = web.cookies
return secure_cookie
开发者ID:anatg,项目名称:OneDir,代码行数:25,代码来源:client.py
示例12: _request
def _request(self, url, method = u"get", data = None, headers=None, **kwargs):
"""
does the request via requests
- oauth not implemented yet
- use basic auth please
"""
# if self.access_token:
# auth_header = {
# u"Authorization": "Bearer %s" % (self.access_token)
# }
# headers.update(auth_header)
#basic auth
msg = "method: %s url:%s\nheaders:%s\ndata:%s" % (
method,url,headers,data)
#print msg
if not self.use_oauth:
auth=(self.sk_user, self.sk_pw)
if not self.client:
self.client = requests.session()
r = self.client.request(method, url, headers=headers, data=data, auth=auth,**kwargs)
else:
if not self.client:
self.client = requests.session(hooks={'pre_request': oauth_hook})
r = self.client.request(method, url, headers=headers, data=data,**kwargs)
return r
开发者ID:cloudControl,项目名称:salesking_python_sdk,代码行数:25,代码来源:api.py
示例13: dump_data_from_api
def dump_data_from_api(start_id, finish_id):
req = "http://api.wotblitz.ru/wotb/account/info/?application_id={}&fields=nickname&account_id={}"
filename = "nicknames_dump_" +str(start_id) + "_" + str(finish_id)
f = open(filename, 'a')
S = requests.session()
for i in range((finish_id - start_id) // 100):
if i % 10 == 0:
logging.critical("current start_id: {}".format(str(start_id + i*100)))
account_ids_list = []
for account_id in range(start_id + i*100, start_id + (i+1)*100):
account_ids_list.append(str(account_id))
full_req = req.format(config.wargaming_id, ",".join(account_ids_list))
# with eventlet.Timeout(30):
response = S.get(full_req, timeout=30).json()
try:
nicknames = extract_nickname_from_response(response)
except SourceNotAvailableException:
logging.error("Caught SOURCE_NOT_AVAILABLE, start_id + i*100 = " + str(start_id + i*100))
S.close()
time.sleep(1)
S = requests.session()
response = S.get(full_req, timeout=30).json()
nicknames = extract_nickname_from_response(response)
for i in nicknames:
f.write(i+"\n")
f.close()
开发者ID:anatolyburtsev,项目名称:wg_dump_api,代码行数:29,代码来源:wg_api.py
示例14: getuserstatus
def getuserstatus(session=''):
status = 'Guest'
user1 = 'Guest'
if session == '':
session = requests.session()
with open('cookies') as f:
cookies = requests.utils.cookiejar_from_dict(pickle.load(f))
session = requests.session()
session.cookies = cookies
del session.cookies['c_visitor']
#print session.cookies #session = requests.session()
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.2; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0',
'Connection': 'keep-alive'}
site = session.get('https://www.crunchyroll.com/acct/membership/', headers=headers, verify=True).text
#open('tempfile','w').write(site).encoding('UTF-8')
#print site.encode('utf-8')
if re.search(re.escape(' ga(\'set\', \'dimension5\', \'registered\');'), site):
status = 'Free Member'
elif re.search(re.escape(' ga(\'set\', \'dimension5\', \'premium\');'), site):
if re.search(re.escape(' ga(\'set\', \'dimension6\', \'premiumplus\');'), site):
status = 'Premium+ Member'
else:
status = 'Premium Member'
if status != 'Guest':
user1 = re.findall('<a href=\"/user/(.+)\" ', site).pop()
return [status,user1]
开发者ID:Flwk,项目名称:crunchy-xml-decoder,代码行数:26,代码来源:login.py
示例15: __download
def __download(self, q, url):
request = requests.session()
ID = self.__getTrakeId(url)
fullurl = "http://api.soundcloud.com/i1/tracks/{0}/streams?client_id=b45b1aa10f1ac2941910a7f0d10f8e28&app_version=8bae64e".format(
ID)
response = request.get(fullurl).text
j = json.loads(response)
link = j["http_mp3_128_url"]
if link is not None:
url = link
else:
raise Exception("Failed to get download link")
request = requests.session()
response = request.get(url, stream=True)
a, c = self.__getTrackInfo(ID)
filename = c + ".mp3"
with open(filename, 'wb') as fd:
total_length = int(response.headers.get('content-length')) # taken from http://stackoverflow.com/a/20943461
for chunk in progress.bar(response.iter_content(chunk_size=1024), expected_size=(total_length / 1024)):
if chunk:
fd.write(chunk)
fd.flush()
filename = [filename, a, c]
self.__addtags(filename)
q.task_done()
开发者ID:KingBARD,项目名称:soundDown,代码行数:34,代码来源:soundDown.py
示例16: handle_auth
def handle_auth(self):
""" Handles test authentication. """
login = self.login_options
if self.login_type == models_base.LoginType.NONE:
session = requests.session()
elif self.login_type == models_base.LoginType.BASIC:
session = requests.session(auth=(login['user'], login['password']))
elif self.login_type == models_base.LoginType.COOKIE:
session = requests.session()
params = {login['login_field']: login['login'],
login['password_field']: login['password']}
session.post(self.login_info['url'], params=params)
elif self.login_type == models_base.LoginType.OAUTH:
params = dict(access_token=login['access_token'],
access_token_secret=login['access_token_secret'],
consumer_key=login['consumer_key'],
consumer_secret=login['consumer_secret'],
header_auth=login.get('header_auth', True))
oauth_hook = OAuthHook(**params)
session = requests.session(hooks={'pre_request': oauth_hook})
logger.info('Attaching OAuth info.')
else:
raise NotImplementedError('%s login type not implemented' %
self.login_type)
return session
开发者ID:elegion,项目名称:djangodash2012,代码行数:25,代码来源:models.py
示例17: newSession
def newSession():
""" Returns a new Requests session with pre-loaded default HTTP Headers
Generates a new Requests session and consults with the Configuration class
to determine if a Configuration exists and attempts to use the configured
HTTP Request headers first. If this fails, it attempts to create a new
default configuration and use those values. Finally, if a configuration
cannot be initiaized it uses the hard-coded Mozilla headers.
Returns
request-client - The configured Requests session
Raises
HTTPException
"""
from neolib.config.Configuration import Configuration
s = requests.session()
if not Configuration.loaded():
if not Configuration.initialize():
s.headers.update(Page._defaultVars)
else:
s.headers.update(Configuration.getConfig().core.HTTPHeaders.toDict())
else:
s.headers.update(Configuration.getConfig().core.HTTPHeaders.toDict())
return requests.session()
开发者ID:JDongian,项目名称:Neolib,代码行数:27,代码来源:Page.py
示例18: __init__
def __init__(self, app_key=None, app_secret=None, oauth_token=None, oauth_token_secret=None, \
headers=None, callback_url=None, twitter_token=None, twitter_secret=None, proxies=None, version='1.1'):
"""Instantiates an instance of Twython. Takes optional parameters for authentication and such (see below).
:param app_key: (optional) Your applications key
:param app_secret: (optional) Your applications secret key
:param oauth_token: (optional) Used with oauth_token_secret to make authenticated calls
:param oauth_token_secret: (optional) Used with oauth_token to make authenticated calls
:param headers: (optional) Custom headers to send along with the request
:param callback_url: (optional) If set, will overwrite the callback url set in your application
:param proxies: (optional) A dictionary of proxies, for example {"http":"proxy.example.org:8080", "https":"proxy.example.org:8081"}.
"""
# Needed for hitting that there API.
self.api_version = version
self.api_url = 'https://api.twitter.com/%s'
self.request_token_url = self.api_url % 'oauth/request_token'
self.access_token_url = self.api_url % 'oauth/access_token'
self.authorize_url = self.api_url % 'oauth/authorize'
self.authenticate_url = self.api_url % 'oauth/authenticate'
# Enforce unicode on keys and secrets
self.app_key = app_key and unicode(app_key) or twitter_token and unicode(twitter_token)
self.app_secret = app_key and unicode(app_secret) or twitter_secret and unicode(twitter_secret)
self.oauth_token = oauth_token and u'%s' % oauth_token
self.oauth_token_secret = oauth_token_secret and u'%s' % oauth_token_secret
self.callback_url = callback_url
# If there's headers, set them, otherwise be an embarassing parent for their own good.
self.headers = headers or {'User-Agent': 'Twython v' + __version__}
# Allow for unauthenticated requests
self.client = requests.session(proxies=proxies)
self.auth = None
if self.app_key is not None and self.app_secret is not None and \
self.oauth_token is None and self.oauth_token_secret is None:
self.auth = OAuth1(self.app_key, self.app_secret,
signature_type='auth_header')
if self.app_key is not None and self.app_secret is not None and \
self.oauth_token is not None and self.oauth_token_secret is not None:
self.auth = OAuth1(self.app_key, self.app_secret,
self.oauth_token, self.oauth_token_secret,
signature_type='auth_header')
if self.auth is not None:
self.client = requests.session(headers=self.headers, auth=self.auth, proxies=proxies)
# register available funcs to allow listing name when debugging.
def setFunc(key):
return lambda **kwargs: self._constructFunc(key, **kwargs)
for key in api_table.keys():
self.__dict__[key] = setFunc(key)
# create stash for last call intel
self._last_call = None
开发者ID:myusuf3,项目名称:twython,代码行数:59,代码来源:twython.py
示例19: session
def session(headers=None):
'''
Session wrapper for convenience
'''
if headers:
return requests.session(auth=AUTH, headers=headers)
else:
return requests.session(auth=AUTH)
开发者ID:freddiefisher,项目名称:GQ2Toggl,代码行数:8,代码来源:toggl_API.py
示例20: __init__
def __init__(self, twitter_token=None, twitter_secret=None, oauth_token=None, oauth_token_secret=None, \
headers=None, callback_url=None):
"""setup(self, oauth_token = None, headers = None)
Instantiates an instance of Twython. Takes optional parameters for authentication and such (see below).
Parameters:
twitter_token - Given to you when you register your application with Twitter.
twitter_secret - Given to you when you register your application with Twitter.
oauth_token - If you've gone through the authentication process and have a token for this user,
pass it in and it'll be used for all requests going forward.
oauth_token_secret - see oauth_token; it's the other half.
headers - User agent header, dictionary style ala {'User-Agent': 'Bert'}
client_args - additional arguments for HTTP client (see httplib2.Http.__init__), e.g. {'timeout': 10.0}
** Note: versioning is not currently used by search.twitter functions;
when Twitter moves their junk, it'll be supported.
"""
OAuthHook.consumer_key = twitter_token
OAuthHook.consumer_secret = twitter_secret
# Needed for hitting that there API.
self.request_token_url = 'http://twitter.com/oauth/request_token'
self.access_token_url = 'http://twitter.com/oauth/access_token'
self.authorize_url = 'http://twitter.com/oauth/authorize'
self.authenticate_url = 'http://twitter.com/oauth/authenticate'
self.twitter_token = twitter_token
self.twitter_secret = twitter_secret
self.oauth_token = oauth_token
self.oauth_secret = oauth_token_secret
self.callback_url = callback_url
# If there's headers, set them, otherwise be an embarassing parent for their own good.
self.headers = headers
if self.headers is None:
self.headers = {'User-agent': 'Twython Python Twitter Library v1.4.6'}
self.client = None
if self.twitter_token is not None and self.twitter_secret is not None:
self.client = requests.session(hooks={'pre_request': OAuthHook()})
if self.oauth_token is not None and self.oauth_secret is not None:
self.oauth_hook = OAuthHook(self.oauth_token, self.oauth_secret)
self.client = requests.session(hooks={'pre_request': self.oauth_hook})
# Filter down through the possibilities here - if they have a token, if they're first stage, etc.
if self.client is None:
# If they don't do authentication, but still want to request unprotected resources, we need an opener.
self.client = requests.session()
# register available funcs to allow listing name when debugging.
def setFunc(key):
return lambda **kwargs: self._constructFunc(key, **kwargs)
for key in api_table.keys():
self.__dict__[key] = setFunc(key)
开发者ID:airjulio,项目名称:twython,代码行数:58,代码来源:twython.py
注:本文中的requests.session函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论