本文整理汇总了Python中requests.Request类的典型用法代码示例。如果您正苦于以下问题:Python Request类的具体用法?Python Request怎么用?Python Request使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Request类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_http_request
def get_http_request(url, payload, method='POST', headers=None, use_proxy=False, use_proxy_auth=False, trust_env=True):
    """Send ``payload`` to ``url`` through a one-off ``requests`` session.

    :param url: Target URL.
    :param payload: Sent as the request body for POST, or as the query
        string for GET.
    :param method: ``'GET'`` or ``'POST'``; any other value falls back to
        ``'POST'``.
    :param headers: Optional headers passed to the ``Request``.
    :param use_proxy: When true the session uses ``Util.get_proxies()``.
    :param use_proxy_auth: When true the session uses
        ``Util.get_proxy_auth()``.
    :param trust_env: Assigned to ``session.trust_env``.
    :return: ``(response, error)``. On failure ``response`` is an empty
        ``Response`` and ``error`` is a message describing the cause; on
        success ``error`` is ``None``.
    """
    # Normalise the verb first so the payload placement below agrees with
    # the verb that is actually sent (previously an unknown verb became a
    # POST whose payload was silently dropped).
    verb = method if method in ('GET', 'POST') else 'POST'
    try:
        session = Session()
        try:
            session.trust_env = trust_env
            session.proxies = Util.get_proxies() if use_proxy else None
            # NOTE(review): ``Request.prepare()`` does not apply session-level
            # auth; confirm whether ``session.prepare_request()`` was intended.
            session.auth = Util.get_proxy_auth() if use_proxy_auth else None
            request = Request(
                verb,
                url,
                data=payload if verb == 'POST' else None,
                params=payload if verb == 'GET' else None,
                headers=headers
            )
            response = session.send(
                request.prepare(),
                timeout=app.config['HTTP_REQUESTS_TIMEOUT']
            )
        finally:
            # Release pooled connections even when sending fails.
            session.close()
    except Exception as e:
        # A blank Response has status_code None, so the former
        # raise_for_status() call was a no-op on Python 2 and a TypeError on
        # Python 3; it is dropped. ``e.message`` does not exist on Python 3.
        return Response(), 'Error al realizar la consulta - Motivo: {}'.format(e)
    # NOTE(review): the visible original had no success return; a matching
    # 2-tuple is returned so callers can always unpack the result.
    return response, None
开发者ID:stabora,项目名称:nbsf,代码行数:27,代码来源:util.py
示例2: timbra_xml
def timbra_xml(self, xml, id_original=0):
    """Post ``xml`` to the ``timbrado`` web service as a SOAP request.

    :param xml: Document to send; non-ASCII characters are replaced by XML
        character references and the result is escaped before templating.
    :param id_original: Transaction id, also used to obtain the user token.
    :return: The ``DatosXML`` value of the parsed response, or ``''`` when no
        token is available, ``_check_fault`` reports ``ok``, or the request
        fails (in which case ``self.error`` is set).
    """
    user_token = self._get_token(id_original)
    if not user_token:
        return ''

    method = 'timbra_xml'
    fields = {
        'xml': escape(xml.encode('ascii', 'xmlcharrefreplace').decode('utf-8')),
        'rfc': self.rfc,
        'user_token': user_token,
        'transaccion_id': id_original,
    }
    body = self._SOAP[method].format(**fields).encode('utf-8')
    headers = {
        'SOAPAction': '"{}"'.format(self._ACTIONS[method]),
        # Header values must be strings; recent requests releases reject ints.
        'Content-length': str(len(body)),
        'Content-type': 'text/xml; charset="UTF-8"'
    }
    prepped = Request('POST', self.WS['timbrado'], data=body, headers=headers).prepare()

    # The context manager closes the session's connection pool on every path.
    with Session() as s:
        try:
            response = s.send(prepped, timeout=TIMEOUT)
            res = xml2dict.parse(response.text)
            ok, res = self._check_fault(res)
            if ok:
                return ''
            return res['RespuestaTimbraXML']['ComprobanteXML']['DatosXML']
        except exceptions.Timeout:
            self.error = 'Tiempo de espera agotado'
        except Exception as e:
            self.error = str(e)
            log.debug(e)
    return ''
