本文整理汇总了Python中six.moves.urllib_parse.urlunparse函数的典型用法代码示例。如果您正苦于以下问题：Python urlunparse函数的具体用法？Python urlunparse怎么用？Python urlunparse使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlunparse函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _remove_params_from_url
def _remove_params_from_url(url):
"""
Return a copy of a URL with its parameters, fragments, and query
string removed.
"""
parsed = urlparse.urlparse(url)
return urlparse.urlunparse((parsed[0], parsed[1], parsed[2], '', '', ''))
开发者ID:castedo,项目名称:requestbuilder,代码行数:7,代码来源:aws.py
示例2: __init__
def __init__(self, value):
    """
    Store a URI together with an ASCII-encoded byte form of it.

    ``value`` must be a text string (``six.text_type``); anything else
    raises ``TypeError``.  The host name is passed through ``idna.encode``
    and the URL is rebuilt around it, then encoded to ASCII bytes.  The
    original text is kept in ``_value`` and the bytes in ``_encoded``.
    """
    if not isinstance(value, six.text_type):
        raise TypeError("value must be a unicode string")

    parts = urllib_parse.urlparse(value)
    if not parts.hostname:
        netloc = ""
    else:
        # Only append ":port" when the URL carries a (non-zero) port.
        if parts.port:
            port_suffix = ":{0}".format(parts.port).encode("ascii")
        else:
            port_suffix = b""
        netloc = (idna.encode(parts.hostname) + port_suffix).decode("ascii")

    # Rebuilding the URL this way yields something semantically equivalent
    # to the input, though not necessarily character-for-character equal.
    rebuilt = urllib_parse.urlunparse((
        parts.scheme,
        netloc,
        parts.path,
        parts.params,
        parts.query,
        parts.fragment,
    ))
    self._value = value
    self._encoded = rebuilt.encode("ascii")
开发者ID:sotte,项目名称:cryptography,代码行数:29,代码来源:x509.py
示例3: get_sample_data
def get_sample_data(self, meter_name, parse_url, params, cache):
    """
    Collect samples for ``meter_name`` from the endpoint described by
    ``parse_url``.

    Returns ``None`` when this driver has no extractor or no iterator for
    the meter.  Otherwise the endpoint URL is built from the first
    ``scheme`` entry in ``params`` (``'http'`` if absent) plus the netloc
    and path of ``parse_url``, data is obtained through
    ``self._prepare_cache``, and a list of samples is returned (empty when
    no data came back).
    """
    extractor = self._get_extractor(meter_name)
    if extractor is None:
        # Either this driver does not implement the meter or the
        # OpenDaylight REST API offers no way to obtain it.
        return None

    sample_iter = self._get_iter(meter_name)
    if sample_iter is None:
        # Same reasoning as above, but for the iterator half of the pair.
        return None

    scheme = params.get('scheme', ['http'])[0]
    endpoint = urlparse.urlunparse(
        urlparse.ParseResult(scheme, parse_url.netloc, parse_url.path,
                             None, None, None))

    data = self._prepare_cache(endpoint, params, cache)
    if not data:
        return []

    samples = []
    for sample in sample_iter(extractor, data):
        if sample is None:
            continue
        # Record the controller name in the sample's resource metadata.
        sample[2]['controller'] = 'OpenDaylight_V2'
        samples.append(sample)
    return samples
开发者ID:openstack,项目名称:networking-odl,代码行数:33,代码来源:driver.py
示例4: _build_network_relative_path
def _build_network_relative_path(url):
p = urlparse.urlparse(url)
parse_result = urlparse.ParseResult(
p.scheme,
p.netloc,
os.path.dirname(p.path),
'', '', '')
return urlparse.urlunparse(parse_result)
开发者ID:nfredrik,项目名称:pyraml-parser,代码行数:8,代码来源:parser.py
示例5: url_add_parameters
def url_add_parameters(url, params):
    """
    Return ``url`` with ``params`` merged into its query string.

    Parameters already present in the URL are kept, including repeated
    ones; a key that also appears in ``params`` is replaced by the new
    value.  When ``params`` is empty or ``None`` the URL is returned
    unchanged.
    """
    if params:
        fragments = list(urlparse(url))
        value = parse_qs(fragments[4])
        value.update(params)
        # parse_qs yields list values; without doseq=True urlencode would
        # serialise the list's repr ("%5B%271%27%5D") instead of the items.
        fragments[4] = urlencode(value, doseq=True)
        url = urlunparse(fragments)
    return url
