• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python requests.Request类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中requests.Request的典型用法代码示例。如果您正苦于以下问题:Python Request类的具体用法?Python Request怎么用?Python Request使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Request类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: get_http_request

    def get_http_request(url, payload, method='POST', headers=None, use_proxy=False, use_proxy_auth=False, trust_env=True):
        try:
            session = Session()
            session.trust_env = trust_env
            session.proxies = Util.get_proxies() if use_proxy else None
            session.auth = Util.get_proxy_auth() if use_proxy_auth else None

            request = Request(
                'POST' if method not in ('GET', 'POST') else method,
                url,
                data=payload if method == 'POST' else None,
                params=payload if method == 'GET' else None,
                headers=headers
            )

            prepped = request.prepare()

            response = session.send(
                prepped,
                timeout=app.config['HTTP_REQUESTS_TIMEOUT']
            )

            session.close()
        except Exception, e:
            response = Response()
            response.raise_for_status()
            return response, 'Error al realizar la consulta - Motivo: {}'.format(e.message)
开发者ID:stabora,项目名称:nbsf,代码行数:27,代码来源:util.py


示例2: timbra_xml

 def timbra_xml(self, xml, id_original=0):
     user_token = self._get_token(id_original)
     if not user_token:
         return ''
     method = 'timbra_xml'
     data = {
         'xml': escape(xml.encode('ascii', 'xmlcharrefreplace').decode('utf-8')),
         'rfc': self.rfc,
         'user_token': user_token,
         'transaccion_id': id_original,
     }
     data = self._SOAP[method].format(**data).encode('utf-8')
     headers = {
         'SOAPAction': '"{}"'.format(self._ACTIONS[method]),
         'Content-length': len(data),
         'Content-type': 'text/xml; charset="UTF-8"'
     }
     s = Session()
     req = Request('POST', self.WS['timbrado'], data=data, headers=headers)
     prepped = req.prepare()
     try:
         response = s.send(prepped, timeout=TIMEOUT)
         res = xml2dict.parse(response.text)
         ok, res = self._check_fault(res)
         if ok:
             return ''
         return res['RespuestaTimbraXML']['ComprobanteXML']['DatosXML']
     except exceptions.Timeout:
         self.error = 'Tiempo de espera agotado'
     except Exception as e:
         self.error = str(e)
         log.debug(e)
     return ''
开发者ID:e-fector,项目名称:cfdi-test,代码行数:33,代码来源:pac.py


示例3: __init__

 def __init__(self, url=None, callback, method='GET', 
         headers=None, need_proxy=False, fail_times=0, timeout=TIMEOUT):
     Request.__init__(self, method, url, headers)
     self.callback = callback
     self.need_proxy = need_proxy
     self.fail_times = fail_times
     self.timeout = timeout
开发者ID:stevenshuang,项目名称:Python3-Spider-Actual-Combat,代码行数:7,代码来源:Request.py


示例4: _get_response

 def _get_response(self, content_type, url, headers, file_descriptor):
     s = Session()
     response = None
     req = Request(content_type, url, headers=headers, data=file_descriptor)
     prepared = req.prepare()
     try:
         response = s.send(prepared)
     except exceptions.Timeout:
         raise
     except exceptions.TooManyRedirects:
         raise
     except exceptions.RequestException:
         raise
     if response.status_code != requests.codes.ok:
         try:
             raise BackblazeException(response.status_code,
                                      response.json()['message'],
                                      response,
                                      headers)
         except ValueError:
             raise BackblazeException(response.status_code,
                                      response.text,
                                      response,
                                      headers)
     return response
开发者ID:th3architect,项目名称:oio-sds,代码行数:25,代码来源:backblaze_http.py


示例5: _http

  def _http(self, method, path, json=True, **kw):
    
    """
    A wrapper for http requests to streamtools.
    """
    
    # serialize all incoming json
    if 'data' in kw:
      kw['data'] = ujson.dumps(kw['data'])

    # construct the url endpoint
    url = 'http://{}/{}'.format(self.url, path)

    # special handling for streaming kwarg
    stream = kw.pop('stream', False)

    # format the request
    req = Request(method, url, **kw)

    # execute
    resp = self.s.send(req.prepare(), stream=stream)

    # return
    if json:
      
      return ujson.loads(resp.content)
    
    return resp