开发者ID:e-fector,项目名称:cfdi-test,代码行数:33,代码来源:pac.py
示例3: __init__
def __init__(self, url=None, callback=None, method='GET',
             headers=None, need_proxy=False, fail_times=0, timeout=TIMEOUT):
    """Initialise the underlying ``Request`` and store crawl bookkeeping.

    :param url: URL passed to ``Request.__init__``.
    :param callback: Stored on the instance as ``self.callback``.
    :param method: HTTP verb passed to ``Request.__init__``.
    :param headers: Headers passed to ``Request.__init__``.
    :param need_proxy: Stored as ``self.need_proxy``.
    :param fail_times: Stored as ``self.fail_times``.
    :param timeout: Stored as ``self.timeout``.
    """
    # ``callback`` needs a default: a required parameter may not follow the
    # defaulted ``url``, which made the original signature a SyntaxError.
    Request.__init__(self, method, url, headers)
    self.callback = callback
    self.need_proxy = need_proxy
    self.fail_times = fail_times
    self.timeout = timeout
开发者ID:stevenshuang,项目名称:Python3-Spider-Actual-Combat,代码行数:7,代码来源:Request.py
示例4: _get_response
def _get_response(self, content_type, url, headers, file_descriptor):
    """Send one request and return the response, raising on a non-OK status.

    :param content_type: Passed as the first (HTTP method) argument of
        ``Request``.
    :param url: Target URL.
    :param headers: Request headers; also handed to ``BackblazeException``.
    :param file_descriptor: Passed as the request ``data``.
    :return: The response when its status equals ``requests.codes.ok``.
    :raises BackblazeException: For any other status. The message is the
        ``message`` field of the JSON body when available, otherwise the raw
        response text.

    Exceptions raised by ``requests`` while sending propagate unchanged; the
    former handlers only re-raised them.
    """
    prepared = Request(content_type, url, headers=headers,
                       data=file_descriptor).prepare()
    # Close the session (and its connection pool) once the call is done.
    with Session() as s:
        response = s.send(prepared)

    if response.status_code != requests.codes.ok:
        try:
            message = response.json()['message']
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or is JSON without a usable ``message``.
            message = response.text
        raise BackblazeException(response.status_code, message,
                                 response, headers)
    return response
开发者ID:th3architect,项目名称:oio-sds,代码行数:25,代码来源:backblaze_http.py
示例5: _http
def _http(self, method, path, json=True, **kw):
    """
    Issue an HTTP request against ``self.url`` and return the result.

    A ``data`` keyword is serialised with ``ujson`` before sending, and a
    ``stream`` keyword is forwarded to ``send`` rather than to ``Request``.
    When ``json`` is true the response content is decoded with ``ujson``;
    otherwise the response object itself is returned.
    """
    # ``stream`` belongs to the send call, not to the Request constructor
    streaming = kw.pop('stream', False)

    # serialise the outgoing payload, if any
    if 'data' in kw:
        kw['data'] = ujson.dumps(kw['data'])

    endpoint = 'http://{}/{}'.format(self.url, path)
    prepared = Request(method, endpoint, **kw).prepare()
    resp = self.s.send(prepared, stream=streaming)

    return ujson.loads(resp.content) if json else resp
开发者ID:jeremyjbowers,项目名称:python-streamtools,代码行数:28,代码来源:client.py
示例6: get_request
def get_request(url,header):
    """Send a GET request to ``url`` with ``header`` and return the response.

    :param url: Target URL.
    :param header: Headers passed to the ``Request``.
    :return: The response returned by ``Session.send``.
    """
    req_prepared = Request(method="GET", url=url, headers=header).prepare()
    # The placeholder ``Response()`` was overwritten before use and is gone;
    # the context manager closes the session after the call.
    with Session() as s:
        return s.send(req_prepared)