开发者ID:Finc3,项目名称:social-core,代码行数:9,代码来源:utils.py
示例6: _auth1
def _auth1(self):
    """
    Authenticate with the v1 auth protocol, retrying on 5xx responses.

    Up to ``self.attempts`` GET requests are sent to ``self.auth_url``
    with the user and key in ``X-Auth-User`` / ``X-Auth-Key`` headers.
    On a 2xx response the storage URL, CDN URL, and auth token are read
    from the response headers and stored on ``self``, and the auth cache
    is saved.  A 5xx response sleeps ``2 ** attempt`` via
    ``self.client.sleep`` and retries; anything else stops immediately.

    Returns a ``(status, reason)`` tuple; ``status`` is 0 when the
    response could not be read or lacked ``x-storage-url``, and 500 when
    no token header was present.
    """
    status, reason = 0, 'Unknown'
    attempt = 0
    while attempt < self.attempts:
        attempt += 1
        self.verbose('Attempting auth v1 with %s', self.auth_url)
        parsed, conn = self._connect(self.auth_url)
        self.verbose('> GET %s', parsed.path)
        request_headers = {
            'User-Agent': self.user_agent,
            'X-Auth-User': quote(self.auth_user),
            'X-Auth-Key': quote(self.auth_key),
        }
        conn.request('GET', parsed.path, '', request_headers)
        try:
            resp = conn.getresponse()
            status = resp.status
            reason = resp.reason
            self.verbose('< %s %s', status, reason)
            hdrs = headers_to_dict(resp.getheaders())
            resp.read()
            resp.close()
            conn.close()
        except Exception as err:
            # Any failure while reading the response is reported as
            # status 0 with the exception text as the reason.
            status, reason, hdrs = 0, str(err), {}

        family = status // 100
        if family == 5:
            # Server-side error: back off exponentially and try again.
            self.client.sleep(2 ** attempt)
            continue
        if family != 2:
            # Includes 401 and the status-0 failure case: do not retry.
            break

        # --- 2xx: pull the session details out of the headers ---
        try:
            self.storage_url = hdrs['x-storage-url']
        except KeyError:
            status, reason = 0, 'No x-storage-url header'
            break
        if self.snet:
            url_parts = list(urlparse.urlparse(self.storage_url))
            # Index 1 of the parsed URL is the netloc.
            url_parts[1] = 'snet-' + url_parts[1]
            self.storage_url = urlparse.urlunparse(url_parts)
        self.cdn_url = hdrs.get('x-cdn-management-url')
        # Prefer x-auth-token, falling back to x-storage-token.
        self.auth_token = (
            hdrs.get('x-auth-token') or hdrs.get('x-storage-token'))
        if not self.auth_token:
            status = 500
            reason = (
                'No x-auth-token or x-storage-token header in '
                'response')
            break
        self._auth_save_cache()
        break
    return status, reason
开发者ID:VadimPushtaev,项目名称:swiftly,代码行数:57,代码来源:standardclient.py
示例7: __init__
def __init__(self, user, password, session):
    """
    Keep the credentials and session, start with an empty cookie store,
    and precompute the HTTPS ``/login`` URL on ``API_DOMAIN``.
    """
    self.user = user
    self.password = password
    self.session = session
    self.cookies = {}
    login_parts = ["https", API_DOMAIN, "/login", "", "", ""]
    self.login_url = urllib_parse.urlunparse(login_parts)
