Python urllib_parse.urlparse Function Code Examples


This article collects typical usage examples of the six.moves.urllib_parse.urlparse function in Python. If you are wondering how urlparse is used in practice, how to call it, or what real-world usage looks like, the curated code examples below may help.



The 20 code examples of the urlparse function shown below are sorted by popularity by default.
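
Before the project examples, here is a minimal sketch of what urlparse returns. It assumes Python 3, where six.moves.urllib_parse.urlparse resolves to urllib.parse.urlparse, so the standard-library import is used directly. The URL is a made-up value for illustration only.

```python
from urllib.parse import urlparse  # six.moves.urllib_parse.urlparse on Python 3

# Hypothetical URL used only for illustration.
parts = urlparse("https://user:secret@example.com:8443/api/v1/items?page=2&sort=asc#top")

print(parts.scheme)    # https
print(parts.netloc)    # user:secret@example.com:8443
print(parts.hostname)  # example.com
print(parts.port)      # 8443 (an int, or None when the URL has no port)
print(parts.path)      # /api/v1/items
print(parts.query)     # page=2&sort=asc
print(parts.fragment)  # top
```

The attributes scheme, netloc, hostname, port, path, query and fragment are the ones the examples below rely on most often.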

Example 1: parse_dcs

def parse_dcs(dcs):
    """
    Break up the provided dcs string
    >>> parse_dcs('localhost') == {'scheme': 'etcd', 'hostname': 'localhost', 'port': 4001}
    True
    >>> parse_dcs('localhost:8500') == {'scheme': 'consul', 'hostname': 'localhost', 'port': 8500}
    True
    >>> parse_dcs('zookeeper://localhost') == {'scheme': 'zookeeper', 'hostname': 'localhost', 'port': 2181}
    True
    """

    if not dcs:
        return {}

    parsed = urlparse(dcs)
    scheme = parsed.scheme
    if scheme == '' and parsed.netloc == '':
        parsed = urlparse('//' + dcs)

    if scheme == '':
        default_schemes = {'2181': 'zookeeper', '8500': 'consul'}
        scheme = default_schemes.get(str(parsed.port), 'etcd')

    port = parsed.port
    if port is None:
        default_ports = {'consul': 8500, 'zookeeper': 2181}
        port = default_ports.get(str(scheme), 4001)

    return {'scheme': str(scheme), 'hostname': str(parsed.hostname), 'port': int(port)}
Developer: jasonbrooks, Project: patroni-compose, Lines: 29, Source file: ctl.py
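
The function above prepends '//' when the input has no scheme, so that urlparse treats the text as a network location. The sketch below isolates that trick. The helper name split_host_port is hypothetical, and the default port 4001 is taken from the example above. Depending on the Python version, a bare 'host:port' string passed straight to urlparse may be read with 'host' as the scheme, which is why this sketch checks for '://' up front instead of inspecting the first parse result.

```python
from urllib.parse import urlparse


def split_host_port(address, default_port=4001):
    """Hypothetical helper: parse 'host', 'host:port' or 'scheme://host:port'."""
    if "://" not in address:
        # Without a leading '//', urlparse does not treat the text as a netloc.
        address = "//" + address
    parsed = urlparse(address)
    port = parsed.port if parsed.port is not None else default_port
    return parsed.scheme, parsed.hostname, port


print(split_host_port("localhost"))
print(split_host_port("localhost:8500"))
print(split_host_port("zookeeper://localhost:2181"))
```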


Example 2: from_docker_envvars

def from_docker_envvars(config):
    # linked postgres database (link name 'pg' or 'postgres')
    if 'PG_PORT' in os.environ:
        pg_url = urlparse(os.environ['PG_PORT'])

        if not pg_url.scheme == 'tcp':
            raise ValueError('Only tcp scheme supported for postgres')

        host, port = pg_url.netloc.split(':')

        uri = 'postgres://{user}:{password}@{host}:{port}/{database}'.format(
            user=os.environ.get('PG_ENV_POSTGRES_USER', 'postgres'),
            password=os.environ.get('PG_ENV_POSTGRES_PASSWORD', ''),
            host=host,
            port=port,
            database=os.environ.get('PG_ENV_POSTGRES_DB', 'postgres'))

        config['SQLALCHEMY_DATABASE_URI'] = uri

    if 'REDIS_PORT' in os.environ:
        redis_url = urlparse(os.environ['REDIS_PORT'])

        if not redis_url.scheme == 'tcp':
            raise ValueError('Only tcp scheme supported for redis')

        host, port = redis_url.netloc.split(':')

        uri = 'redis://{host}:{port}/0'.format(host=host, port=port, )

        config['REDIS_URL'] = uri
        config['REDIS_HOST'] = host
        config['REDIS_PORT'] = int(port)
Developer: bkabrda, Project: flask-appconfig, Lines: 32, Source file: docker.py


Example 3: _connect

    def _connect(self, url=None, cdn=False):
        if not url:
            if cdn:
                if not self.cdn_url:
                    self.auth()
                url = self.cdn_url
            else:
                if not self.storage_url:
                    self.auth()
                url = self.storage_url
        parsed = urlparse.urlparse(url) if url else None
        http_proxy_parsed = \
            urlparse.urlparse(self.http_proxy) if self.http_proxy else None
        if not parsed and not http_proxy_parsed:
            return None, None
        netloc = (http_proxy_parsed if self.http_proxy else parsed).netloc
        if parsed.scheme == 'http':
            self.verbose('Establishing HTTP connection to %s', netloc)
            conn = self.HTTPConnection(netloc)
        elif parsed.scheme == 'https':
            self.verbose('Establishing HTTPS connection to %s', netloc)
            conn = self.HTTPSConnection(netloc)
        else:
            raise self.HTTPException(
                'Cannot handle protocol scheme %s for url %s' %
                (parsed.scheme, repr(url)))
        if self.http_proxy:
            self.verbose(
                'Setting tunnelling to %s:%s', parsed.hostname, parsed.port)
            conn._set_tunnel(parsed.hostname, parsed.port)
        return parsed, conn
Developer: VadimPushtaev, Project: swiftly, Lines: 31, Source file: standardclient.py


Example 4: url_distance

def url_distance(preprocessor, url1, url2):
    url1 = urlparse(url1)
    url2 = urlparse(url2)

    process_fn = lambda s: preprocessor(unquote(s))
    path1 = map(process_fn, url1.path.strip('/').split('/'))
    path2 = map(process_fn, url2.path.strip('/').split('/'))
    path_distance = levenshtein_array(path1, path2)

    query_distance = dict_distance(preprocessor,
        parse_qs(url1.query, True),
        parse_qs(url2.query, True)
    )

    domain_distance = 4 * levenshtein_array(
        (url1.hostname or '').split('.'),
        (url2.hostname or '').split('.')
    )

    return (
        domain_distance +
        path_distance +
        query_distance +
        (url1.fragment != url2.fragment)
    )
Developer: Youwotma, Project: page_finder, Lines: 25, Source file: url_distance.py


Example 5: get_canonicalized_resource

    def get_canonicalized_resource(self, req, service):
        # /bucket/keyname
        parsed_req_path = urlparse.urlparse(req.url).path
        assert service.endpoint is not None
        parsed_svc_path = urlparse.urlparse(service.endpoint).path
        # IMPORTANT:  this only supports path-style requests
        assert parsed_req_path.startswith(parsed_svc_path)
        resource = parsed_req_path[len(parsed_svc_path):]
        if parsed_svc_path.endswith('/'):
            # The leading / got stripped off
            resource = '/' + resource
        if not resource:
            # This resource does not address a bucket
            resource = '/'

        # Now append sub-resources, a.k.a. query string parameters
        if getattr(req, 'params', None):
            # A regular Request
            params = req.params
        else:
            # A PreparedRequest
            params = _get_params_from_url(req.url)
        if params:
            subresources = []
            for key, val in sorted(params.items()):
                if key in self.HASHED_PARAMS:
                    if val is None:
                        subresources.append(key)
                    else:
                        subresources.append(key + '=' + val)
            # Append the query string once, after all keys are collected
            if subresources:
                resource += '?' + '&'.join(subresources)
        self.log.debug('canonicalized resource: %s', repr(resource))
        return resource
Developer: castedo, Project: requestbuilder, Lines: 35, Source file: aws.py


Example 6: assert_urls_match

def assert_urls_match(url_a, url_b):
    url_a = urlparse(url_a)
    url_b = urlparse(url_b)

    assert url_a.scheme == url_b.scheme
    assert url_a.netloc == url_b.netloc
    assert url_a.path == url_b.path
    # cgi.parse_qs was removed in Python 3.8; parse_qs is also
    # available from six.moves.urllib_parse
    assert parse_qs(url_a.query) == parse_qs(url_b.query)
Developer: Poorvak, Project: imdb-pie, Lines: 8, Source file: utils.py
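
Comparing parsed query strings instead of raw text makes the check independent of parameter order. The following is a minimal sketch of that idea using only the standard library. The function name same_url and the URLs are hypothetical.

```python
from urllib.parse import parse_qs, urlparse


def same_url(url_a, url_b):
    """Hypothetical helper: compare two URLs, ignoring query parameter order."""
    a, b = urlparse(url_a), urlparse(url_b)
    return (
        (a.scheme, a.netloc, a.path) == (b.scheme, b.netloc, b.path)
        # parse_qs returns a dict of lists, so key order does not matter
        and parse_qs(a.query) == parse_qs(b.query)
    )


print(same_url("http://x.test/p?a=1&b=2", "http://x.test/p?b=2&a=1"))
print(same_url("http://x.test/p?a=1", "http://x.test/p?a=2"))
```

Note that repeated keys keep their relative order inside each list, so 'a=1&a=2' and 'a=2&a=1' still compare as different.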


Example 7: assert_url_equal

def assert_url_equal(url, expected, compare_host=False):
    """Compare url paths and query strings."""
    parsed = urlparse(six.text_type(url))
    parsed_expected = urlparse(six.text_type(expected))
    compare_url_part(parsed.path, parsed_expected.path)
    compare_url_part(parse_qs(parsed.query), parse_qs(parsed_expected.query))
    if compare_host:
        compare_url_part(parsed.netloc, parsed_expected.netloc)
Developer: diox, Project: olympia, Lines: 8, Source file: __init__.py


Example 8: is_safe_url

def is_safe_url(target):
    """ Check that the target URL is safe, i.e. that it points to the
    current website and not to some other, malicious one.
    """
    ref_url = urlparse.urlparse(flask.request.host_url)
    test_url = urlparse.urlparse(
        urlparse.urljoin(flask.request.host_url, target))
    return test_url.scheme in ('http', 'https') and \
        ref_url.netloc == test_url.netloc
Developer: ncoghlan, Project: downstreaming, Lines: 9, Source file: utils.py
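
The same check can be tried without Flask by passing the host URL explicitly. This is only a sketch under that assumption: is_same_site and the example hosts are hypothetical names, and a production redirect check may need further rules.

```python
from urllib.parse import urljoin, urlparse


def is_same_site(host_url, target):
    """Hypothetical stand-alone variant of the check above."""
    ref_url = urlparse(host_url)
    # urljoin resolves relative targets against the host URL
    test_url = urlparse(urljoin(host_url, target))
    return (test_url.scheme in ("http", "https")
            and ref_url.netloc == test_url.netloc)


print(is_same_site("https://example.com/", "/profile"))
print(is_same_site("https://example.com/", "https://evil.example.org/x"))
print(is_same_site("https://example.com/", "//evil.example.org/x"))
```

Relative paths resolve to the reference host, while absolute and scheme-relative URLs pointing elsewhere fail the netloc comparison.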


Example 9: _rebase_url

def _rebase_url(url, base):
    base = list(urlparse.urlparse(base))
    url = list(urlparse.urlparse(url))
    if not url[0]:  # fix up scheme
        url[0] = base[0] or "http"
    if not url[1]:  # fix up hostname
        url[1] = base[1]
        if not url[2].startswith('/'):
            url[2] = re.sub(r'/[^/]+$', '/', base[2]) + url[2]
    return urlparse.urlunparse(url)
Developer: gergelypolonkai, Project: synapse, Lines: 10, Source file: preview_url_resource.py
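
The example above edits the parse result as a list and reassembles it with urlunparse. A small sketch of that round trip, next to the namedtuple-style alternative, is shown here. The URL and the fallback scheme "https" are illustrative assumptions.

```python
from urllib.parse import urlparse, urlunparse

# Hypothetical scheme-relative URL, used only for illustration.
parts = urlparse("//example.com/a/b?x=1")

# ParseResult is a namedtuple, so _replace returns a modified copy.
fixed = parts._replace(scheme=parts.scheme or "https")
rebuilt = fixed.geturl()

# urlunparse accepts any 6-item sequence, e.g. a list edited in place.
fields = list(parts)
fields[0] = "https"
rebuilt_from_list = urlunparse(fields)

print(rebuilt)
print(rebuilt_from_list)
```

Both approaches produce the same string; the list form is convenient when several fields are patched by index, as in _rebase_url.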


Example 10: __init__

    def __init__(self, raw=None, _address=None):

        if raw:
            raw = _to_parser_input(raw)
            url = url_parser.parse(raw, lexer.clone())
            self._address = urlparse(url.address)
        elif _address:
            self._address = urlparse(_address)
        else:
            raise SyntaxError('failed to create UrlAddress: bad parameters')
Developer: mailgun, Project: flanker, Lines: 10, Source file: address.py


Example 11: do_start

    def do_start(self):
        start_url = self.backend.start().url
        target_url = self.auth_handlers(start_url)
        response = requests.get(start_url)
        self.assertEqual(response.url, target_url)
        self.assertEqual(response.text, 'foobar')
        self.strategy.set_request_data(parse_qs(urlparse(start_url).query),
                                       self.backend)
        self.strategy.set_request_data(parse_qs(urlparse(target_url).query),
                                       self.backend)
        return self.backend.complete()
Developer: BeatrizFerreira, Project: EP1DAS, Lines: 11, Source file: oauth.py


Example 12: add_uri

    def add_uri(self, uri, **kwargs):
        """

        .. WARNING::
            Deprecated, please use add_torrent.
        """
        if uri is None:
            raise ValueError('add_uri requires a URI.')
        # there has been some problem with T's built in torrent fetcher,
        # use a python one instead
        parsed_uri = urlparse(uri)
        torrent_data = None
        if parsed_uri.scheme in ['ftp', 'ftps', 'http', 'https']:
            torrent_file = urlopen(uri)
            torrent_data = torrent_file.read()
            torrent_data = base64.b64encode(torrent_data).decode('utf-8')
        if parsed_uri.scheme in ['file']:
            filepath = uri
            # uri decoded different on linux / windows ?
            if len(parsed_uri.path) > 0:
                filepath = parsed_uri.path
            elif len(parsed_uri.netloc) > 0:
                filepath = parsed_uri.netloc
            torrent_file = open(filepath, 'rb')
            torrent_data = torrent_file.read()
            torrent_data = base64.b64encode(torrent_data).decode('utf-8')
        warnings.warn('add_uri has been deprecated, please use add_torrent instead.', DeprecationWarning)
        if torrent_data:
            return self.add(torrent_data, **kwargs)
        else:
            return self.add(None, filename=uri, **kwargs)
Developer: agentxan, Project: nzbToMedia, Lines: 31, Source file: client.py


Example 13: post

    def post(self, queue_name, messages, client_uuid, project=None):
        """Send messages to the subscribers."""
        if self.subscription_controller:
            if not isinstance(self.subscription_controller,
                              pooling.SubscriptionController):
                marker = None
                while True:
                    subscribers = self.subscription_controller.list(
                        queue_name, project, marker=marker)
                    for sub in next(subscribers):
                        LOG.debug("Notifying subscriber %r" % (sub,))
                        s_type = urllib_parse.urlparse(
                            sub['subscriber']).scheme
                        # If the subscriber doesn't contain 'confirmed', it
                        # means that this kind of subscriber was created
                        # before the confirm feature was introduced into
                        # Zaqar. We should allow them to be subscribed.
                        if (self.require_confirmation and
                                not sub.get('confirmed', True)):
                            LOG.info(_LI('The subscriber %s is not '
                                         'confirmed.'), sub['subscriber'])
                            continue
                        for msg in messages:
                            msg['Message_Type'] = MessageType.Notification.name
                        self._execute(s_type, sub, messages)
                    marker = next(subscribers)
                    if not marker:
                        break
        else:
            LOG.error(_LE('Failed to get subscription controller.'))
Developer: ollie314, Project: zaqar, Lines: 30, Source file: notifier.py


Example 14: __new__

    def __new__(cls, urlstring):
        if isinstance(urlstring, binary_type):
            # here we make a safe-ish assumption it is a utf-8 string
            urlstring = urlstring.decode('utf-8')
        url = super(URL, cls).__new__(cls, urlstring)
        url.parsed = urlparse(url)
        return url
Developer: pombredanne, Project: wextracto, Lines: 7, Source file: url.py


Example 15: __call__

    def __call__(self, value):
        parsed = urlparse(value)
        if parsed.scheme not in ("http", "https"):
            raise ValidationError(message=self.message)

        if parsed.hostname in ("127.0.0.1", "localhost"):
            raise ValidationError(message=self.message)
Developer: haswalt, Project: healthchecks, Lines: 7, Source file: validators.py


Example 16: get_agent

def get_agent(reactor, connect_timeout=None):
    """Return appropriate agent based on whether an http_proxy is used or not.

    :param connect_timeout: connection timeout in seconds
    :type connect_timeout: float
    :returns: :class:`twisted.web.client.ProxyAgent` when an http_proxy
        environment variable is present, :class:`twisted.web.client.Agent`
        otherwise.
    """

    # TODO: Would be nice to have https_proxy support too.
    http_proxy = os.environ.get('http_proxy')
    if http_proxy is None:
        return _twisted_web_client().Agent(
            reactor,
            connectTimeout=connect_timeout)

    parse_result = urlparse(http_proxy)
    http_proxy_endpoint = TCP4ClientEndpoint(
        reactor,
        parse_result.hostname,
        parse_result.port or 80,
        timeout=connect_timeout)

    return _twisted_web_client().ProxyAgent(http_proxy_endpoint)
Developer: dannyperson, Project: fido, Lines: 25, Source file: fido.py


Example 17: parse_url

    def parse_url(cls, url):
        parsed = urlparse(url)
        return cls(proxy_type=parsed.scheme,
                   proxy_address=parsed.hostname,
                   proxy_port=parsed.port,
                   proxy_login=parsed.username,
                   proxy_password=parsed.password)
Developer: pannal, Project: Sub-Zero.bundle, Lines: 7, Source file: proxy.py


Example 18: validate_image_proxies

def validate_image_proxies(node):
    """Check that the provided proxy parameters are valid.

    :param node: an Ironic node.
    :raises: InvalidParameterValue if any of the provided proxy parameters are
        incorrect.
    """
    invalid_proxies = {}
    for scheme in ('http', 'https'):
        proxy_param = 'image_%s_proxy' % scheme
        proxy = node.driver_info.get(proxy_param)
        if proxy:
            chunks = urlparse.urlparse(proxy)
            # NOTE(vdrok) If no scheme specified, this is still a valid
            # proxy address. It is also possible for a proxy to have a
            # scheme different from the one specified in the image URL,
            # e.g. it is possible to use https:// proxy for downloading
            # http:// image.
            if chunks.scheme not in ('', 'http', 'https'):
                invalid_proxies[proxy_param] = proxy
    msg = ''
    if invalid_proxies:
        msg += _("Proxy URL should either have HTTP(S) scheme "
                 "or no scheme at all, the following URLs are "
                 "invalid: %s.") % invalid_proxies
    no_proxy = node.driver_info.get('image_no_proxy')
    if no_proxy is not None and not utils.is_valid_no_proxy(no_proxy):
        msg += _(
            "image_no_proxy should be a list of host names, IP addresses "
            "or domain names to exclude from proxying, the specified list "
            "%s is incorrect. To denote a domain name, prefix it with a dot "
            "(instead of e.g. '.*').") % no_proxy
    if msg:
        raise exception.InvalidParameterValue(msg)
Developer: mnarusze, Project: ironic, Lines: 34, Source file: agent.py


Example 19: _discover_auth_versions

def _discover_auth_versions(session, auth_url):
    # discover the API versions the server supports, based on the
    # given URL
    v2_auth_url = None
    v3_auth_url = None
    try:
        ks_discover = discover.Discover(session=session, auth_url=auth_url)
        v2_auth_url = ks_discover.url_for('2.0')
        v3_auth_url = ks_discover.url_for('3.0')
    except ks_exc.ClientException:
        # The Identity service may not support API version discovery.
        # Let's try to figure out the API version from the original URL.
        path = urlparse.urlparse(auth_url).path
        path = path.lower()
        if path.startswith('/v3'):
            v3_auth_url = auth_url
        elif path.startswith('/v2'):
            v2_auth_url = auth_url
        else:
            # not enough information to determine the auth version
            msg = ('Unable to determine the Keystone version '
                   'to authenticate with using the given '
                   'auth_url. Identity service may not support API '
                   'version discovery. Please provide a versioned '
                   'auth_url instead.')
            raise exc.CommandError(msg)

    return v2_auth_url, v3_auth_url
Developer: micumatei, Project: cloudbase-init-ci, Lines: 28, Source file: client.py


Example 20: depaginate

def depaginate(api, result):
    """Depaginate the first (or only) page of a paginated result"""
    meta = result['meta']
    if meta['next'] is None:
        # No pages means we're done
        return result

    # make a copy of meta that we'll mess with and eventually return
    # since we'll be chewing on the 'meta' object with every new GET
    # same thing for objects, since we'll just be appending to it
    # while we pull more records
    ret_meta = meta.copy()
    ret_objects = result['objects']
    while meta['next']:
        # parse out url bits for constructing the new api req
        next_url = urlparse(meta['next'])
        # ugh...need to find the word after 'api/' in the next URL to
        # get the resource endpoint name; not sure how to make this better
        next_endpoint = next_url.path.strip('/').split('/')[-1]
        next_params = {k: v[0] for k, v in parse_qs(next_url.query).items()}
        result = getattr(api, next_endpoint).get(**next_params)
        ret_objects.extend(result['objects'])
        meta = result['meta']

    # fix meta up to not tell lies
    ret_meta['total_count'] = len(ret_objects)
    ret_meta['next'] = None
    ret_meta['limit'] = ret_meta['total_count']
    return {
        'meta': ret_meta,
        'objects': ret_objects
    }
Developer: lcouzens, Project: cfme_tests, Lines: 32, Source file: trackerbot.py
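
The loop above derives the next request from a "next" URL by taking the last path segment as the endpoint name and flattening the parsed query. A minimal sketch of just that step follows. The URL is a made-up value, and only standard-library calls are used.

```python
from urllib.parse import parse_qs, urlparse

# Hypothetical "next page" URL, used only for illustration.
next_url = urlparse("https://tracker.example.com/api/build/?limit=20&offset=40")

# Last path segment names the resource endpoint.
next_endpoint = next_url.path.strip("/").split("/")[-1]

# parse_qs maps each key to a list of values; keep only the first one.
next_params = {k: v[0] for k, v in parse_qs(next_url.query).items()}

print(next_endpoint)  # build
print(next_params)    # {'limit': '20', 'offset': '40'}
```

Keeping only the first value is fine when every parameter appears once; repeated parameters would lose their later values.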



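Note: The six.moves.urllib_parse.urlparse examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects contributed by their respective developers, and copyright of the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.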