开发者ID:shwetams,项目名称:python_azure_tables,代码行数:7,代码来源:base_rest_calls.py
示例7: call_api
def call_api(self, operation_name, params=None):
    """Sign and POST an API operation, returning the response.

    Returns ``None`` when ``operation_name`` is falsy. The ``'upload'``
    operation is delegated to ``self._upload_file``; every other operation
    serialises ``params`` into the request body and sends it.
    """
    if not operation_name:
        return

    payload = {} if params is None else params
    session = Session()

    prepared = Request('POST', self._endpoint_url + operation_name).prepare()
    self._encode_headers(prepared.headers)
    prepared.headers['Content-Type'] = 'application/json'
    prepared.headers['User-Agent'] = self._build_user_agent_header()
    self._signer.sign(prepared)

    # uploads first fetch an S3 URL, so they take a separate path
    if operation_name == 'upload':
        return self._upload_file(payload, prepared, session)

    # attach the serialised parameters as the request body
    serialized = Serializer().serialize_to_request(payload, None)
    prepared.prepare_body(serialized['body'], None)
    return session.send(prepared)
开发者ID:cloudera,项目名称:hue,代码行数:26,代码来源:api_lib.py
示例8: build_environ
def build_environ(method, path, host='localhost',
                  accept_type='text/html', content_type=None,
                  query=None, form=None, files=None, cookies=None):
    """Build a WSGI-style environ dict from a prepared ``requests`` request.

    :param method: HTTP verb.
    :param path: Request path, joined to ``host`` with a single slash.
    :param host: Host name, or a full base URL when it contains ``://``.
    :param accept_type: Value stored under ``HTTP_ACCEPT``.
    :param content_type: When not ``None``, overrides ``CONTENT_TYPE``.
    :param query: Passed to ``Request`` as ``params``.
    :param form: Passed to ``Request`` as ``data``.
    :param files: Passed to ``Request`` as ``files``.
    :param cookies: Passed to ``Request`` as ``cookies``.
    :return: The environ dictionary, including a ``wsgi.input`` byte stream.
    """
    if '://' in host:
        base = host.rstrip('/')
    else:
        base = 'http://' + host.strip('/')
    url = base + '/' + path.lstrip('/')

    # Keyword arguments: positionally, the slot after ``params`` is ``auth``,
    # so the cookies used to be passed as authentication.
    request = Request(method, url, files=files, data=form,
                      params=query, cookies=cookies)
    prepared = request.prepare()
    parsed_url = parse_url(prepared.url)

    environ = {
        'HTTP_HOST': parsed_url.host,
        'PATH_INFO': parsed_url.path,
        'REQUEST_METHOD': prepared.method,
        'HTTP_ACCEPT': accept_type,
        'QUERY_STRING': parsed_url.query or '',
    }

    for key, value in iteritems(prepared.headers):
        key = underscore(key)
        # The content headers are stored without the HTTP_ prefix.
        if key not in ('content_type', 'content_length'):
            key = 'http_' + key
        environ[key.upper()] = value

    if content_type is not None:
        environ['CONTENT_TYPE'] = content_type

    body = prepared.body
    if body is None:
        body = b''
    elif not isinstance(body, bytes) and hasattr(body, 'encode'):
        # Form-encoded bodies are text on Python 3; BytesIO needs bytes.
        body = body.encode('utf-8')
    environ['wsgi.input'] = BytesIO(body)

    return environ
