
Python compat.urlparse Function Code Examples

This article collects typical usage examples of the Python function requests.compat.urlparse. If you are wondering how urlparse is used in practice, or are looking for working examples of it, the selected code samples below may help.



A total of 20 code examples of the urlparse function are shown below, sorted by popularity by default.
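
Before the project examples, a minimal sketch of what urlparse returns may be useful. It assumes Python 3, where requests.compat.urlparse is simply the standard library's urllib.parse.urlparse, so the sketch imports it from the standard library to stay self-contained. The URL is a made-up value used only for illustration.

```python
from urllib.parse import urlparse

# Hypothetical URL, chosen only to exercise every component.
parts = urlparse("https://user:secret@example.com:8443/path/to/file.txt?key=value#section")

print(parts.scheme)    # https
print(parts.netloc)    # user:secret@example.com:8443
print(parts.hostname)  # example.com
print(parts.port)      # 8443
print(parts.path)      # /path/to/file.txt
print(parts.query)     # key=value
print(parts.fragment)  # section
```

The examples that follow mostly read one or two of these attributes (scheme, netloc, hostname, port, path, or query).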

Example 1: get_connection

    def get_connection(self, url, proxies=None, verify=None, cert=None):
        """Returns a urllib3 connection for the given URL. This should not be
        called from user code, and is only exposed for use when subclassing the
        :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

        :param url: The URL to connect to.
        :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
        """
        with self._pool_kw_lock:
            if url.lower().startswith('https'):
                self._update_poolmanager_ssl_kw(verify, cert)

            proxies = proxies or {}
            proxy = proxies.get(urlparse(url.lower()).scheme)

            if proxy:
                proxy = prepend_scheme_if_needed(proxy, 'http')
                proxy_manager = self.proxy_manager_for(proxy)
                conn = proxy_manager.connection_from_url(url)
            else:
                # Only scheme should be lower case
                parsed = urlparse(url)
                url = parsed.geturl()
                conn = self.poolmanager.connection_from_url(url)

        return conn
Developer: pcreech, Project: pulp, Lines of code: 26, Source: adapters.py
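
The proxy lookup in the example above can be isolated as a small sketch. The helper name select_proxy_by_scheme and the proxy addresses are hypothetical and not part of requests; the point is only that the parsed scheme is used as the key into a Requests-style proxies dictionary.

```python
from urllib.parse import urlparse


def select_proxy_by_scheme(url, proxies=None):
    """Return the proxy configured for the URL's scheme, or None."""
    proxies = proxies or {}
    # urlparse already lowercases the scheme; lowering the URL first
    # mirrors the example above.
    scheme = urlparse(url.lower()).scheme
    return proxies.get(scheme)


# Hypothetical proxy configuration.
proxies = {
    "http": "http://10.0.0.1:3128",
    "https": "http://10.0.0.1:3129",
}

print(select_proxy_by_scheme("HTTPS://Example.com/index.html", proxies))
print(select_proxy_by_scheme("ftp://example.com/file", proxies))
```

A URL whose scheme has no entry in the dictionary yields None, which corresponds to the non-proxy branch in the example.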


Example 2: authenticate_server

    def authenticate_server(self, response):
        """
        Uses GSSAPI to authenticate the server.

        Returns True on success, False on failure.
        """

        log.debug("authenticate_server(): Authenticate header: {0}".format(
            _negotiate_value(response)))

        host_port_thread = "%s_%s_%s" % (urlparse(response.url).hostname,
                                         urlparse(response.url).port,
                                         threading.current_thread().ident)

        try:
            result = kerberos.authGSSClientStep(self.context[host_port_thread],
                                                _negotiate_value(response))
        except kerberos.GSSError:
            log.exception("authenticate_server(): authGSSClientStep() failed:")
            return False

        if result < 1:
            log.error("authenticate_server(): authGSSClientStep() failed: "
                      "{0}".format(result))
            return False

        log.debug("authenticate_server(): returning {0}".format(response))
        return True
Developer: cloudera, Project: hue, Lines of code: 28, Source: kerberos_.py
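
The per-host, per-port, per-thread key built in the example above can be sketched on its own. The helper name context_key is hypothetical. This version parses the URL once instead of twice, and it relies on two documented behaviours of urlparse: hostname is lowercased, and port is None when the URL does not specify one.

```python
import threading
from urllib.parse import urlparse


def context_key(url):
    """Build a 'host_port_threadid' key like the one used in the example above."""
    parsed = urlparse(url)  # parse once, reuse both attributes
    return "%s_%s_%s" % (parsed.hostname,
                         parsed.port,
                         threading.current_thread().ident)


print(context_key("https://Example.com:8443/service"))
print(context_key("https://example.com/service"))  # port part is 'None'
```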


Example 3: headers

    def headers(self):
        """Return Request-Line"""
        url = urlparse(self._orig.url)

        # Querystring
        qs = ''
        if url.query or self._orig.params:
            qs = '?'
            if url.query:
                qs += url.query
            # Requests doesn't make params part of ``request.url``.
            if self._orig.params:
                if url.query:
                    qs += '&'
                #noinspection PyUnresolvedReferences
                qs += type(self._orig)._encode_params(self._orig.params)

        # Request-Line
        request_line = '{method} {path}{query} HTTP/1.1'.format(
            method=self._orig.method,
            path=url.path or '/',
            query=qs
        )

        headers = dict(self._orig.headers)

        if 'Host' not in headers:
            headers['Host'] = urlparse(self._orig.url).netloc

        headers = ['%s: %s' % (name, value)
                   for name, value in headers.items()]

        headers.insert(0, request_line)

        return '\r\n'.join(headers).strip()
Developer: bkvirendra, Project: httpie, Lines of code: 35, Source: models.py
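
The Request-Line construction in the example above can be reduced to a standalone sketch. The function build_request_line is hypothetical, and it uses the standard library's urlencode in place of the private _encode_params method that the example calls on the request class.

```python
from urllib.parse import urlencode, urlparse


def build_request_line(method, url, params=None):
    """Combine the URL's own query string with extra params into a Request-Line."""
    parsed = urlparse(url)
    # Keep only the non-empty pieces so no stray '?' or '&' is emitted.
    query_parts = [part for part in (parsed.query, urlencode(params or {})) if part]
    query = "?" + "&".join(query_parts) if query_parts else ""
    return "{method} {path}{query} HTTP/1.1".format(
        method=method,
        path=parsed.path or "/",  # an empty path is sent as '/'
        query=query,
    )


print(build_request_line("GET", "http://example.com"))
print(build_request_line("GET", "http://example.com/search?q=1", {"page": 2}))
```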


Example 4: refresh

    def refresh(self):
        name = urlparse(self.observable).netloc.replace(
            '.', '_').replace(':', '_')

        # Add the auth headers to any other headers
        auth_headers = self.auth.get_headers()
        headers = self.http_args.get('headers', {})
        headers.update(auth_headers)

        # build new http args with these headers
        http_args = self.http_args.copy()
        http_args['headers'] = headers
        response = requests.get(self.info_url, **http_args)
        if response.status_code != 200:
            raise Exception('%s: status code %d' % (response.url,
                                                    response.status_code))
        info = msgpack.unpackb(response.content, encoding='utf-8')
        self.metadata = info['metadata']

        entries = {s['name']: RemoteCatalogEntry(url=self.source_url,
                                                 getenv=self.getenv,
                                                 getshell=self.getshell,
                                                 auth=self.auth,
                                                 http_args=self.http_args, **s)
                   for s in info['sources']}

        return name, {}, entries, []
Developer: stonebig, Project: intake, Lines of code: 27, Source: base.py


Example 5: _get_auth

    def _get_auth(self):
        parsed_url = urlparse(self.authentication_url)
        post_data = {
            'name': self.username,
            'password': self.password,
            'next': parsed_url.path + '?' + parsed_url.query
        }

        try:
            response = self.session.post(self.url, data=post_data,
                                         allow_redirects=False)
            response.raise_for_status()

            response = self.session.get(response.headers['location'],
                                        allow_redirects=False)
            response.raise_for_status()

            resulting_uri = '{redirect_uri}#access_token=(.*)'.format(
                redirect_uri=re.escape(self.redirect_uri))

            self.auth = re.search(resulting_uri, response.headers['location']).group(1)

        except Exception as error:
            helpers.handle_requests_exception(error)
            self.auth = None

        return self.auth
Developer: KraXed112, Project: SickRage, Lines of code: 27, Source: putio_client.py


Example 6: get_filename_from_url

def get_filename_from_url(url):
    """Get a filename from a URL.

    >>> from planet.api import utils
    >>> urls = [
    ...     'https://planet.com/',
    ...     'https://planet.com/path/to/',
    ...     'https://planet.com/path/to/example.tif',
    ...     'https://planet.com/path/to/example.tif?foo=f6f1&bar=baz',
    ...     'https://planet.com/path/to/example.tif?foo=f6f1&bar=baz#quux'
    ... ]
    >>> for url in urls:
    ...     print('{} -> {}'.format(url, utils.get_filename_from_url(url)))
    ...
    https://planet.com/ -> None
    https://planet.com/path/to/ -> None
    https://planet.com/path/to/example.tif -> example.tif
    https://planet.com/path/to/example.tif?foo=f6f1&bar=baz -> example.tif
    https://planet.com/path/to/example.tif?foo=f6f1&bar=baz#quux -> example.tif
    >>>

    :returns: a filename (i.e. ``basename``)
    :rtype: str or None
    """
    path = urlparse(url).path
    name = path[path.rfind('/')+1:]
    return name or None
Developer: planetlabs, Project: planet-client-python, Lines of code: 27, Source: utils.py


Example 7: request

def request(session, base_path, method, path, **kwargs):
    """Construct a :class:`requests.Request` object and send it.

    :param requests.Session session:
    :param str base_path:
    :param str method: Method for the :class:`requests.Request` object.
    :param str path: (optional) The path to join with :attr:`CouchDB.url`.
    :param kwargs: (optional) Arguments that :meth:`requests.Session.request` takes.
    :rtype: requests.Response
    """
    # Prepare the params dictionary
    if ('params' in kwargs) and isinstance(kwargs['params'], dict):
        params = kwargs['params'].copy()
        for key, val in iteritems(params):
            # Handle titlecase booleans
            if isinstance(val, bool):
                params[key] = json.dumps(val)
        kwargs['params'] = params

    if compat.urlparse(path).scheme:
        # Support absolute URLs
        url = path
    else:
        url = urljoin(base_path, path).strip('/')

    r = session.request(method, url, **kwargs)
    # Raise exception on a bad status code
    if not (200 <= r.status_code < 300):
        utils.raise_http_exception(r)

    return r
Developer: rwanyoike, Project: time2relax, Lines of code: 31, Source: time2relax.py
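
The absolute-versus-relative check in the example above can be sketched in isolation. The helper resolve_url and the URLs are hypothetical, and unlike the example this sketch does not strip slashes from the joined result. A non-empty scheme is treated as the sign of an absolute URL; note that a relative path containing a colon early on (such as "a:b") would also be seen as having a scheme.

```python
from urllib.parse import urljoin, urlparse


def resolve_url(base_url, path):
    """Return path unchanged if it is an absolute URL, else join it onto base_url."""
    if urlparse(path).scheme:
        # Absolute URL: use as-is.
        return path
    return urljoin(base_url, path)


print(resolve_url("http://localhost:5984/db/", "doc1"))
print(resolve_url("http://localhost:5984/db/", "https://example.com/other"))
```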


Example 8: generate_request_header

    def generate_request_header(self, response):
        """
        Generates the GSSAPI authentication token with kerberos.

        If any GSSAPI step fails, return None.

        """
        host = urlparse(response.url).hostname
        peer_name = "{service}@{host}".format(service=self.service, host=host) # eg. [email protected]

        req_flags = (C_MUTUAL_FLAG,) if self.mutual_authentication in (REQUIRED, OPTIONAL) else ()

        try:
            self.context[host] = InitContext(peer_name=peer_name, req_flags=req_flags, cred=self.cred)
        except GSSException:
            # TODO is it even possible for InitContext() to raise?
            log.exception("generate_request_header(): InitContext() failed:")
            return None

        try:
            gss_response = self.context[host].step(_negotiate_value(response))
        except GSSException:
            log.exception("generate_request_header(): init_context.step() failed:")
            return None

        return "Negotiate {0}".format(gss_response)
Developer: japsu, Project: requests-gssapi, Lines of code: 26, Source: implementation.py


Example 9: get_response

def get_response(name, request_kwargs):

    host = Host(request_kwargs['headers'].get('Host', None)
                or urlparse(request_kwargs['url']).netloc.split('@')[-1])

    session = Session(host, name)
    session.load()

    # Update session headers with the request headers.
    session['headers'].update(request_kwargs.get('headers', {}))
    # Use the merged headers for the request
    request_kwargs['headers'] = session['headers']

    auth = request_kwargs.get('auth', None)
    if auth:
        session.auth = auth
    elif session.auth:
        request_kwargs['auth'] = session.auth

    rsession = RSession(cookies=session.cookies)
    try:
        response = rsession.request(**request_kwargs)
    except Exception:
        raise
    else:
        session.cookies = rsession.cookies
        session.save()
        return response
Developer: simonbuchan, Project: httpie, Lines of code: 28, Source: sessions.py
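
The expression netloc.split('@')[-1] in the example above drops any user:password prefix from the network location. A minimal sketch, with a hypothetical helper name and made-up URLs:

```python
from urllib.parse import urlparse


def host_without_credentials(url):
    """Return 'host[:port]' with any 'user:password@' prefix removed."""
    # netloc looks like 'user:password@host:port'; the last '@'-separated
    # piece is the host part.
    return urlparse(url).netloc.split("@")[-1]


print(host_without_credentials("https://user:pw@example.com:8080/path"))
print(host_without_credentials("https://example.com/path"))
```

Unlike the hostname attribute, this keeps the port and does not lowercase the host.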


Example 10: get_tokens

    def get_tokens(cls, url, user_agent=None, **kwargs):
        scraper = cls.create_scraper()
        if user_agent:
            scraper.headers["User-Agent"] = user_agent

        try:
            resp = scraper.get(url, **kwargs)
            resp.raise_for_status()
        except Exception:
            logging.error("'%s' returned an error. Could not collect tokens." % url)
            raise

        domain = urlparse(resp.url).netloc
        cookie_domain = None

        for d in scraper.cookies.list_domains():
            if d.startswith(".") and d in ("." + domain):
                cookie_domain = d
                break
        else:
            raise ValueError(
                'Unable to find Cloudflare cookies. Does the site actually have Cloudflare IUAM ("I\'m Under Attack Mode") enabled?'
            )

        return (
            {
                "__cfduid": scraper.cookies.get("__cfduid", "", domain=cookie_domain),
                "cf_clearance": scraper.cookies.get(
                    "cf_clearance", "", domain=cookie_domain
                ),
            },
            scraper.headers["User-Agent"],
        )
Developer: Anorov, Project: cloudflare-scrape, Lines of code: 33, Source: __init__.py


Example 11: get_response

def get_response(name, request_kwargs, read_only=False):
    """Like `client.get_response`, but applies permanent
    aspects of the session to the request.

    """
    host = Host(request_kwargs['headers'].get('Host', None)
                or urlparse(request_kwargs['url']).netloc.split('@')[-1])

    session = Session(host, name)
    session.load()

    # Update session headers with the request headers.
    session['headers'].update(request_kwargs.get('headers', {}))
    # Use the merged headers for the request
    request_kwargs['headers'] = session['headers']

    auth = request_kwargs.get('auth', None)
    if auth:
        session.auth = auth
    elif session.auth:
        request_kwargs['auth'] = session.auth

    rsession = requests.Session(cookies=session.cookies)
    try:
        response = rsession.request(**request_kwargs)
    except Exception:
        raise
    else:
        # Existing sessions with `read_only=True` don't get updated.
        if session.is_new or not read_only:
            session.cookies = rsession.cookies
            session.save()
        return response
Developer: bkvirendra, Project: httpie, Lines of code: 33, Source: sessions.py


Example 12: authenticate_server

    def authenticate_server(self, response):
        """
        Uses GSSAPI to authenticate the server.

        Returns True on success, False on failure.
        """

        log.debug("authenticate_server(): Authenticate header: {0}".format(
            _negotiate_value(response)))

        host = urlparse(response.url).hostname

        try:
            result = kerberos.authGSSClientStep(self.context[host],
                                                _negotiate_value(response))
        except kerberos.GSSError as e:
            log.error("authenticate_server(): authGSSClientStep() failed:")
            log.exception(e)
            return False

        if result < 1:
            log.error("authenticate_server(): authGSSClientStep() failed: "
                      "{0}".format(result))
            return False

        log.debug("authenticate_server(): returning {0}".format(response))
        return True
Developer: vvanholl, Project: cassandra_snap_to_hadoop, Lines of code: 27, Source: kerberos_.py


Example 13: authenticate_user

    def authenticate_user(self, response, **kwargs):
        """Handles user authentication with gssapi/kerberos"""

        host = urlparse(response.url).hostname

        try:
            auth_header = self.generate_request_header(response, host)
        except KerberosExchangeError:
            # GSS Failure, return existing response
            return response

        log.debug("authenticate_user(): Authorization header: {0}".format(
            auth_header))
        response.request.headers['Authorization'] = auth_header

        # Consume the content so we can reuse the connection for the next
        # request.
        response.content
        response.raw.release_conn()

        _r = response.connection.send(response.request, **kwargs)
        _r.history.append(response)

        log.debug("authenticate_user(): returning {0}".format(_r))
        return _r
Developer: FileTrek, Project: phoenix, Lines of code: 25, Source: kerberos_.py


Example 14: __init__

    def __init__(self, count, url, cls, session, params=None, etag=None,
                 headers=None):
        models.GitHubCore.__init__(self, {}, session)
        #: Original number of items requested
        self.original = count
        #: Number of items left in the iterator
        self.count = count
        #: URL the class used to make its first GET
        self.url = url
        #: Last URL that was requested
        self.last_url = None
        self._api = self.url
        #: Class for constructing an item to return
        self.cls = cls
        #: Parameters of the query string
        self.params = params or {}
        self._remove_none(self.params)
        # We do not set this from the parameter sent. We want this to
        # represent the ETag header returned by GitHub no matter what.
        # If this is not None, then it won't be set from the response and
        # that's not what we want.
        #: The ETag Header value returned by GitHub
        self.etag = None
        #: Headers generated for the GET request
        self.headers = headers or {}
        #: The last response seen
        self.last_response = None
        #: Last status code received
        self.last_status = 0

        if etag:
            self.headers.update({'If-None-Match': etag})

        self.path = urlparse(self.url).path
Developer: ArRolin, Project: gitsome, Lines of code: 34, Source: structs.py


Example 15: get_spn

    def get_spn(self, r):
        if self.spn is None:
            domain = urlparse(r.url).hostname
            self.spn = spn = "HTTP@%s" % domain
            log.debug("calculated SPN as  %s" % spn)

        return self.spn
Developer: RogerWebb, Project: olap, Lines of code: 7, Source: requests_kerberosauth.py


Example 16: from_request

def from_request(request):
    """Make an `HTTPMessage` from `requests.models.Request`."""
    url = urlparse(request.url)
    request_headers = dict(request.headers)
    if 'Host' not in request_headers:
        request_headers['Host'] = url.netloc

    try:
        body = request.data
    except AttributeError:
        # requests < 0.12.1
        body = request._enc_data

    if isinstance(body, dict):
        # --form
        body = request.__class__._encode_params(body)

    return HTTPMessage(
        line='{method} {path} HTTP/1.1'.format(
                method=request.method,
                path=url.path or '/'),
        headers='\n'.join(str('%s: %s') % (name, value)
                          for name, value
                          in request_headers.items()),
        body=body,
        content_type=request_headers.get('Content-Type')
    )
Developer: Bahus, Project: httpie, Lines of code: 27, Source: httpmessage.py


Example 17: send

    def send(self, request, **kwargs):
        url = urlparse(request.url)
        if url.scheme != 'https':
            raise Exception('Only HTTPS is supported!')

        ctx = self._make_context()

        conn = httpslib.HTTPSConnection(
                url.hostname, url.port or 443, ssl_context=ctx)
        conn.request(request.method, url.path, request.body, request.headers)

        resp = conn.getresponse()
        response = Response()

        # Fallback to None if there's no status_code, for whatever reason.
        response.status_code = getattr(resp, 'status', None)

        # Make headers case-insensitive.
        response.headers = CaseInsensitiveDict(getattr(resp, 'headers', {}))

        # Set encoding.
        response.encoding = get_encoding_from_headers(response.headers)
        response.raw = resp
        response.reason = response.raw.reason

        if isinstance(request.url, bytes):
            response.url = request.url.decode('utf-8')
        else:
            response.url = request.url

        # Give the Response some context.
        response.request = request
        response.connection = self

        return response
Developer: mcrute, Project: dev_urandom, Lines of code: 35, Source: pkcs11-adapter.py
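
The scheme check and port fallback at the top of the example above can be sketched separately. The helper https_endpoint is hypothetical, and it raises ValueError where the example raises a bare Exception. Because urlparse reports port as None when the URL omits it, `port or 443` supplies the HTTPS default.

```python
from urllib.parse import urlparse


def https_endpoint(url):
    """Return (hostname, port) for an HTTPS URL, defaulting the port to 443."""
    parsed = urlparse(url)
    if parsed.scheme != "https":
        raise ValueError("Only HTTPS is supported: %r" % url)
    return parsed.hostname, parsed.port or 443


print(https_endpoint("https://example.com/resource"))
print(https_endpoint("https://example.com:8443/resource"))
```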


Example 18: parse_args

    def parse_args(self, env, args=None, namespace=None):

        self.env = env

        args = super(Parser, self).parse_args(args, namespace)

        if not args.json and env.config.implicit_content_type == 'form':
            args.form = True

        if args.debug:
            args.traceback = True

        if args.output:
            env.stdout = args.output
            env.stdout_isatty = False

        self._process_output_options(args, env)
        self._process_pretty_options(args, env)
        self._guess_method(args, env)
        self._parse_items(args)

        if not env.stdin_isatty:
            self._body_from_file(args, env.stdin)

        if not (args.url.startswith(HTTP) or args.url.startswith(HTTPS)):
            scheme = HTTPS if env.progname == 'https' else HTTP
            args.url = scheme + args.url

        if args.auth and not args.auth.has_password():
            # Stdin already read (if not a tty) so it's safe to prompt.
            args.auth.prompt_password(urlparse(args.url).netloc)

        return args
Developer: hashin2, Project: cs499-django, Lines of code: 33, Source: input.py
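
The example above prepends a scheme before calling urlparse, and the following sketch suggests why that matters: without a scheme and the '//' separator, urlparse treats the whole string as a path and netloc is empty. The helper ensure_scheme is hypothetical, and the literal prefixes stand in for the HTTP and HTTPS constants that the example refers to.

```python
from urllib.parse import urlparse


def ensure_scheme(url, default_scheme="http://"):
    """Prepend default_scheme unless the URL already starts with http:// or https://."""
    if url.startswith(("http://", "https://")):
        return url
    return default_scheme + url


bare = "example.com/path"
print(repr(urlparse(bare).netloc))           # '' (everything is parsed as path)
print(urlparse(ensure_scheme(bare)).netloc)  # example.com
```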


Example 19: reply

    def reply(self, url, special_reply_content=None):
        """
        :param special_reply_content: will use this reply instead of self.replies
        :return: if success return True, else return False
        """
        assert self.logged, 'Did not login successfully!'
        assert self._reply_contents or special_reply_content, 'No reply text!'

        resp = self.get(url)
        followup_value = self.pat_followup_value.search(resp.text).group(1).strip(r'\"')

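        # Why not use parse_qs? Because the boardid key in cc98 URLs is sometimes
        # uppercase and sometimes lowercase; it is not consistent.
        # cc98 is too SB
        # Why not lowercase it with lower()? Because I only thought of that after
        # writing the line above, and I wanted to keep that line.
        # I is too SB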
        qs_list = parse_qsl(urlparse(url).query)
        boardid = qs_list[0][1]
        rootid = qs_list[1][1]
        reply_url = self.REPLY_BASE_URL + '?' + urlencode((('method', 'fastreply'), ('BoardID', boardid)))
        cookies_password = parse_qs(resp.request.headers['cookie']).get('password')[0]
        post_reply = special_reply_content if special_reply_content else random.choice(self._reply_contents)

        post_form = {
            'followup':    followup_value,
            'RootID':      rootid,
            'star':        '1',
            'UserName':    self.username,
            'passwd':      cookies_password,
            'Expression':  'face7.gif',
            'Content':     post_reply,
            'signflag':    'yes',
        }
        self._reply_resp = self.post(reply_url, data=post_form)

        return self._reply_resp.ok
Developer: littlezz, Project: cc98, Lines of code: 35, Source: cc98.py
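
The example above reads query parameters by position because the key casing varies between URLs. An alternative, sketched here under assumptions, is to lowercase the keys after parse_qsl so that lookups by name become case-insensitive. The helper name, the URL, and the parameter names are hypothetical and not taken from the cc98 project.

```python
from urllib.parse import parse_qsl, urlparse


def query_params_lowercase(url):
    """Return the URL's query parameters as a dict with lowercased keys."""
    # If a key repeats, the last value wins.
    return {key.lower(): value
            for key, value in parse_qsl(urlparse(url).query)}


# Hypothetical URL with mixed-case parameter names.
params = query_params_lowercase("http://www.example.com/dispbbs.asp?BoardID=100&ID=4567")
print(params["boardid"])
print(params["id"])
```

Positional access, as in the example, depends on the parameter order staying fixed; name-based access depends on the names staying fixed apart from case.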


Example 20: authenticate_server

    def authenticate_server(self, response):
        """
        Uses GSSAPI to authenticate the server.

        Returns True on success, False on failure.
        """

        log.debug("authenticate_server(): Authenticate header: {0}".format(
            _negotiate_value(response)))

        host = urlparse(response.url).hostname

        try:
            # If this is set pass along the struct to Kerberos
            if self.cbt_struct:
                result = kerberos.authGSSClientStep(self.context[host],
                                                    _negotiate_value(response),
                                                    channel_bindings=self.cbt_struct)
            else:
                result = kerberos.authGSSClientStep(self.context[host],
                                                    _negotiate_value(response))
        except kerberos.GSSError:
            log.exception("authenticate_server(): authGSSClientStep() failed:")
            return False

        if result < 1:
            log.error("authenticate_server(): authGSSClientStep() failed: "
                      "{0}".format(result))
            return False

        log.debug("authenticate_server(): returning {0}".format(response))
        return True
Developer: FileTrek, Project: phoenix, Lines of code: 32, Source: kerberos_.py



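Note: The requests.compat.urlparse examples in this article were compiled by 纯净天空 from source code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects; copyright of the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.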