开发者ID:dstufft,项目名称:fastly-py,代码行数:9,代码来源:auth.py
示例8: _next_server
def _next_server(self, cause=None):
    """
    Return the next server URL with its host replaced by a resolved address.

    The parent class supplies candidate URLs one at a time; each host name
    is passed to ``self._dns_resolver.resolve`` and the first URL that
    yields at least one address is used, picking one address at random.
    The unresolved URL is remembered in ``self._base_uri_unresolved``.
    """
    while True:
        url = super(Client, self)._next_server(cause)
        r = urlparse(url)
        ips = self._dns_resolver.resolve(r.hostname)
        if ips:
            host = '{0}'.format(random.choice(ips))
            # r.port is None when the URL has no explicit port; formatting
            # it unconditionally would produce the bogus netloc "host:None".
            if r.port is None:
                netloc = host
            else:
                netloc = '{0}:{1}'.format(host, r.port)
            self._base_uri_unresolved = url
            return urlunparse((r.scheme, netloc, r.path, r.params, r.query, r.fragment))
开发者ID:zalando,项目名称:patroni,代码行数:9,代码来源:etcd.py
示例9: _rebase_url
def _rebase_url(url, base):
base = list(urlparse.urlparse(base))
url = list(urlparse.urlparse(url))
if not url[0]: # fix up schema
url[0] = base[0] or "http"
if not url[1]: # fix up hostname
url[1] = base[1]
if not url[2].startswith('/'):
url[2] = re.sub(r'/[^/]+$', '/', base[2]) + url[2]
return urlparse.urlunparse(url)
开发者ID:gergelypolonkai,项目名称:synapse,代码行数:10,代码来源:preview_url_resource.py
示例10: parse_connection_string
def parse_connection_string(value):
    """Split a connection string stored in DCS into ``(conn_url, api_url)``.

    The original Governor stores one connection string per cluster member:

        postgres://{username}:{password}@{connect_address}/postgres

    Every patroni instance also exposes a REST API endpoint, and to avoid
    introducing new keys that endpoint is carried in the query string:

        postgres://{username}:{password}@{connect_address}/postgres?application_name={api_url}

    ``conn_url`` is ``value`` with its query string removed; ``api_url``
    is the first ``application_name`` query value, or ``None`` when there
    is none."""
    parts = urlparse(value)
    conn_url = urlunparse(
        (parts[0], parts[1], parts[2], parts[3], '', parts[5]))
    api_url = next(
        (val for key, val in parse_qsl(parts[4]) if key == 'application_name'),
        None)
    return conn_url, api_url
开发者ID:sean-,项目名称:patroni,代码行数:15,代码来源:dcs.py
示例11: _build_general_name
def _build_general_name(backend, gn):
    """
    Convert the backend general-name structure ``gn`` into an ``x509``
    general-name object.

    DNS, URI, registered-ID, and IP-address entries are handled; any other
    type raises ``x509.UnsupportedGeneralNameType``.
    """
    if gn.type == backend._lib.GEN_DNS:
        data = backend._ffi.buffer(gn.d.dNSName.data, gn.d.dNSName.length)[:]
        return x509.DNSName(idna.decode(data))

    if gn.type == backend._lib.GEN_URI:
        data = backend._ffi.buffer(
            gn.d.uniformResourceIdentifier.data,
            gn.d.uniformResourceIdentifier.length
        )[:].decode("ascii")
        parsed = urllib_parse.urlparse(data)
        # hostname is None for URIs without an authority part (for example
        # "urn:..."); only IDNA-decode when there is a host to decode.
        if parsed.hostname:
            hostname = idna.decode(parsed.hostname)
        else:
            hostname = u""
        if parsed.port:
            netloc = hostname + u":" + six.text_type(parsed.port)
        else:
            netloc = hostname

        # Note that building a URL in this fashion means it should be
        # semantically indistinguishable from the original but is not
        # guaranteed to be exactly the same.
        uri = urllib_parse.urlunparse((
            parsed.scheme,
            netloc,
            parsed.path,
            parsed.params,
            parsed.query,
            parsed.fragment
        ))
        return x509.UniformResourceIdentifier(uri)

    if gn.type == backend._lib.GEN_RID:
        oid = _obj2txt(backend, gn.d.registeredID)
        return x509.RegisteredID(x509.ObjectIdentifier(oid))

    if gn.type == backend._lib.GEN_IPADD:
        return x509.IPAddress(
            ipaddress.ip_address(
                backend._ffi.buffer(
                    gn.d.iPAddress.data, gn.d.iPAddress.length
                )[:]
            )
        )

    # otherName, x400Address or ediPartyName
    raise x509.UnsupportedGeneralNameType(
        "{0} is not a supported type".format(
            x509._GENERAL_NAMES.get(gn.type, gn.type)
        ),
        gn.type
    )