开发者ID:CorverDevelopment,项目名称:Poort,代码行数:34,代码来源:utils.py
示例9: authorize_url
def authorize_url(self, duration, scopes, state, implicit=False):
    """Build the out-of-band authorization URL for this application.

    :param duration: ``permanent`` or ``temporary``. The implicit flow only
        accepts ``temporary``.
    :param scopes: An iterable of OAuth scope names; they are joined with
        spaces in the URL.
    :param state: A string placed in the URL's ``state`` parameter.
    :param implicit: (optional) Request a ``token`` response instead of a
        ``code`` response (default: False). Only allowed on
        UntrustedAuthenticator instances.
    :raises InvalidInvocation: When no redirect URI is configured, or when
        the implicit flow is requested on another authenticator type or
        with a non-temporary duration.
    """
    if self.redirect_uri is None:
        raise InvalidInvocation('redirect URI not provided')

    if implicit:
        if not isinstance(self, UntrustedAuthenticator):
            raise InvalidInvocation('Only UntrustedAuthentictor instances can '
                                    'use the implicit grant flow.')
        if duration != 'temporary':
            raise InvalidInvocation('The implicit grant flow only supports '
                                    'temporary access tokens.')

    query = {
        'client_id': self.client_id,
        'duration': duration,
        'redirect_uri': self.redirect_uri,
        'response_type': 'token' if implicit else 'code',
        'scope': ' '.join(scopes),
        'state': state,
    }
    base_url = self._requestor.reddit_url + const.AUTHORIZATION_PATH
    # Preparing the request is only used to encode the query string.
    prepared = Request('GET', base_url, params=query).prepare()
    return prepared.url
开发者ID:nmtake,项目名称:prawcore,代码行数:33,代码来源:auth.py
示例10: prepare_and_send_request
def prepare_and_send_request(request_method, request_url, payload=None, authorization=None, max_retries=None):
    """Send a JSON API request, retrying on timeouts and SSL errors.

    :param request_method: HTTP verb.
    :param request_url: Target URL.
    :param payload: Optional object sent as a JSON body.
    :param authorization: Optional user name; combined with
        ``TEST_USER_PASSWORD`` into a Basic ``Authorization`` header.
    :param max_retries: Optional cap on retries. ``None`` (the default)
        retries until a response arrives, as the original recursion did.
    :return: The response object.
    :raises Timeout, SSLError: When ``max_retries`` is exhausted.
    """
    headers = {
        'Accept' : 'application/json',
        'Accept-Encoding' : 'gzip,deflate,sdch',
        'Accept-Language' : 'en-US,en;q=0.8,pt-BR;q=0.6,pt;q=0.4',
        'Cache-Control' : 'no-cache',
        'Connection' : 'keep-alive',
    }

    if authorization is not None:
        credentials = "%s:%s" % (authorization, TEST_USER_PASSWORD)
        # b64encode needs bytes on Python 3 and returns bytes there.
        if not isinstance(credentials, bytes):
            credentials = credentials.encode('utf-8')
        token = base64.b64encode(credentials)
        if not isinstance(token, str):
            token = token.decode('ascii')
        headers['Authorization'] = "Basic " + token

    if payload is None:
        req = Request(request_method, request_url, headers=headers)
    else:
        headers['Content-Type'] = 'application/json'
        req = Request(request_method, request_url, data=json.dumps(payload), headers=headers)
    r = req.prepare()

    s = Session()
    s.mount('https://', SSLAdapter('TLSv1'))
    attempts = 0
    try:
        while True:
            try:
                # NOTE(review): verify=False disables certificate checks.
                # The visible original never returned the response.
                return s.send(r, verify=False, timeout=1)
            except (Timeout, SSLError):
                # The tuple is required: ``except Timeout, SSLError`` caught
                # only Timeout and rebound the name SSLError to it.
                attempts += 1
                if max_retries is not None and attempts > max_retries:
                    raise
    finally:
        s.close()
开发者ID:MrFrety,项目名称:buddycloud-tests-framework,代码行数:28,代码来源:api_utils.py
示例11: __init__
def __init__(self, url, callback, method='GET', headers=None, timeout=3, proxy=None, params=None, data=None):
    """Initialise the underlying ``Request`` and store crawl bookkeeping.

    :param url: URL passed to ``Request.__init__``.
    :param callback: Stored as ``self.callback``.
    :param method: HTTP verb passed to ``Request.__init__``.
    :param headers: Headers passed to ``Request.__init__``.
    :param timeout: Stored as ``self.timeout``.
    :param proxy: Stored as ``self.proxy``.
    :param params: Stored as ``self.params``.
    :param data: Stored as ``self.data``.
    """
    # An unbound ``__init__`` call needs ``self`` first, and the base
    # constructor takes (method, url, headers) in that order.
    Request.__init__(self, method, url, headers)
    self.callback = callback
    self.timeout = timeout
    self.proxy = proxy
    self.params = params
    self.data = data