开发者ID:jeremyjbowers,项目名称:python-streamtools,代码行数:28,代码来源:client.py


示例6: get_request

 def get_request(url,header):
     req = Request(method="GET",url=url,headers=header)
     req_prepared = req.prepare()
     res = Response()
     s = Session()
     res = s.send(req_prepared)
     return res
开发者ID:shwetams,项目名称:python_azure_tables,代码行数:7,代码来源:base_rest_calls.py


示例7: call_api

    def call_api(self, operation_name, params=None):
        if not operation_name:
            return
        if params is None:
          params = {}

        api_session = Session()
        api_url = self._endpoint_url + operation_name
        req = Request('POST', api_url)
        prepped = req.prepare()
        self._encode_headers(prepped.headers)
        prepped.headers['Content-Type'] = 'application/json'
        prepped.headers['User-Agent'] = self._build_user_agent_header()
        self._signer.sign(prepped)

        # check if operation is for 'upload'
        if operation_name == 'upload':
            # get s3url for the upload and then do a upload
            resp = self._upload_file(params, prepped, api_session)
            return resp
        # prepare the body
        serializer = Serializer()
        serial_obj = serializer.serialize_to_request(params, None)
        prepped.prepare_body(serial_obj['body'], None)
        resp = api_session.send(prepped)
        return resp
开发者ID:cloudera,项目名称:hue,代码行数:26,代码来源:api_lib.py


示例8: build_environ

def build_environ(method, path, host='localhost',
                  accept_type='text/html', content_type=None,
                  query=None, form=None, files=None, cookies=None):
    if '://' in host:
        url = host.rstrip('/') + '/' + path.lstrip('/')
    else:
        url = 'http://' + host.strip('/') + '/' + path.lstrip('/')

    request = Request(method, url, None, files, form, query, cookies)
    prepared = request.prepare()
    parsed_url = parse_url(prepared.url)

    environ = {
        'HTTP_HOST': parsed_url.host,
        'PATH_INFO': parsed_url.path,
        'REQUEST_METHOD': prepared.method,

        'HTTP_ACCEPT': accept_type,

        'QUERY_STRING': parsed_url.query or '',
    }

    for key, value in iteritems(prepared.headers):
        key = underscore(key)
        if key not in ['content_type', 'content_length']:
            key = 'http_' + key
        environ[key.upper()] = value

    if content_type is not None:
        environ['CONTENT_TYPE'] = content_type

    environ['wsgi.input'] = BytesIO(prepared.body)

    return environ
开发者ID:CorverDevelopment,项目名称:Poort,代码行数:34,代码来源:utils.py


示例9: authorize_url

    def authorize_url(self, duration, scopes, state, implicit=False):
        """Return the URL used out-of-band to grant access to your application.

        :param duration: Either ``permanent`` or ``temporary``. ``temporary``
            authorizations generate access tokens that last only 1
            hour. ``permanent`` authorizations additionally generate a refresh
            token that can be indefinitely used to generate new hour-long
            access tokens. Only ``temporary`` can be specified if ``implicit``
            is set to ``True``.
        :param scopes: A list of OAuth scopes to request authorization for.
        :param state: A string that will be reflected in the callback to
            ``redirect_uri``. This value should be temporarily unique to the
            client for whom the URL was generated for.
        :param implicit: (optional) Use the implicit grant flow (default:
            False). This flow is only available for UntrustedAuthenticators.

        """
        if self.redirect_uri is None:
            raise InvalidInvocation('redirect URI not provided')
        if implicit and not isinstance(self, UntrustedAuthenticator):
            raise InvalidInvocation('Only UntrustedAuthentictor instances can '
                                    'use the implicit grant flow.')
        if implicit and duration != 'temporary':
            raise InvalidInvocation('The implicit grant flow only supports '
                                    'temporary access tokens.')

        params = {'client_id': self.client_id, 'duration': duration,
                  'redirect_uri': self.redirect_uri,
                  'response_type': 'token' if implicit else 'code',
                  'scope': ' '.join(scopes), 'state': state}
        url = self._requestor.reddit_url + const.AUTHORIZATION_PATH
        request = Request('GET', url, params=params)
        return request.prepare().url
开发者ID:nmtake,项目名称:prawcore,代码行数:33,代码来源:auth.py


示例10: prepare_and_send_request