开发者ID:endreszabo,项目名称:cryptography,代码行数:47,代码来源:x509.py
示例12: url_replace
def url_replace(url, **params):
    """
    Return ``url`` with the named components replaced.

    Keyword names are the field names of ``urlparse.ParseResult``; an
    unknown name raises ``KeyError``.  A ``query`` value that is not
    already a string may be a dict or an iterable of key/value pairs and
    is url-encoded first.
    """
    field_index = dict(
        (field, pos)
        for pos, field in enumerate(urlparse.ParseResult._fields))
    parts = list(urlparse.urlparse(url))  # mutable copy of the components

    for name, value in params.items():
        if name == 'query' and not isinstance(value, (bytes, unicode)):
            # Accept a mapping or a sequence of pairs as the query.
            pairs = value.items() if isinstance(value, dict) else value
            value = urlencode(
                [(to_bytes(qkey), to_bytes(qval)) for qkey, qval in pairs])
        # An unknown component name fails here with KeyError.
        parts[field_index[name]] = value

    return urlparse.urlunparse(parts)
开发者ID:HoverHell,项目名称:pyaux,代码行数:18,代码来源:urlhelpers.py
示例13: info
def info(self):
    """
    Return a dict describing this object.

    The dict always holds the active flag, expiry, idle time, ops (as a
    list), size, sparse flag, timeout, URL (as a string), and uuid.
    ``transfer_id`` and ``filename`` are added only when truthy, and
    ``transferred`` only when ``self.transferred()`` is not ``None``.
    """
    details = dict(
        active=self.active(),
        expires=self._expires,
        idle_time=self.idle_time,
        ops=list(self._ops),
        size=self._size,
        sparse=self._sparse,
        timeout=self._timeout,
        url=urllib_parse.urlunparse(self._url),
        uuid=self._uuid,
    )

    # Optional entries.
    if self._transfer_id:
        details["transfer_id"] = self._transfer_id
    if self.filename:
        details["filename"] = self.filename
    done = self.transferred()
    if done is not None:
        details["transferred"] = done

    return details
开发者ID:oVirt,项目名称:ovirt-imageio,代码行数:20,代码来源:auth.py
示例14: _idna_encode
def _idna_encode(self, value):
    """
    Return ``value`` rebuilt as a URL whose host name is IDNA-encoded.

    The port, when present, is kept after the encoded host.  A URL with
    no host name gets an empty network location.
    """
    parsed = urllib_parse.urlparse(value)
    if not parsed.hostname:
        # hostname is None for URIs without an authority part (for example
        # "urn:..."); idna.encode cannot be given None.
        netloc = ""
    elif parsed.port:
        netloc = (
            idna.encode(parsed.hostname) +
            ":{0}".format(parsed.port).encode("ascii")
        ).decode("ascii")
    else:
        netloc = idna.encode(parsed.hostname).decode("ascii")

    # Note that building a URL in this fashion means it should be
    # semantically indistinguishable from the original but is not
    # guaranteed to be exactly the same.
    return urllib_parse.urlunparse((
        parsed.scheme,
        netloc,
        parsed.path,
        parsed.params,
        parsed.query,
        parsed.fragment
    ))
开发者ID:alex,项目名称:cryptography,代码行数:21,代码来源:general_name.py
示例15: strip_netloc
def strip_netloc(url):
    """
    Drop the scheme, host, and fragment from ``url``, leaving the
    server-absolute part (path, params, and query).

    Handy when an absolute URI is at hand but only the path is expected
    (such as in calls to getPage).

    >>> strip_netloc('https://google.com/foo/bar?bing#baz')
    '/foo/bar?bing'

    >>> strip_netloc('//google.com/foo/bar?bing#baz')
    '/foo/bar?bing'

    >>> strip_netloc('/foo/bar?bing#baz')
    '/foo/bar?bing'
    """
    parts = urllib_parse.urlparse(url)
    return urllib_parse.urlunparse(
        ('', '', parts.path, parts.params, parts.query, ''))