开发者ID:Rockyzsu,项目名称:base_function,代码行数:7,代码来源:Spider_Callback.py
示例12: _make_request
def _make_request(self, method, url, data=None, params=None, **kwargs):
    """Send an API request and return the decoded JSON response.

    :param method: HTTP verb.
    :param url: Path appended to ``self.BASE_URL + self.VERSION``.
    :param data: Optional body; passed through
        ``self._stringify_dict_list`` when truthy.
    :param params: Optional query parameters merged over ``self.params``
        for this call only.
    :param kwargs: Only ``headers`` is read; it is merged over
        ``self.headers`` for this call only.
    :return: ``response.json()``.
    :raises HttpError: When ``raise_for_status`` reports an HTTP error.
    :raises TimeoutError: When the request times out.
    :raises ClientError: When the connection fails.
    """
    # Merge into copies: updating self.params / self.headers in place made
    # per-call values leak into every later request.
    request_params = self.params.copy()
    if params:
        request_params.update(params)
    request_headers = self.headers.copy()
    if kwargs.get('headers'):
        request_headers.update(kwargs['headers'])

    if data:
        data = self._stringify_dict_list(data)

    url = self.BASE_URL + self.VERSION + url
    req = Request(method, url, data=data, headers=request_headers,
                  params=request_params)
    prepped = req.prepare()

    try:
        response = self._session.send(prepped, timeout=self.timeout)
        if response.status_code > 299 and 'message' in str(response.content):
            try:
                response.reason += ': {}'.format(response.json()['message'])
            except (ValueError, KeyError, TypeError):
                # Best effort: a non-JSON body or missing field must not
                # mask the HTTP error raised below.
                pass
        response.raise_for_status()
    except requests.HTTPError:
        msg = '{} {}'.format(response.status_code, response.reason)
        raise HttpError(msg)
    except requests.Timeout:
        raise TimeoutError('{} {} timed out after {} seconds'.format(
            method, url, self.timeout[0] + self.timeout[1]
        ))
    except requests.ConnectionError as e:
        raise ClientError('Could not reach: {} {} {}'.format(method, url, e))

    return response.json()
开发者ID:itorres,项目名称:sd-python-wrapper,代码行数:29,代码来源:apiclient.py
示例13: get_estatus
def get_estatus(self, data):
    """Query the status web service and store the result in ``self.msg``.

    :param data: Mapping of fields for the ``self._soap`` template; it must
        contain ``emisor_rfc`` and ``receptor_rfc``. It is not modified.
    :return: ``True`` when a status was stored in ``self.msg``; ``False``
        when ``_check_fault`` reports ``ok`` or the request fails.
    """
    # Escape into a copy: escaping in place altered the caller's dict and
    # double-escaped the values if the same dict was passed again.
    fields = dict(data)
    fields['emisor_rfc'] = escape(data['emisor_rfc'])
    fields['receptor_rfc'] = escape(data['receptor_rfc'])
    body = self._soap.format(**fields).encode('utf-8')

    headers = {
        'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
        # Header values must be strings; recent requests releases reject ints.
        'Content-length': str(len(body)),
        'Content-type': 'text/xml; charset="UTF-8"'
    }
    prepped = Request('POST', self._webservice, data=body, headers=headers).prepare()

    # The context manager closes the session on every path.
    with Session() as s:
        # NOTE(review): certificate verification is disabled for this call.
        s.verify = False
        try:
            response = s.send(prepped, timeout=5)
            res = xml2dict.parse(response.text)
            ok, res = self._check_fault(res)
            if ok:
                return False
            self.msg = res['ConsultaResponse']['ConsultaResult']['a:Estado']
            return True
        except exceptions.Timeout:
            self.error = 'Tiempo de espera agotado'
        except Exception as e:
            print(e)
    return False
