本文整理汇总了Python中requests.adapters.HTTPAdapter类的典型用法代码示例。如果您正苦于以下问题:Python HTTPAdapter类的具体用法?Python HTTPAdapter怎么用?Python HTTPAdapter使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HTTPAdapter类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _on_request
def _on_request(self, request, **kwargs):
match = self._find_match(request)
# TODO(dcramer): find the correct class for this
if match is None:
raise ConnectionError('Connection refused')
headers = {
'Content-Type': match['content_type'],
}
if match['adding_headers']:
headers.update(match['adding_headers'])
response = HTTPResponse(
status=match['status'],
body=StringIO(match['body']),
headers=headers,
preload_content=False,
)
adapter = HTTPAdapter()
r = adapter.build_response(request, response)
if not match['stream']:
r.content # NOQA
return r
开发者ID:jamslevy,项目名称:responses,代码行数:26,代码来源:responses.py
示例2: handle_401
def handle_401(self, response, **kwargs):
"""Takes the given response and tries digest-auth, if needed."""
original_request = response.request.copy()
www_authenticate = response.headers.get('www-authenticate', '').lower()
www_auth_schemes = [x.strip().split()[0] for x in www_authenticate.split(',') if x.strip()]
auths_to_try = [x for x in www_auth_schemes if x in [y.lower() for y in self.auth_map.keys()]]
for auth_scheme in auths_to_try:
for auth_instance in self.auth_map[auth_scheme]:
#print 'trying', auth_instance, 'for', auth_scheme
# Consume content and release the original connection
# to allow our new request to reuse the same one.
response.content
response.raw.release_conn()
prepared_request = original_request.copy()
prepared_request.hooks = default_hooks()
prepared_request.prepare_auth(auth_instance)
adapter = HTTPAdapter()
if self.session:
adapter = self.session() or adapter
new_response = adapter.send(prepared_request, **kwargs)
new_response.history.append(response)
new_response.request = prepared_request
if new_response.status_code != 401:
#print auth_instance, 'successful for', auth_scheme
self.current_auth = auth_instance
return new_response
response = new_response
return response
开发者ID:tloeb,项目名称:carteiro_deploy,代码行数:34,代码来源:transport.py
示例3: _on_request
def _on_request(self, request, **kwargs):
match = self._find_match(request)
# TODO(dcramer): find the correct class for this
if match is None:
error_msg = 'Connection refused: {0}'.format(request.url)
response = ConnectionError(error_msg)
self._calls.add(request, response)
raise response
headers = {
'Content-Type': match['content_type'],
}
if match['adding_headers']:
headers.update(match['adding_headers'])
response = HTTPResponse(
status=match['status'],
body=BufferIO(match['body']),
headers=headers,
preload_content=False,
)
adapter = HTTPAdapter()
response = adapter.build_response(request, response)
if not match['stream']:
response.content # NOQA
self._calls.add(request, response)
return response
开发者ID:l0kix2,项目名称:responses,代码行数:33,代码来源:responses.py
示例4: __init__
def __init__(self, config):
self.config = config
self.pool_manager = requests.Session()
self.retry_methods = frozenset(['GET', 'HEAD', 'DELETE', 'OPTIONS'])
# noinspection PyTypeChecker
adapter = HTTPAdapter(
pool_connections=config.http_pool_connections,
pool_maxsize=config.http_pool_size,
# max_retries=Retry(
# method_whitelist=self.retry_methods,
# total=config.http_max_retries,
# connect=config.http_max_retries,
# read=config.http_max_retries,
# status_forcelist=range(500, 600)
# ),
# pool_block=True
)
adapter.max_retries = config.http_max_retries
self.pool_manager.mount('https://', adapter)
self.pool_manager.mount('http://', adapter)
self.pool_manager.verify = bool(self.config.verify_ssl)
开发者ID:attune-api,项目名称:attune-python,代码行数:25,代码来源:rest.py
示例5: send
def send(self):
self.url = "%s%s" % (self.base_url, self.path)
prepped = self.prepare()
s = Session()
# print(self.data)
h = HTTPAdapter()
h.max_retries = 10
s.mount('http://', h)
s.mount('https://', h)
response = s.send(prepped)
response.needs_user_token = self.needs_user_token
response.original_request = self
return response
开发者ID:headphonesjones,项目名称:Blohaute,代码行数:13,代码来源:request.py
示例6: __init__
def __init__(self, app_id):
self.app_id = app_id
# Provides cookie persistence, connection-pooling, and configuration.
self.session = requests.Session()
# Create an requests HTTP adapter and set number of retries to attempt
adapter = HTTPAdapter()
adapter.max_retries = 5
# Register transport adapter for given URL prefix and enable connection retrying.
self.session.mount(self.API_URL_PREFIX, adapter=adapter)
开发者ID:danielterhorst,项目名称:open_exchange_rates,代码行数:13,代码来源:client.py
示例7: send
def send(self, request, **kwargs):
if (self._is_cache_disabled
or request.method not in self._cache_allowable_methods):
response = super(CachedSession, self).send(request, **kwargs)
response.from_cache = False
return response
cache_key = self.cache.create_key(request)
def send_request_and_cache_response():
if self._deny_outbound:
print(request.url)
raise Exception(("ERROR: OutBound communication was attempted,"
" but deny_outbound was set to True"))
cache_response = True
response = super(CachedSession, self).send(request, **kwargs)
if response.status_code in self._cache_allowable_codes:
#
# Special case for cblr:
# if we get a status of pending then don't cache
#
try:
if request.url.find('cblr') != -1 and request.method == 'GET':
if isinstance(response.json(), dict) and response.json().get('status', '') == 'pending':
cache_response = False
except:
cache_response = True
if cache_response:
self.cache.save_response(cache_key, response)
response.from_cache = False
return response
response = self.cache.get_response(cache_key)
if response is None:
return send_request_and_cache_response()
if 'Content-Encoding' in response.headers:
del response.headers['Content-Encoding']
adapter = HTTPAdapter()
response = adapter.build_response(request, response)
# dispatch hook here, because we've removed it before pickling
response.from_cache = True
response = dispatch_hook('response', request.hooks, response, **kwargs)
return response
开发者ID:RyPeck,项目名称:cbapi-python,代码行数:50,代码来源:core.py
示例8: __init__
def __init__(self, **kwargs):
super(BetamaxAdapter, self).__init__()
self.cassette = None
self.cassette_name = None
self.http_adapter = HTTPAdapter(**kwargs)
self.serialize = None
self.options = {}
开发者ID:esacteksab,项目名称:betamax,代码行数:7,代码来源:adapter.py
示例9: _on_request
def _on_request(self, request, **kwargs):
match = self._find_match(request)
# TODO(dcramer): find the correct class for this
if match is None:
error_msg = 'Connection refused: {0}'.format(request.url)
response = ConnectionError(error_msg)
self._calls.add(request, response)
raise response
if 'body' in match and isinstance(match['body'], Exception):
self._calls.add(request, match['body'])
raise match['body']
headers = {
'Content-Type': match['content_type'],
}
if 'callback' in match: # use callback
status, r_headers, body = match['callback'](request)
body = BufferIO(body.encode('utf-8'))
headers.update(r_headers)
elif 'body' in match:
if match['adding_headers']:
headers.update(match['adding_headers'])
status = match['status']
body = BufferIO(match['body'])
response = HTTPResponse(
status=status,
body=body,
headers=headers,
preload_content=False,
)
adapter = HTTPAdapter()
response = adapter.build_response(request, response)
if not match.get('stream'):
response.content # NOQA
self._calls.add(request, response)
return response
开发者ID:ryaanwells,项目名称:responses,代码行数:46,代码来源:responses.py
示例10: __init__
def __init__(self, username, password):
"""
:username - Username in 'domain\\username' format
:password - Password or hash in "ABCDABCDABCDABCD:ABCDABCDABCDABCD" format.
"""
if ntlm is None:
raise Exception("NTLM libraries unavailable")
#parse the username
user_parts = username.split('\\', 1)
self.domain = user_parts[0].upper()
self.username = user_parts[1]
self.password = password
self.adapter = HTTPAdapter()
开发者ID:0day1day,项目名称:golismero,代码行数:14,代码来源:requests_ntlm.py
示例11: __init__
def __init__(self, username, password):
"""
:username - Username in 'domain\\username' format
:password - Password or hash in "ABCDABCDABCDABCD:ABCDABCDABCDABCD" format.
"""
if ntlm is None:
raise Exception("NTLM libraries unavailable")
#parse the username
try:
self.domain, self.username = username.split('\\', 1)
except ValueError:
raise ValueError("username should be in 'domain\\username' format.")
self.domain = self.domain.upper()
self.password = password
self.adapter = HTTPAdapter()
开发者ID:njiang1987,项目名称:msichisel,代码行数:16,代码来源:requests_ntlm.py
示例12: get_connection
def get_connection(self, *args, **kwargs):
conn = HTTPAdapter.get_connection(self, *args, **kwargs)
# Override the urlopen method on this connection
if not hasattr(conn.urlopen, "wrapped"):
orig_urlopen = conn.urlopen
def urlopen(*args, **kwargs):
timeout = kwargs.pop("timeout", None)
if isinstance(timeout, Timeout):
timeout = Timeout.from_float(timeout.connect_timeout)
return orig_urlopen(*args, timeout=timeout, **kwargs)
conn.urlopen = urlopen
conn.urlopen.wrapped = True
return conn
开发者ID:3cky,项目名称:livestreamer,代码行数:18,代码来源:http_session.py
示例13: Foauth
class Foauth(BaseAdapter):
"""The foauth.org transport adapter."""
def __init__(self, username, password):
self.auth = (username, password)
self.http = HTTPAdapter()
def prepare_request(self, request):
p = urlparse(request.url)
# Rewrite the url to use foauth.org
request.url = FOAUTH_TEMPLATE.format(domain=p.netloc, path=p.path)
# Authenticate appropriately.
request.prepare_auth(self.auth)
return request
def send(self, request, **kwargs):
request = self.prepare_request(request)
return self.http.send(request, **kwargs)
开发者ID:banare,项目名称:requests-foauth,代码行数:19,代码来源:requests_foauth.py
示例14: __init__
def __init__(self, username, password):
self.auth = (username, password)
self.http = HTTPAdapter()
开发者ID:banare,项目名称:requests-foauth,代码行数:3,代码来源:requests_foauth.py
示例15: BetamaxAdapter
class BetamaxAdapter(BaseAdapter):
"""This object is an implementation detail of the library.
It is not meant to be a public API and is not exported as such.
"""
def __init__(self, **kwargs):
super(BetamaxAdapter, self).__init__()
self.cassette = None
self.cassette_name = None
self.old_adapters = kwargs.pop('old_adapters', {})
self.http_adapter = HTTPAdapter(**kwargs)
self.serialize = None
self.options = {}
def cassette_exists(self):
"""Check if cassette exists on file system.
:returns: bool -- True if exists, False otherwise
"""
if self.cassette_name and os.path.exists(self.cassette_name):
return True
return False
def close(self):
"""Propagate close to underlying adapter."""
self.http_adapter.close()
def eject_cassette(self):
"""Eject currently loaded cassette."""
if self.cassette:
self.cassette.eject()
self.cassette = None # Allow self.cassette to be garbage-collected
def load_cassette(self, cassette_name, serialize, options):
"""Load cassette.
Loads a previously serialized http response as a cassette
:param str cassette_name: (required), name of cassette
:param str serialize: (required), type of serialization i.e 'json'
:options dict options: (required), options for cassette
"""
self.cassette_name = cassette_name
self.serialize = serialize
self.options.update(options.items())
placeholders = self.options.get('placeholders', {})
cassette_options = {}
default_options = cassette.Cassette.default_cassette_options
match_requests_on = self.options.get(
'match_requests_on', default_options['match_requests_on']
)
cassette_options['preserve_exact_body_bytes'] = self.options.get(
'preserve_exact_body_bytes',
)
cassette_options['allow_playback_repeats'] = self.options.get(
'allow_playback_repeats'
)
cassette_options['record_mode'] = self.options.get('record')
for option, value in list(cassette_options.items()):
if value is None:
cassette_options.pop(option)
self.cassette = cassette.Cassette(
cassette_name, serialize, placeholders=placeholders,
cassette_library_dir=self.options.get('cassette_library_dir'),
**cassette_options
)
if 'record' in self.options:
self.cassette.record_mode = self.options['record']
# NOTE(sigmavirus24): Cassette.match_options is a set, might as well
# use that instead of overriding it.
self.cassette.match_options.update(match_requests_on)
re_record_interval = timedelta.max
if self.options.get('re_record_interval'):
re_record_interval = timedelta(self.options['re_record_interval'])
now = datetime.utcnow()
if re_record_interval < (now - self.cassette.earliest_recorded_date):
self.cassette.clear()
def send(self, request, stream=False, timeout=None, verify=True,
cert=None, proxies=None):
"""Send request.
:param request request: request
:returns: A Response object
"""
interaction = None
current_cassette = self.cassette
#.........这里部分代码省略.........
开发者ID:bboe,项目名称:betamax,代码行数:101,代码来源:adapter.py
示例16: get_connection
def get_connection(self, url, proxies=None):
url = url.replace(self.redirect_source, self.redirect_target)
return HTTPAdapter.get_connection(self, url, proxies=proxies)
开发者ID:OliverWS,项目名称:livestreamer,代码行数:3,代码来源:afreecatv.py
示例17: BetamaxAdapter
class BetamaxAdapter(BaseAdapter):
"""This object is an implementation detail of the library.
It is not meant to be a public API and is not exported as such.
"""
def __init__(self, **kwargs):
super(BetamaxAdapter, self).__init__()
self.cassette = None
self.cassette_name = None
self.old_adapters = kwargs.pop('old_adapters', {})
self.http_adapter = HTTPAdapter(**kwargs)
self.serialize = None
self.options = {}
def cassette_exists(self):
if self.cassette_name and os.path.exists(self.cassette_name):
return True
return False
def close(self):
self.http_adapter.close()
def eject_cassette(self):
if self.cassette:
self.cassette.eject()
self.cassette = None # Allow self.cassette to be garbage-collected
def load_cassette(self, cassette_name, serialize, options):
self.cassette_name = cassette_name
self.serialize = serialize
self.options.update(options.items())
placeholders = self.options.get('placeholders', [])
default_options = Cassette.default_cassette_options
match_requests_on = self.options.get(
'match_requests_on', default_options['match_requests_on']
)
preserve_exact_body_bytes = self.options.get(
'preserve_exact_body_bytes',
)
self.cassette = Cassette(
cassette_name, serialize, placeholders=placeholders,
record_mode=self.options.get('record'),
preserve_exact_body_bytes=preserve_exact_body_bytes
)
if 'record' in self.options:
self.cassette.record_mode = self.options['record']
self.cassette.match_options = match_requests_on
re_record_interval = timedelta.max
if self.options.get('re_record_interval'):
re_record_interval = timedelta(self.options['re_record_interval'])
now = datetime.utcnow()
if re_record_interval < (now - self.cassette.earliest_recorded_date):
self.cassette.clear()
def send(self, request, stream=False, timeout=None, verify=True,
cert=None, proxies=None):
interaction = None
if not self.cassette:
raise BetamaxError('No cassette was specified or found.')
if self.cassette.interactions:
interaction = self.cassette.find_match(request)
if not interaction and self.cassette.is_recording():
interaction = self.send_and_record(
request, stream, timeout, verify, cert, proxies
)
if not interaction:
raise BetamaxError(unhandled_request_message(request,
self.cassette))
resp = interaction.as_response()
resp.connection = self
return resp
def send_and_record(self, request, stream=False, timeout=None,
verify=True, cert=None, proxies=None):
adapter = self.find_adapter(request.url)
response = adapter.send(
request, stream=True, timeout=timeout, verify=verify,
cert=cert, proxies=proxies
)
self.cassette.save_interaction(response, request)
return self.cassette.interactions[-1]
def find_adapter(self, url):
for (prefix, adapter) in self.old_adapters.items():
#.........这里部分代码省略.........
开发者ID:dgouldin,项目名称:betamax,代码行数:101,代码来源:adapter.py
示例18: BetamaxAdapter
class BetamaxAdapter(BaseAdapter):
"""This object is an implementation detail of the library.
It is not meant to be a public API and is not exported as such.
"""
def __init__(self, **kwargs):
super(BetamaxAdapter, self).__init__()
self.cassette = None
self.cassette_name = None
self.http_adapter = HTTPAdapter(**kwargs)
self.serialize = None
self.options = {}
def cassette_exists(self):
if self.cassette_name and os.path.exists(self.cassette_name):
return True
return False
def close(self):
self.http_adapter.close()
def eject_cassette(self):
if self.cassette:
self.cassette.eject()
self.cassette = None # Allow self.cassette to be garbage-collected
def load_cassette(self, cassette_name, serialize, options):
self.cassette_name = cassette_name
self.serialize = serialize
self.options.update(options)
placeholders = self.options.get('placeholders')
# load cassette into memory
if self.cassette_exists():
self.cassette = Cassette(cassette_name, serialize,
placeholders=placeholders)
elif os.path.exists(os.path.dirname(cassette_name)):
self.cassette = Cassette(cassette_name, serialize, 'w+',
placeholders=placeholders)
else:
raise RuntimeError(
'No cassette could be loaded or %s does not exist.' %
os.path.dirname(cassette_name)
)
self.cassette.record_mode = self.options['record']
re_record_interval = timedelta.max
if self.options.get('re_record_interval'):
re_record_interval = timedelta(self.options['re_record_interval'])
now = datetime.utcnow()
if re_record_interval < (now - self.cassette.earliest_recorded_date):
self.cassette.clear()
def send(self, request, stream=False, timeout=None, verify=True,
cert=None, proxies=None):
interaction = None
match_on = Cassette.default_cassette_options['match_requests_on']
response = None
if not self.cassette:
raise BetamaxError('No cassette was specified or found.')
if self.cassette.interactions:
self.cassette.match_options = set(match_on)
interaction = self.cassette.find_match(request)
if not interaction and self.cassette.is_recording():
response = self.http_adapter.send(
request, stream=True, timeout=timeout, verify=verify,
cert=cert, proxies=proxies
)
self.cassette.save_interaction(response, request)
interaction = self.cassette.interactions[-1]
if not interaction:
raise BetamaxError('A request was made that could not be handled')
return interaction.as_response()
开发者ID:esacteksab,项目名称:betamax,代码行数:83,代码来源:adapter.py
示例19: HttpNtlmAuth
class HttpNtlmAuth(AuthBase):
"""HTTP NTLM Authentication Handler for Requests. Supports pass-the-hash."""
def __init__(self, username, password):
"""
:username - Username in 'domain\\username' format
:password - Password or hash in "ABCDABCDABCDABCD:ABCDABCDABCDABCD" format.
"""
if ntlm is None:
raise Exception("NTLM libraries unavailable")
#parse the username
try:
self.domain, self.username = username.split('\\', 1)
except ValueError:
raise ValueError("username should be in 'domain\\username' format.")
self.domain = self.domain.upper()
self.password = password
self.adapter = HTTPAdapter()
def retry_using_http_NTLM_auth(self, auth_header_field, auth_header,
response, args):
"""Attempts to authenticate using HTTP NTLM challenge/response"""
if auth_header in response.request.headers:
return response
request = copy_request(response.request)
# initial auth header with username. will result in challenge
auth = 'NTLM %s' % ntlm.create_NTLM_NEGOTIATE_MESSAGE("%s\\%s" % (self.domain,self.username))
request.headers[auth_header] = auth
# we must keep the connection because NTLM authenticates the connection, not single requests
request.headers["Connection"] = "Keep-Alive"
# A streaming response breaks authentication.
# This can be fixed by not streaming this request, which is safe because
# the returned response3 will still have stream=True set if specified in
# args. In addition, we expect this request to give us a challenge
# and not the real content, so the content will be short anyway.
args_nostream = dict(args, stream=False)
response2 = self.adapter.send(request, **args_nostream)
# this is important for some web applications that store authentication-related info in cookies (it took a long time to figure out)
if response2.headers.get('set-cookie'):
request.headers['Cookie'] = response2.headers.get('set-cookie')
# get the challenge
auth_header_value = response2.headers[auth_header_field]
ntlm_header_value = list(filter(lambda s: s.startswith('NTLM '), auth_header_value.split(',')))[0].strip()
ServerChallenge, NegotiateFlags = ntlm.parse_NTLM_CHALLENGE_MESSAGE(ntlm_header_value[5:])
# build response
request = copy_request(request)
auth = 'NTLM %s' % ntlm.create_NTLM_AUTHENTICATE_MESSAGE(ServerChallenge, self.username, self.domain, self.password, NegotiateFlags)
request.headers[auth_header] = auth
response3 = self.adapter.send(request, **args)
# Update the history.
response3.history.append(response)
response3.history.append(response2)
return response3
def response_hook(self, r, **kwargs):
if r.status_code == 401 and 'ntlm' in r.headers.get('www-authenticate','').lower():
return self.retry_using_http_NTLM_auth('www-authenticate',
'Authorization', r, kwargs)
if r.status_code == 407 and 'ntlm' in r.headers.get('proxy-authenticate','').lower():
return self.retry_using_http_NTLM_auth('proxy-authenticate',
'Proxy-authorization', r,
kwargs)
return r
def __call__(self, r):
r.register_hook('response', self.response_hook)
return r
开发者ID:njiang1987,项目名称:msichisel,代码行数:83,代码来源:requests_ntlm.py
示例20: download_result_file
def download_result_file(url, result_file_directory, result_file_name, decompress, overwrite):
""" Download file with specified URL and download parameters.
:param result_file_directory: The download result local directory name.
:type result_file_directory: str
:param result_file_name: The download result local file name.
:type result_file_name: str
:param decompress: Determines whether to decompress the ZIP file.
If set to true, the file will be decompressed after download.
The default value is false, in which case the downloaded file is not decompressed.
:type decompress: bool
:param overwrite: Indicates whether the result file should overwrite the existing file if any.
:type overwrite: bool
:return: The download file path.
:rtype: str
"""
if result_file_directory is None:
raise ValueError('result_file_directory cannot be None.')
if result_file_name is None:
result_file_name="default_file_name"
if decompress:
name, ext=os.path.splitext(result_file_name)
if ext == '.zip':
raise ValueError("Result file can't be decompressed into a file with extension 'zip'."
" Please change the extension of the result_file_name or pass decompress=false")
zip_file_path=os.path.join(result_file_directory, name + '.zip')
result_file_path=os.path.join(result_file_directory, result_file_name)
else:
result_file_path=os.path.join(result_file_directory, result_file_name)
zip_file_path=result_file_path
if os.path.exists(result_file_path) and overwrite is False:
if six.PY3:
raise FileExistsError('Result file: {0} exists'.format(result_file_path))
else:
raise OSError('Result file: {0} exists'.format(result_file_path))
pool_manager=PoolManager(
ssl_version=ssl.PROTOCOL_SSLv3,
)
http_adapter=HTTPAdapter()
http_adapter.poolmanager=pool_manager
s=requests.Session()
s.mount('https://', http_adapter)
r=s.get(url, stream=True, verify=True)
r.raise_for_status()
try:
with open(zip_file_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=4096):
if chunk:
f.write(chunk)
f.flush()
if decompress:
with contextlib.closing(zipfile.ZipFile(zip_file_path)) as compressed:
first=compressed.namelist()[0]
with open(result_file_path, 'wb') as f:
f.write(compressed.read(first))
except Exception as ex:
raise ex
finally:
if decompress and os.path.exists(zip_file_path):
os.remove(zip_file_path)
return result_file_path
开发者ID:ashishtyl,项目名称:BingAds-Python-SDK,代码行数:67,代码来源:ReportRequests.py
注:本文中的requests.adapters.HTTPAdapter类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论