开发者ID:Southpaw-TACTIC,项目名称:TACTIC,代码行数:21,代码来源:webtest.py
示例16: __repr__
def __repr__(self):
return ("<Ticket "
"active={active!r} "
"expires={self.expires!r} "
"filename={self.filename!r} "
"idle_time={self.idle_time} "
"ops={self.ops!r} "
"size={self.size!r} "
"sparse={self.sparse!r} "
"transfer_id={self.transfer_id!r} "
"transferred={transferred!r} "
"url={url!r} "
"uuid={self.uuid!r} "
"at {addr:#x}>"
).format(
active=self.active(),
addr=id(self),
self=self,
transferred=self.transferred(),
url=urllib_parse.urlunparse(self.url)
)
开发者ID:oVirt,项目名称:ovirt-imageio,代码行数:21,代码来源:auth.py
示例17: delete_account
def delete_account(user):
    """
    Terminate account view.

    Builds the token service's ``terminate`` URL, adds a ``next`` query
    parameter pointing at the ``security.account_terminated`` view, and
    returns a redirect payload with that location.

    NOTE(review): the original docstring also mentions receiving a POST,
    checking the csrf token, and scheduling the termination action; none
    of that is visible in this function body, so it is presumably done by
    decorators or by the token service. Confirm.
    """
    current_app.logger.debug('Initiating account termination for user {}'.format(user))

    ts_url = current_app.config.get('TOKEN_SERVICE_URL')
    terminate_url = urlappend(ts_url, 'terminate')
    next_url = url_for('security.account_terminated')

    params = {'next': next_url}

    url_parts = list(urlparse(terminate_url))
    query = parse_qs(url_parts[4])
    query.update(params)

    # parse_qs yields list values; doseq=True makes urlencode emit the
    # items instead of the list's repr, so a query string already present
    # on the token service URL survives intact.
    url_parts[4] = urlencode(query, doseq=True)
    location = urlunparse(url_parts)
    return RedirectSchema().dump({'location': location}).data
开发者ID:SUNET,项目名称:eduid-webapp,代码行数:22,代码来源:security.py
示例18: search_url_config
def search_url_config(cls, url, engine=None):
    """
    Build a search-backend config dict from a URL.

    ``url`` may be a string or an already-parsed ``cls.URL_CLASS``
    instance.  The last path segment becomes ``INDEX_NAME``; anything
    before it becomes ``PATH`` and is kept in ``URL``, which is always
    rebuilt with the ``http`` scheme.  ``ENGINE`` is looked up in
    ``cls.SEARCH_SCHEMES`` by the URL scheme and is overridden by
    ``engine`` when that is given.
    """
    config = {}

    url = urlparse.urlparse(url) if not isinstance(url, cls.URL_CLASS) else url

    # Remove query strings.
    path = url.path[1:]
    path = path.split('?', 2)[0]

    if url.scheme in cls.SEARCH_SCHEMES:
        config["ENGINE"] = cls.SEARCH_SCHEMES[url.scheme]

    if path.endswith("/"):
        path = path[:-1]

    split = path.rsplit("/", 1)

    if len(split) > 1:
        # rsplit with maxsplit=1 gives [prefix, index]; take the prefix as
        # a string (slicing split[:-1] would leave a list, which cannot be
        # used as a URL path).
        path, index = split
    else:
        path = ""
        index = split[0]

    config.update({
        "URL": urlparse.urlunparse(("http",) + url[1:2] + (path,) + url[3:]),
        "INDEX_NAME": index,
    })

    if path:
        config.update({
            "PATH": path,
        })

    if engine:
        config['ENGINE'] = engine

    return config