开发者ID:e-fector,项目名称:cfdi-test,代码行数:26,代码来源:pac.py
示例14: recuperar_acuse
def recuperar_acuse(self, uuid):
    """Request the acknowledgement XML for ``uuid`` from the web service.

    :param uuid: Identifier placed in the SOAP request.
    :return: The ``AcuseXML`` value with single quotes replaced by double
        quotes; ``''`` when no token is available or ``_check_fault``
        reports ``ok``; ``None`` after a timeout or any other exception.
    """
    transaccion_id = get_epoch()
    user_token = self._get_token(transaccion_id)
    if not user_token:
        return ''

    method = 'recuperar_acuse'
    fields = {
        'uuid': uuid,
        'rfc': self.rfc,
        'user_token': user_token,
        'transaccion_id': transaccion_id,
    }
    body = self._SOAP[method].format(**fields).encode('utf-8')
    headers = {
        'SOAPAction': '"{}"'.format(self._ACTIONS[method]),
        # Header values must be strings; recent requests releases reject ints.
        'Content-length': str(len(body)),
        'Content-type': 'text/xml; charset="UTF-8"'
    }
    prepped = Request('POST', self.WS['cancelacion'], data=body, headers=headers).prepare()

    # The context manager closes the session on every path.
    with Session() as s:
        try:
            response = s.send(prepped, timeout=TIMEOUT)
            res = xml2dict.parse(response.text)
            ok, res = self._check_fault(res)
            if ok:
                return ''
            return res['RespuestaRecuperarAcuse']['AcuseXML'].replace("'", '"')
        except exceptions.Timeout:
            self.error = 'Tiempo de espera agotado'
            print(self.error)
        except Exception as e:
            print('Recuperar Acuse', e)
    return
开发者ID:e-fector,项目名称:cfdi-test,代码行数:34,代码来源:pac.py
示例15: _ads_request
def _ads_request(method, url, headers=None, data=None, url_params=None):
    """Send a request and return the response body as text.

    :param method: HTTP verb in any letter case; it is upper-cased.
    :param url: Target URL (also written to the log).
    :param headers: Optional request headers.
    :param data: Optional request body.
    :param url_params: Optional query-string parameters.
    :return: ``response.text``.
    """
    # logging.warn is a deprecated alias of logging.warning.
    logging.warning('Ads Request:%s', url)
    # str.upper() works on Python 2 and 3; string.upper() exists only on 2.
    prepped = Request(method.upper(), url, headers=headers, data=data,
                      params=url_params).prepare()
    # Close the session once the response has been read.
    with Session() as s:
        return s.send(prepped).text
开发者ID:GeekTemo,项目名称:ads_simulation,代码行数:7,代码来源:simulation.py
示例16: ban_url_list
def ban_url_list(self, url_list):
    """
    Send a BAN request covering ``url_list`` to every varnish node, once per
    configured hostname. Failures are logged, never raised.
    """
    if not (isinstance(url_list, list) and url_list):
        log.warning('No URLs provided')
        return
    if not (self.hostnames and self.varnish_nodes):
        log.warning('No varnish nodes provided to clear the cache')
        return

    # one alternation pattern matching any of the given URLs
    url_pattern = ''.join(['(', '|'.join(url_list), ')'])

    for hostname in self.hostnames:
        ban_headers = {'X-Ban-Url': url_pattern, 'X-Ban-Host': hostname}
        session = Session()
        for node in self.varnish_nodes:
            try:
                prepared = Request('BAN', node, headers=ban_headers).prepare()
                resp = session.send(prepared, timeout=2)
            except Exception:
                log.error('Error sending ban to ' + node)
                continue
            if codes.ok != resp.status_code:
                log.error('Error sending ban to ' + node)
开发者ID:zeus911,项目名称:CdnPurgeModule,代码行数:30,代码来源:cdnpurge.py
示例17: build_request
def build_request(self, verb, data, verb_attrs, files=None):
    """Build the prepared request for ``verb`` and store it on the instance.

    :param verb: API verb; stored as ``self.verb`` and used to build the
        URL, headers and body.
    :param data: Request data; stored as ``self._request_dict``.
    :param verb_attrs: Passed to ``self.build_request_data``.
    :param files: Optional files to upload with the request.

    Sets ``self._request_id`` to a fresh UUID and ``self.request`` to the
    prepared request.
    """
    self.verb = verb
    self._request_dict = data
    self._request_id = uuid.uuid4()

    url = self.build_request_url(verb)

    headers = self.build_request_headers(verb)
    headers.update({'User-Agent': UserAgent,
                    'X-EBAY-SDK-REQUEST-ID': str(self._request_id)})

    requestData = self.build_request_data(verb, data, verb_attrs)
    if files:
        # With files, Request must pick the Content-Type itself; an existing
        # one is likely not multipart/form-data. pop() tolerates a header
        # that was never set, where ``del`` raised KeyError.
        headers.pop('Content-Type', None)

        # ``basestring`` exists only on Python 2.
        try:
            string_types = basestring
        except NameError:
            string_types = str

        # Alongside files the payload must be a dict, so wrap a string body.
        if isinstance(requestData, string_types):
            requestData = {'XMLPayload': requestData}

    request = Request(self.method,
                      url,
                      data=requestData,
                      headers=headers,
                      files=files,
                      )

    self.request = request.prepare()