def prepare_and_send_request(request_method, request_url, payload=None, authorization=None):

	headers = {
		'Accept' : 'application/json',
		'Accept-Encoding' : 'gzip,deflate,sdch',
		'Accept-Language' : 'en-US,en;q=0.8,pt-BR;q=0.6,pt;q=0.4',
		'Cache-Control' : 'no-cache',
		'Connection' : 'keep-alive',
	}

	if ( authorization != None ):
		headers['Authorization'] = "Basic " + base64.b64encode("%s:%s" % (authorization, TEST_USER_PASSWORD))

	if ( payload == None ):
		req = Request(request_method, request_url, headers=headers)
		r = req.prepare()
	else:
		headers['Content-Type'] = 'application/json'		
		req = Request(request_method, request_url, data=json.dumps(payload), headers=headers)
		r = req.prepare()

	s = Session()
	s.mount('https://', SSLAdapter('TLSv1'))

	try:
		resp = s.send(r, verify=False, timeout=1)
	except Timeout, SSLError:
		return prepare_and_send_request(request_method, request_url, payload, authorization)
开发者ID:MrFrety,项目名称:buddycloud-tests-framework,代码行数:28,代码来源:api_utils.py


示例11: __init__

 def __init__(self, url, callback, method='GET', headers=None, timeout=3, proxy=None, params=None, data=None):
     Request.__init__(url, method, headers)
     self.callback = callback
     self.timeout = timeout
     self.proxy = proxy
     self.params = params
     self.data = data
开发者ID:Rockyzsu,项目名称:base_function,代码行数:7,代码来源:Spider_Callback.py


示例12: _make_request

    def _make_request(self, method, url, data=None, params=None, **kwargs):
        if params:
            self.params.update(params)
        if kwargs.get('headers'):
            self.headers.update(kwargs['headers'])
        if data:
            data = self._stringify_dict_list(data)

        url = self.BASE_URL + self.VERSION + url
        req = Request(method, url, data=data, headers=self.headers, params=self.params)
        prepped = req.prepare()

        try:
            response = self._session.send(prepped, timeout=self.timeout)
            if response.status_code > 299:
                if 'message' in str(response.content):
                    response.reason += ': {}'.format(response.json()['message'])
                response.raise_for_status()
        except requests.HTTPError:
            msg = '{} {}'.format(response.status_code, response.reason)
            raise HttpError(msg)
        except requests.Timeout:
            raise TimeoutError('{} {} timed out after {} seconds'.format(
                method, url, self.timeout[0] + self.timeout[1]
            ))
        except requests.ConnectionError as e:
            raise ClientError('Could not reach: {} {} {}'.format(method, url, e))

        return response.json()
开发者ID:itorres,项目名称:sd-python-wrapper,代码行数:29,代码来源:apiclient.py


示例13: get_estatus

 def get_estatus(self, data):
     data['emisor_rfc'] = escape(data['emisor_rfc'])
     data['receptor_rfc'] = escape(data['receptor_rfc'])
     data = self._soap.format(**data).encode('utf-8')
     headers = {
         'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
         'Content-length': len(data),
         'Content-type': 'text/xml; charset="UTF-8"'
     }
     s = Session()
     s.verify = False
     req = Request('POST', self._webservice, data=data, headers=headers)
     prepped = req.prepare()
     try:
         response = s.send(prepped, timeout=5)
         res = xml2dict.parse(response.text)
         ok, res = self._check_fault(res)
         if ok:
             return False
         self.msg = res['ConsultaResponse']['ConsultaResult']['a:Estado']
         return True
     except exceptions.Timeout:
         self.error = 'Tiempo de espera agotado'
     except Exception as e:
         print (e)
     return False
开发者ID:e-fector,项目名称:cfdi-test,代码行数:26,代码来源:pac.py


示例14: recuperar_acuse

 def recuperar_acuse(self, uuid):
     transaccion_id = get_epoch()
     user_token = self._get_token(transaccion_id)
     if not user_token:
         return ''
     method = 'recuperar_acuse'
     data = {
         'uuid': uuid,
         'rfc': self.rfc,
         'user_token': user_token,
         'transaccion_id': transaccion_id,
     }
     data = self._SOAP[method].format(**data).encode('utf-8')
     headers = {
         'SOAPAction': '"{}"'.format(self._ACTIONS[method]),
         'Content-length': len(data),
         'Content-type': 'text/xml; charset="UTF-8"'
     }
     s = Session()
     req = Request('POST', self.WS['cancelacion'], data=data, headers=headers)
     prepped = req.prepare()
     try:
         response = s.send(prepped, timeout=TIMEOUT)
         res = xml2dict.parse(response.text)
         ok, res = self._check_fault(res)
         if ok:
             return ''
         return res['RespuestaRecuperarAcuse']['AcuseXML'].replace("'", '"')
     except exceptions.Timeout:
         self.error = 'Tiempo de espera agotado'
         print (self.error)
     except Exception as e:
         print ('Recuperar Acuse', e)
     return