开发者ID:ei-grad,项目名称:django-environ,代码行数:38,代码来源:environ.py
示例19: send_request
def send_request(self, method='GET', path=None, params=None, headers=None,
                 data=None, files=None, auth=None):
    """
    Send an HTTP request to the service, retrying and following redirects.

    The request is prepared and sent with automatic redirects disabled.
    Timeouts and 500/503 responses are retried according to the delays
    from ``_generate_delays``; 301/302/307/308 responses carrying a
    ``Location`` header are followed by hand (at most five times) so each
    new URL goes through request preparation again.  A ``Host`` header is
    added from ``self.endpoint`` when the caller did not supply one.

    Returns the response object.  Responses with a status of 300 or above
    that are not followed as redirects are first passed to
    ``self.handle_http_error``.

    Raises ``TimeoutError`` when the request times out for good and
    ``ClientError`` for other ``requests`` request exceptions; connection
    errors are delegated to ``self.__handle_connection_error``.
    """
    url = self.__get_url_for_path(path)
    # headers defaults to None, and dict(None) raises TypeError.
    headers = dict(headers or {})
    if 'host' not in [header.lower() for header in headers]:
        headers['Host'] = urlparse.urlparse(self.endpoint).netloc

    try:
        max_tries = self.max_retries + 1
        assert max_tries >= 1
        redirects_left = 5
        # NOTE(review): `file` is the Python 2 builtin type; this check
        # needs another spelling on Python 3 -- confirm the target version.
        if isinstance(data, file) and hasattr(data, 'seek'):
            # If we're redirected we need to be able to reset
            data_file_offset = data.tell()
        else:
            data_file_offset = None
        while True:
            for attempt_no, delay in enumerate(
                    _generate_delays(max_tries), 1):
                # Use exponential backoff if this is a retry
                if delay > 0:
                    self.log.debug('will retry after %.3f seconds', delay)
                    time.sleep(delay)

                self.log.info('sending request (attempt %i of %i)',
                              attempt_no, max_tries)
                p_request = self.__log_and_prepare_request(
                    method, url, params, data, files, headers, auth)
                p_request.start_time = datetime.datetime.now()
                proxies = requests.utils.get_environ_proxies(url)
                for key, val in sorted(proxies.items()):
                    self.log.debug('request proxy: %s=%s', key, val)
                try:
                    response = self.session.send(
                        p_request, timeout=self.timeout, proxies=proxies,
                        allow_redirects=False)
                except requests.exceptions.Timeout:
                    if attempt_no < max_tries:
                        self.log.debug('timeout', exc_info=True)
                        if data_file_offset is not None:
                            self.log.debug('re-seeking body to '
                                           'beginning of file')
                            # pylint: disable=E1101
                            data.seek(data_file_offset)
                            # pylint: enable=E1101
                            continue
                        elif not hasattr(data, 'tell'):
                            continue
                        # Fallthrough -- if it has a file pointer but not
                        # seek we can't retry because we can't rewind.
                    raise
                if response.status_code not in (500, 503):
                    break
                # If it *was* in that list, retry
            if (response.status_code in (301, 302, 307, 308) and
                    redirects_left > 0 and 'Location' in response.headers):
                # Standard redirect -- we need to handle this ourselves
                # because we have to re-sign requests when their URLs
                # change.
                redirects_left -= 1
                parsed_rdr = urlparse.urlparse(
                    response.headers['Location'])
                parsed_url = urlparse.urlparse(url)
                # Take each URL component from the redirect target,
                # falling back to the current URL's where it is empty.
                new_url_bits = []
                for rdr_bit, url_bit in zip(parsed_rdr, parsed_url):
                    new_url_bits.append(rdr_bit or url_bit)
                if 'Host' in headers:
                    headers['Host'] = new_url_bits[1]  # netloc
                url = urlparse.urlunparse(new_url_bits)
                self.log.debug('redirecting to %s (%i redirect(s) '
                               'remaining)', url, redirects_left)
                if data_file_offset is not None:
                    self.log.debug('re-seeking body to beginning of file')
                    # pylint: disable=E1101
                    data.seek(data_file_offset)
                    # pylint: enable=E1101
                continue
            elif response.status_code >= 300:
                # We include 30x because we've handled the standard method
                # of redirecting, but the server might still be trying to
                # redirect another way for some reason.
                self.handle_http_error(response)
            return response
    except requests.exceptions.Timeout as exc:
        self.log.debug('timeout', exc_info=True)
        raise TimeoutError('request timed out', exc)
    except requests.exceptions.ConnectionError as exc:
        self.log.debug('connection error', exc_info=True)
        return self.__handle_connection_error(exc)
    except requests.exceptions.HTTPError as exc:
        return self.handle_http_error(response)
    except requests.exceptions.RequestException as exc:
        self.log.debug('request error', exc_info=True)
        raise ClientError(exc)