开发者ID:ubic135,项目名称:ebaysdk-python,代码行数:31,代码来源:connection.py
示例18: get_acacess_token_app
def get_acacess_token_app(tenant_id,client_id,client_secret,resource):
    """Request a client-credentials access token for ``resource``.

    :param tenant_id: Tenant id placed in the token endpoint URL.
    :param client_id: Application (client) id.
    :param client_secret: Application secret.
    :param resource: Resource the token is requested for.
    :return: A dict. On HTTP 200 it has ``status`` ``"1"``, ``details`` (the
        access token), ``exp_time``, ``exp_date`` and ``accessDetails`` (the
        whole decoded response). Otherwise it has ``status`` ``"0"`` and
        ``details`` holding the status code followed by the response body.
    """
    initialise()
    url = "https://login.windows.net/" + tenant_id + "/oauth2/token"

    # A dict body lets requests URL-encode every field; the hand-built string
    # started with a stray "&" and escaped only the secret.
    form = {
        "grant_type": "client_credentials",
        "resource": resource,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    # The Content-Type header was built but never attached to the request.
    headers = {"Content-Type":"application/x-www-form-urlencoded"}
    req_prepped = Request(method="POST", url=url, data=form,
                          headers=headers).prepare()

    with Session() as s:
        res = s.send(req_prepped)

    access_token_det = {}
    if res.status_code == 200:
        responseJSON = res.json()
        access_token_det["details"] = responseJSON["access_token"]
        access_token_det["status"] = "1"
        access_token_det["exp_time"] = responseJSON["expires_in"]
        access_token_det["exp_date"] = responseJSON["expires_on"]
        access_token_det["accessDetails"] = responseJSON
    else:
        try:
            error_body = res.json()
        except ValueError:
            # Error responses are not guaranteed to be JSON.
            error_body = res.text
        access_token_det["details"] = str(res.status_code) + str(error_body)
        access_token_det["status"] = "0"
    return access_token_det
开发者ID:shwetams,项目名称:arm-samples-py,代码行数:25,代码来源:views.py
示例19: _upload_file
def _upload_file(self, params, prepped, api_session):
    """Upload a local file through an S3 URL, then send the upload call.

    First a signed ``getS3url`` request is sent. If its response contains a
    ``url`` and ``params`` names a ``fileLocation``, the file is PUT to that
    URL. Finally the upload parameters are serialised into ``prepped`` and
    sent on ``api_session``.

    :param params: Upload options. ``fileLocation`` is required; ``fileName``,
        ``tenant``, ``sourcePlatform``, ``colDelim``, ``rowDelim``,
        ``headerFields`` and ``fileType`` are copied when truthy.
    :param prepped: Already signed prepared request for the upload call.
    :param api_session: Session used to send ``prepped``.
    :return: The JSON body of the upload response, re-encoded as a string.
    :raises KeyError: When ``params`` has no ``fileLocation``.
    """
    # Build and sign the request that asks for the S3 URL.
    api_url = self._endpoint_url + "getS3url"
    upload_prepped = Request('POST', api_url).prepare()
    self._encode_headers(upload_prepped.headers)
    upload_prepped.headers['Content-Type'] = 'application/json'
    upload_prepped.headers['User-Agent'] = self._build_user_agent_header()
    self._signer.sign(upload_prepped)

    # Parameters for the S3 URL request.
    url_parameters = {'fileName': '', 'tenant': ''}
    if params.get('fileName'):
        # Previously read params['file_name'], a key that is never checked,
        # so an explicit fileName raised KeyError.
        url_parameters['fileName'] = params['fileName']
    elif params.get('fileLocation'):
        if os.path.isfile(params['fileLocation']):
            url_parameters['fileName'] = os.path.basename(params['fileLocation'])
    if params.get('tenant'):
        url_parameters['tenant'] = params['tenant']

    serial_obj = Serializer().serialize_to_request(url_parameters, None)
    upload_prepped.prepare_body(serial_obj['body'], None)
    # The session is only needed for this one call; close it afterwards.
    with Session() as s3_session:
        resp = s3_session.send(upload_prepped).json()

    # Upload the file to the S3 bucket.
    if 'url' in resp and params.get('fileLocation'):
        # Binary mode sends the bytes unchanged; the handle is closed here.
        with open(params['fileLocation'], 'rb') as upload_file:
            put(resp['url'], data=upload_file.read())

    # Build the parameters for the upload call itself.
    upload_params = {'rowDelim': '', 'colDelim': '', 'headerFields': [],
                     'tenant': '', 'fileType': 0}
    if params.get('tenant'):
        upload_params['tenant'] = params['tenant']
    upload_params['fileLocation'] = params['fileLocation']
    if os.path.isfile(params['fileLocation']):
        upload_params['fileName'] = os.path.basename(params['fileLocation'])
    # Explicit values win over the defaults and the derived file name.
    for key in ('fileName', 'sourcePlatform', 'colDelim', 'rowDelim',
                'headerFields', 'fileType'):
        if params.get(key):
            upload_params[key] = params[key]

    serial_obj = Serializer().serialize_to_request(upload_params, None)
    prepped.prepare_body(serial_obj['body'], None)
    resp = api_session.send(prepped)
    return json.dumps(resp.json())
开发者ID:cloudera,项目名称:hue,代码行数:60,代码来源:api_lib.py
示例20: get_subscription_permissions
def get_subscription_permissions(token_det, subscription_id):
    """Check whether the token may write role assignments on a subscription.

    :param token_det: Bearer token placed in the ``Authorization`` header.
    :param subscription_id: Subscription id placed in the request URL.
    :return: ``True`` when some entry of the response's ``value`` list lists
        the role-assignment write action (or the wildcard write action) in
        ``actions`` and does not list the wildcard write action in
        ``notActions``. Otherwise the raw response content is returned.
    """
    initialise()
    url = "https://management.azure.com/subscriptions/"+subscription_id+"/providers/microsoft.authorization/permissions?api-version=2014-07-01-preview"
    headers = {"Authorization":"Bearer " + token_det}
    req_prepped = Request(method="GET", url=url, headers=headers).prepare()

    # Dead placeholder assignments removed; the session is closed after use.
    with Session() as s:
        res = s.send(req_prepped)

    per_sp = "microsoft.authorization/roleassignments/write"
    per_gen = "microsoft.authorization/*/write"

    # NOTE(review): the fallback is the raw body, which is truthy whenever it
    # is non-empty. Kept because callers may rely on it; confirm whether
    # False was intended.
    hasPermission = res.content
    if res.status_code == 200:
        for actions_r in json.loads(res.content)["value"]:
            actions = actions_r["actions"]
            notactions = actions_r["notActions"]
            granted = per_sp in actions or per_gen in actions
            if granted and per_gen not in notactions:
                hasPermission = True
    return hasPermission
开发者ID:shwetams,项目名称:arm-samples-py,代码行数:28,代码来源:views.py
注:本文中的requests.Request类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论