开发者ID:e-fector,项目名称:cfdi-test,代码行数:34,代码来源:pac.py


示例15: _ads_request

 def _ads_request(method, url, headers=None, data=None, url_params=None):
     logging.warn('Ads Request:%s' % url)
     s = Session()
     req = Request(string.upper(method), url, headers=headers, data=data, params=url_params)
     prepped = req.prepare()
     rsp = s.send(prepped)
     return rsp.text
开发者ID:GeekTemo,项目名称:ads_simulation,代码行数:7,代码来源:simulation.py


示例16: ban_url_list

    def ban_url_list(self, url_list):
        """
        Bans a list of urls.
        """
        if isinstance(url_list, list) and url_list:
            if self.hostnames and self.varnish_nodes:
                url_combo = '(' + '|'.join(url_list) + ')'

                for hostname in self.hostnames:
                    header = {'X-Ban-Url': url_combo, 'X-Ban-Host': hostname}

                    s = Session()
                    for node in self.varnish_nodes:
                        try:
                            req = Request('BAN', node,
                                headers=header
                            )
                            prepped = req.prepare()

                            resp = s.send(prepped,
                                          timeout=2)
                        except Exception:
                            log.error('Error sending ban to ' + node)
                        else:
                            if codes.ok != resp.status_code:
                                log.error('Error sending ban to ' + node)
            else:
                log.warning('No varnish nodes provided to clear the cache')
        else:
            log.warning('No URLs provided')
开发者ID:zeus911,项目名称:CdnPurgeModule,代码行数:30,代码来源:cdnpurge.py


示例17: build_request

    def build_request(self, verb, data, verb_attrs, files=None):
 
        self.verb = verb
        self._request_dict = data
        self._request_id = uuid.uuid4()

        url = self.build_request_url(verb)

        headers = self.build_request_headers(verb)
        headers.update({'User-Agent': UserAgent, 
                        'X-EBAY-SDK-REQUEST-ID': str(self._request_id)})
                        
        # if we are adding files, we ensure there is no Content-Type header already defined
        # otherwise Request will use the existing one which is likely not to be multipart/form-data
        # data must also be a dict so we make it so if needed

        requestData = self.build_request_data(verb, data, verb_attrs)
        if files:
            del(headers['Content-Type'])
            if isinstance(requestData, basestring):
                requestData = {'XMLPayload':requestData}

        request = Request(self.method,
            url,
            data=requestData,
            headers=headers,
            files=files,
        )


        self.request = request.prepare()
开发者ID:ubic135,项目名称:ebaysdk-python,代码行数:31,代码来源:connection.py


示例18: get_acacess_token_app

def get_acacess_token_app(tenant_id,client_id,client_secret,resource):
    initialise()
    
    url = "https://login.windows.net/" + tenant_id + "/oauth2/token"        
    
    body_data = "&grant_type=client_credentials&resource="+ resource +"&client_id="+ client_id + "&client_secret="+  urllib.quote_plus(client_secret)
    
    headers = {"Content-Type":"application/x-www-form-urlencoded"}
    req = Request(method="POST",url=url,data=body_data)
    req_prepped = req.prepare()
    s = Session()
    res = Response()
    res = s.send(req_prepped)
    access_token_det = {}
    if (res.status_code == 200):
        responseJSON = json.loads(res.content)
        access_token_det["details"]= responseJSON["access_token"]
        access_token_det["status"]="1"
        access_token_det["exp_time"]=responseJSON["expires_in"]
        access_token_det["exp_date"]=responseJSON["expires_on"]
        access_token_det["accessDetails"]=responseJSON
    else:
        access_token_det["details"]= str(res.status_code) + str(res.json())
        access_token_det["status"]="0"
    return access_token_det