开发者ID:castedo,项目名称:requestbuilder,代码行数:94,代码来源:service.py
示例20: _decode_general_name
def _decode_general_name(backend, gn):
    """
    Convert the backend general-name structure ``gn`` into an ``x509``
    general-name object.

    DNS (including a leading ``*.`` wildcard), URI, registered-ID,
    IP-address, directory-name, and RFC 822 e-mail entries are handled.
    Any other type raises ``x509.UnsupportedGeneralNameType``; a malformed
    e-mail value raises ``ValueError``.
    """
    if gn.type == backend._lib.GEN_DNS:
        data = backend._ffi.buffer(gn.d.dNSName.data, gn.d.dNSName.length)[:]
        if data.startswith(b"*."):
            # This is a wildcard name. We need to remove the leading wildcard,
            # IDNA decode, then re-add the wildcard. Wildcard characters should
            # always be left-most (RFC 2595 section 2.4).
            data = u"*." + idna.decode(data[2:])
        else:
            # Not a wildcard, decode away. If the string has a * in it anywhere
            # invalid this will raise an InvalidCodePoint
            data = idna.decode(data)

        return x509.DNSName(data)
    elif gn.type == backend._lib.GEN_URI:
        data = backend._ffi.buffer(
            gn.d.uniformResourceIdentifier.data,
            gn.d.uniformResourceIdentifier.length
        )[:].decode("ascii")
        parsed = urllib_parse.urlparse(data)
        # hostname is None for URIs without an authority part (for example
        # "urn:..."); only IDNA-decode when there is a host to decode.
        if parsed.hostname:
            hostname = idna.decode(parsed.hostname)
        else:
            hostname = u""
        if parsed.port:
            netloc = hostname + u":" + six.text_type(parsed.port)
        else:
            netloc = hostname

        # Note that building a URL in this fashion means it should be
        # semantically indistinguishable from the original but is not
        # guaranteed to be exactly the same.
        uri = urllib_parse.urlunparse((
            parsed.scheme,
            netloc,
            parsed.path,
            parsed.params,
            parsed.query,
            parsed.fragment
        ))
        return x509.UniformResourceIdentifier(uri)
    elif gn.type == backend._lib.GEN_RID:
        oid = _obj2txt(backend, gn.d.registeredID)
        return x509.RegisteredID(x509.ObjectIdentifier(oid))
    elif gn.type == backend._lib.GEN_IPADD:
        return x509.IPAddress(
            ipaddress.ip_address(
                backend._ffi.buffer(
                    gn.d.iPAddress.data, gn.d.iPAddress.length
                )[:]
            )
        )
    elif gn.type == backend._lib.GEN_DIRNAME:
        return x509.DirectoryName(
            _decode_x509_name(backend, gn.d.directoryName)
        )
    elif gn.type == backend._lib.GEN_EMAIL:
        data = backend._ffi.buffer(
            gn.d.rfc822Name.data, gn.d.rfc822Name.length
        )[:].decode("ascii")
        name, address = parseaddr(data)
        parts = address.split(u"@")
        if name or len(parts) > 2 or not address:
            # parseaddr has found a name (e.g. Name <email>) or the split
            # has found more than 2 parts (which means more than one @ sign)
            # or the entire value is an empty string.
            raise ValueError("Invalid rfc822name value")
        elif len(parts) == 1:
            # Single label email name. This is valid for local delivery. No
            # IDNA decoding can be done since there is no domain component.
            return x509.RFC822Name(address)
        else:
            # A normal email of the form user@domain.com. Let's attempt to
            # decode the domain component and return the entire address.
            return x509.RFC822Name(
                parts[0] + u"@" + idna.decode(parts[1])
            )
    else:
        # otherName, x400Address or ediPartyName
        raise x509.UnsupportedGeneralNameType(
            "{0} is not a supported type".format(
                x509._GENERAL_NAMES.get(gn.type, gn.type)
            ),
            gn.type
        )
开发者ID:sigmavirus24,项目名称:cryptography,代码行数:82,代码来源:x509.py
注：本文中的six.moves.urllib_parse.urlunparse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论