开发者ID:shwetams,项目名称:arm-samples-py,代码行数:25,代码来源:views.py


示例19: _upload_file

    def _upload_file(self, params, prepped, api_session):
        # header for upload
        s3_session = Session()
        api_url = self._endpoint_url + "getS3url"
        req = Request('POST', api_url)
        upload_prepped = req.prepare()
        self._encode_headers(upload_prepped.headers)
        upload_prepped.headers['Content-Type'] = 'application/json'
        upload_prepped.headers['User-Agent'] = self._build_user_agent_header()
        self._signer.sign(upload_prepped)
        # prepare params for s3 url
        url_parameters = {'fileName': '', 'tenant': ''}
        if 'fileName' in params and params['fileName']:
            url_parameters['fileName'] = params['file_name']
        elif 'fileLocation' in params and params['fileLocation']:
            if os.path.isfile(params['fileLocation']):
                fileName = os.path.basename(params['fileLocation'])
                url_parameters['fileName'] = fileName

        if 'tenant' in params and params['tenant']:
            url_parameters['tenant'] = params['tenant']
        # prepare the body
        serializer = Serializer()
        serial_obj = serializer.serialize_to_request(url_parameters, None)
        upload_prepped.prepare_body(serial_obj['body'], None)
        resp = s3_session.send(upload_prepped)
        resp = json.loads(json.dumps(resp.json()))

        # upload file to S3 bucket
        if 'url' in resp and 'fileLocation' in params and params['fileLocation']:
            put(resp['url'], data=open(params['fileLocation']).read())
            # build upload parameters
            upload_params = {'rowDelim': '', 'colDelim': '', 'headerFields': [],
                             'tenant': '', 'fileType': 0}
            # now do actual upload
            if 'tenant' in params and params['tenant']:
                upload_params['tenant'] = params['tenant']
            upload_params['fileLocation'] = params['fileLocation']
            if os.path.isfile(params['fileLocation']):
                fileName = os.path.basename(params['fileLocation'])
                upload_params['fileName'] = fileName
            if 'fileName' in params and params['fileName']:
                upload_params['fileName'] = params['fileName']
            if 'sourcePlatform' in params and params['sourcePlatform']:
                upload_params['sourcePlatform'] = params['sourcePlatform']
            if 'colDelim' in params and params['colDelim']:
                upload_params['colDelim'] = params['colDelim']
            if 'rowDelim' in params and params['rowDelim']:
                upload_params['rowDelim'] = params['rowDelim']
            if 'headerFields' in params and params['headerFields']:
                upload_params['headerFields'] = params['headerFields']
            if 'fileType' in params and params['fileType']:
                upload_params['fileType'] = params['fileType']
            # prepare the body
            serializer = Serializer()
            serial_obj = serializer.serialize_to_request(upload_params, None)
            prepped.prepare_body(serial_obj['body'], None)
            resp = api_session.send(prepped)
            resp = json.dumps(resp.json())
        return resp
开发者ID:cloudera,项目名称:hue,代码行数:60,代码来源:api_lib.py


示例20: get_subscription_permissions

def get_subscription_permissions(token_det, subscription_id):
    initialise()
    url = "https://management.azure.com/subscriptions/"+subscription_id+"/providers/microsoft.authorization/permissions?api-version=2014-07-01-preview"
    headers = {"Authorization":"Bearer " + token_det}
    req = Request(method="GET",url=url,headers=headers)
    req_prepped = req.prepare()
    s = Session()
    res = Response()
    hasPermission = False
    #hasPermission = {}
    res = s.send(req_prepped)
    per_sp = "microsoft.authorization/roleassignments/write"
    per_gen = "microsoft.authorization/*/write"
    hasPermission = res.content
    if (res.status_code == 200):
        resJSON = json.loads(res.content)
        actions_result = resJSON["value"]
        for actions_r in actions_result:
            actions = actions_r["actions"]
            notactions = actions_r["notActions"]
            if per_sp in actions:
                if per_gen not in notactions:
                    hasPermission = True
            else:
                if per_gen in actions:
                    if per_gen not in notactions:
                        hasPermission = True
    return hasPermission
开发者ID:shwetams,项目名称:arm-samples-py,代码行数:28,代码来源:views.py



注:本文中的requests.Request类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python requests.Response类代码示例发布时间:2022-05-26
下一篇:
Python requests.session函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap