本文整理汇总了Python中six.moves.urllib.parse.urlunparse函数的典型用法代码示例。如果您正苦于以下问题:Python urlunparse函数的具体用法?Python urlunparse怎么用?Python urlunparse使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlunparse函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _secure_request
def _secure_request(self, url, method, data=None, files=None, headers=None,
raw=False, send_as_json=True, content_type=None,
**request_kwargs):
full_url = self.build_url(url)
# Add token (if it's not already there)
if self._token:
parsed = list(urlparse(full_url))
if not parsed[4]: # query
parsed[4] = 'token=%s' % self._token
full_url = urlunparse(parsed)
elif 'token' not in parse_qs(parsed[4]):
parsed[4] += '&token=%s' % self._token
full_url = urlunparse(parsed)
headers = headers or {}
# If files are being sent, we cannot encode data as JSON
if send_as_json and not files:
headers['content-type'] = 'application/json'
data = json.dumps(data or {})
else:
if content_type:
headers['content-type'] = content_type
data = data or ''
method = getattr(requests, method, None)
response = method(full_url, data=data, files=files, headers=headers,
**request_kwargs)
self.check_for_errors(response) # Raise exception if something failed
if raw or not response.content:
return response.content
return json.loads(response.text)
开发者ID:amigocloud,项目名称:python-amigocloud,代码行数:34,代码来源:amigocloud.py
示例2: get_canonical_and_alternates_urls
def get_canonical_and_alternates_urls(url, drop_ln=True, washed_argd=None, quote_path=False):
"""
Given an Invenio URL returns a tuple with two elements. The first is the
canonical URL, that is the original URL with CFG_SITE_URL prefix, and
where the ln= argument stripped. The second element element is mapping,
language code -> alternate URL
@param quote_path: if True, the path section of the given C{url}
is quoted according to RFC 2396
"""
dummy_scheme, dummy_netloc, path, dummy_params, query, fragment = urlparse(url)
canonical_scheme, canonical_netloc = urlparse(cfg.get("CFG_SITE_URL"))[0:2]
parsed_query = washed_argd or parse_qsl(query)
no_ln_parsed_query = [(key, value) for (key, value) in parsed_query if key != "ln"]
if drop_ln:
canonical_parsed_query = no_ln_parsed_query
else:
canonical_parsed_query = parsed_query
if quote_path:
path = urllib.quote(path)
canonical_query = urlencode(canonical_parsed_query)
canonical_url = urlunparse((canonical_scheme, canonical_netloc, path, dummy_params, canonical_query, fragment))
alternate_urls = {}
for ln in cfg.get("CFG_SITE_LANGS"):
alternate_query = urlencode(no_ln_parsed_query + [("ln", ln)])
alternate_url = urlunparse((canonical_scheme, canonical_netloc, path, dummy_params, alternate_query, fragment))
alternate_urls[ln] = alternate_url
return canonical_url, alternate_urls
开发者ID:jirikuncar,项目名称:invenio-utils,代码行数:28,代码来源:url.py
示例3: get_commit_url
def get_commit_url(commit, pkg):
try:
upstream_url = parse.urlsplit(pkg["upstream"])
if upstream_url.netloc == "git.openstack.org":
commit_url = ("http",
upstream_url.netloc,
"/cgit%s/commit/?id=" % upstream_url.path,
"", "", "")
commit_url = parse.urlunparse(commit_url)
elif upstream_url.netloc == "github.com":
commit_url = ("https",
upstream_url.netloc,
"%s/commit/" % upstream_url.path,
"", "", "")
commit_url = parse.urlunparse(commit_url)
else:
# Fallback when no cgit URL can be defined
commit_url = pkg["upstream"]
except KeyError:
# This should not happen, but pkg['upstream'] may not be present
# after some error in the gitrepo driver
commit_url = ''
return commit_url
开发者ID:openstack-packages,项目名称:DLRN,代码行数:25,代码来源:reporting.py
示例4: base_url
def base_url(self, filters, auth_data=None):
"""Base URL from catalog
Filters can be:
- service: compute, image, etc
- region: the service region
- endpoint_type: adminURL, publicURL, internalURL
- api_version: replace catalog version with this
- skip_path: take just the base URL
"""
if auth_data is None:
auth_data = self.auth_data
token, _auth_data = auth_data
service = filters.get('service')
region = filters.get('region')
endpoint_type = filters.get('endpoint_type', 'publicURL')
if service is None:
raise exceptions.EndpointNotFound("No service provided")
_base_url = None
for ep in _auth_data['serviceCatalog']:
if ep["type"] == service:
for _ep in ep['endpoints']:
if region is not None and _ep['region'] == region:
_base_url = _ep.get(endpoint_type)
if not _base_url:
# No region matching, use the first
_base_url = ep['endpoints'][0].get(endpoint_type)
break
if _base_url is None:
raise exceptions.EndpointNotFound(
"service: %s, region: %s, endpoint_type: %s" %
(service, region, endpoint_type))
parts = urlparse.urlparse(_base_url)
if filters.get('api_version', None) is not None:
version_path = '/%s' % filters['api_version']
path = re.sub(r'(^|/)+v\d+(?:\.\d+)?',
version_path,
parts.path,
count=1)
_base_url = urlparse.urlunparse((parts.scheme,
parts.netloc,
path or version_path,
parts.params,
parts.query,
parts.fragment))
if filters.get('skip_path', None) is not None and parts.path != '':
_base_url = urlparse.urlunparse((parts.scheme,
parts.netloc,
'/',
parts.params,
parts.query,
parts.fragment))
return _base_url
开发者ID:cisco-openstack,项目名称:tempest,代码行数:57,代码来源:auth.py
示例5: output
def output(self, key, obj):
try:
data = to_marshallable_type(obj)
endpoint = self.endpoint if self.endpoint is not None else request.endpoint
o = urlparse(url_for(endpoint, _external=self.absolute, **data))
if self.absolute:
scheme = self.scheme if self.scheme is not None else o.scheme
return urlunparse((scheme, o.netloc, o.path, "", "", ""))
return urlunparse(("", "", o.path, "", "", ""))
except TypeError as te:
raise MarshallingError(te)
开发者ID:bedge,项目名称:flask-restplus,代码行数:11,代码来源:fields.py
示例6: clean_ows_url
def clean_ows_url(url):
"""
clean an OWS URL of basic service elements
source: https://stackoverflow.com/a/11640565
"""
if url is None or not url.startswith('http'):
return url
filtered_kvp = {}
basic_service_elements = ('service', 'version', 'request')
parsed = urlparse(url)
qd = parse_qs(parsed.query, keep_blank_values=True)
for key, value in qd.items():
if key.lower() not in basic_service_elements:
filtered_kvp[key] = value
newurl = urlunparse([
parsed.scheme,
parsed.netloc,
parsed.path,
parsed.params,
urlencode(filtered_kvp, doseq=True),
parsed.fragment
])
return newurl
开发者ID:meteogrid,项目名称:OWSLib,代码行数:30,代码来源:util.py
示例7: methodNext
def methodNext(self, previous_request, previous_response):
"""Retrieves the next page of results.
Args:
previous_request: The request for the previous page. (required)
previous_response: The response from the request for the previous page. (required)
Returns:
A request object that you can call 'execute()' on to request the next
page. Returns None if there are no more items in the collection.
"""
# Retrieve nextPageToken from previous_response
# Use as pageToken in previous_request to create new request.
if "nextPageToken" not in previous_response or not previous_response["nextPageToken"]:
return None
request = copy.copy(previous_request)
pageToken = previous_response["nextPageToken"]
parsed = list(urlparse(request.uri))
q = parse_qsl(parsed[4])
# Find and remove old 'pageToken' value from URI
newq = [(key, value) for (key, value) in q if key != "pageToken"]
newq.append(("pageToken", pageToken))
parsed[4] = urlencode(newq)
uri = urlunparse(parsed)
request.uri = uri
logger.info("URL being requested: {0!s} {1!s}".format(methodName, uri))
return request
开发者ID:runt18,项目名称:google-api-python-client,代码行数:34,代码来源:discovery.py
示例8: update_content
def update_content( article, content, siteurl ):
self = article
stripped_content = content.strip()
if not stripped_content.startswith( PREFIX ):
return content
# remove {filename}
url = stripped_content[ len( PREFIX ): ]
parse_result = urlparse( url )
path = parse_result.path
if path.startswith( '/' ):
path = path[ 1: ]
else:
# relative link
path = self.get_relative_source_path( os.path.join( self.relative_dir, path ) )
# unescape spaces if necessary
if path not in self._context[ 'filenames' ]:
path = path.replace( '%20', ' ' )
if path not in self._context[ 'filenames' ]:
logger.warning(
"Unable to find `%s`, skipping url replacement.", url,
extra = { 'limit_msg': ( "Other resources were not found and their urls not replaced" ) }
)
return content
linked_content = self._context[ 'filenames' ][ path ]
parts = list( parse_result )
parts[ 2 ] = '/'.join( [ siteurl, linked_content.url ] )
return urlunparse( parts )
开发者ID:mrwonko,项目名称:homepage,代码行数:34,代码来源:metadata_links.py
示例9: strip_url
def strip_url(url, strip_credentials=True, strip_default_port=True, origin_only=False, strip_fragment=True):
"""Strip URL string from some of its components:
- ``strip_credentials`` removes "user:[email protected]"
- ``strip_default_port`` removes ":80" (resp. ":443", ":21")
from http:// (resp. https://, ftp://) URLs
- ``origin_only`` replaces path component with "/", also dropping
query and fragment components ; it also strips credentials
- ``strip_fragment`` drops any #fragment component
"""
parsed_url = urlparse(url)
netloc = parsed_url.netloc
if (strip_credentials or origin_only) and (parsed_url.username or parsed_url.password):
netloc = netloc.split('@')[-1]
if strip_default_port and parsed_url.port:
if (parsed_url.scheme, parsed_url.port) in (('http', 80),
('https', 443),
('ftp', 21)):
netloc = netloc.replace(':{p.port}'.format(p=parsed_url), '')
return urlunparse((
parsed_url.scheme,
netloc,
'/' if origin_only else parsed_url.path,
'' if origin_only else parsed_url.params,
'' if origin_only else parsed_url.query,
'' if strip_fragment else parsed_url.fragment
))
开发者ID:elacuesta,项目名称:scrapy,代码行数:29,代码来源:url.py
示例10: url_add_parameters
def url_add_parameters(url, params):
"""Adds parameters to URL, parameter will be repeated if already present"""
if params:
fragments = list(urlparse(url))
fragments[4] = urlencode(parse_qsl(fragments[4]) + params.items())
url = urlunparse(fragments)
return url
开发者ID:ForkRepo,项目名称:sentry,代码行数:7,代码来源:utils.py
示例11: _parse_contents
def _parse_contents(self, response):
# Wix pages aren't really parseable, so anytime we see them,
# let's re-run it (depth-1) with an escaped-fragment to get the real html source
if 'https://static.wixstatic.com/' in response.body and '_escaped_fragment_' not in response.url:
parsed_url = urlparse(response.url)
qs = parse_qs(parsed_url.query)
qs['_escaped_fragment_'] = ''
wix_scrapeable_url = urlunparse(
(parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, urlencode(qs), parsed_url.fragment)
)
response.meta['depth'] -= 1
return [scrapy.Request(wix_scrapeable_url, self.parse)]
return
if not hasattr(response, 'selector'):
logging.info('Skipping unknown file from: %s', response.url)
return
# Get all text contents of tags (unless they are script or style tags)
text_contents = ' '.join(response.selector.xpath('//*[not(self::script|self::style)]/text()').extract()).lower()
processed_text = grammar_matcher.StringProcessor(text_contents, regex_keywords.WORD_BOUNDARIES)
wrong = processed_text.get_tokens(all_styles.DANCE_WRONG_STYLE)
good = processed_text.get_tokens(rules.STREET_STYLE)
if (wrong or good):
#print response.url, set(wrong), set(good)
pass
开发者ID:mikelambert,项目名称:dancedeets-monorepo,代码行数:26,代码来源:studios.py
示例12: canonicalize_url
def canonicalize_url(url, keep_blank_values=True, keep_fragments=False,
encoding=None):
"""Canonicalize the given url by applying the following procedures:
- sort query arguments, first by key, then by value
- percent encode paths and query arguments. non-ASCII characters are
percent-encoded using UTF-8 (RFC-3986)
- normalize all spaces (in query arguments) '+' (plus symbol)
- normalize percent encodings case (%2f -> %2F)
- remove query arguments with blank values (unless keep_blank_values is True)
- remove fragments (unless keep_fragments is True)
The url passed can be a str or unicode, while the url returned is always a
str.
For examples see the tests in tests/test_utils_url.py
"""
scheme, netloc, path, params, query, fragment = parse_url(url)
keyvals = parse_qsl(query, keep_blank_values)
keyvals.sort()
query = urlencode(keyvals)
# XXX: copied from w3lib.url.safe_url_string to add encoding argument
# path = to_native_str(path, encoding)
# path = moves.urllib.parse.quote(path, _safe_chars, encoding='latin1') or '/'
path = safe_url_string(_unquotepath(path)) or '/'
fragment = '' if not keep_fragments else fragment
return urlunparse((scheme, netloc.lower(), path, params, query, fragment))
开发者ID:AugustLONG,项目名称:scrapy,代码行数:30,代码来源:url.py
示例13: get
def get(self, request, *args, **kwargs):
validate_session_for_mturk(request, self.session)
mturk_settings = self.session.config['mturk_hit_settings']
initial = {
'title': mturk_settings['title'],
'description': mturk_settings['description'],
'keywords': ', '.join(mturk_settings['keywords']),
'money_reward': self.session.config['participation_fee'],
'in_sandbox': settings.DEBUG,
'minutes_allotted_per_assignment': (
mturk_settings['minutes_allotted_per_assignment']
),
'expiration_hours': mturk_settings['expiration_hours'],
'assignments': self.session.mturk_num_participants,
}
form = self.get_form(initial=initial)
context = self.get_context_data(form=form)
context['mturk_enabled'] = (
bool(settings.AWS_ACCESS_KEY_ID) and
bool(settings.AWS_SECRET_ACCESS_KEY)
)
context['runserver'] = 'runserver' in sys.argv
url = self.request.build_absolute_uri(
reverse('MTurkCreateHIT', args=(self.session.code,))
)
secured_url = urlunparse(urlparse(url)._replace(scheme='https'))
context['secured_url'] = secured_url
return self.render_to_response(context)
开发者ID:goakichang,项目名称:otree-core,代码行数:29,代码来源:mturk.py
示例14: __init__
def __init__(self, info_yaml, basedir=''):
"""Import the solution's info.yaml file."""
f, url_parts = self._open(info_yaml, basedir)
solution_yaml = f.read().decode('utf-8')
self.basedir = urlunparse((url_parts.scheme, url_parts.netloc,
os.path.dirname(url_parts.path),
None, None, None))
# create a markdown converter and modify it to rebase image links
markdown = Markdown()
markdown.inlinePatterns['image_link'] = _RebasedImageLinkPattern(
self.basedir, IMAGE_LINK_RE, markdown)
markdown.inlinePatterns['image_reference'] = _RebasedImageRefPattern(
self.basedir, IMAGE_REFERENCE_RE, markdown)
# import the solution's metadata
info = yaml.load(solution_yaml)
self.id = hashlib.md5(solution_yaml.encode('utf-8')).hexdigest()
self.title = info['name']
self.release = str(info['release'])
if 'logo' in info:
self.logo = self._make_absolute_path(info.get('logo'),
self.basedir)[0]
# in all the following fields, newlines are suppressed because they
# are not rendered properly in Javascript strings by Django
self.short_description = \
markdown.convert(info['short_desc']).replace('\n', '')
self.long_description = \
markdown.convert(info['long_desc']).replace('\n', '')
self.architecture = \
markdown.convert(info['architecture']).replace('\n', '')
self.design_specs = info.get('design_specs', [])
self.heat_template = info['heat_template']
self.env_file = info.get('env_file') # environments are optional
开发者ID:darrenchan,项目名称:rpc-openstack,代码行数:34,代码来源:solution.py
示例15: build_url
def build_url(base, additional_params=None):
"""Construct a URL based off of base containing all parameters in
the query portion of base plus any additional parameters.
:param base: Base URL
:type base: str
::param additional_params: Additional query parameters to include.
:type additional_params: dict
:rtype: str
"""
url = urlparse(base)
query_params = {}
query_params.update(parse_qsl(url.query, True))
if additional_params is not None:
query_params.update(additional_params)
for k, v in six.iteritems(additional_params):
if v is None:
query_params.pop(k)
return urlunparse((url.scheme,
url.netloc,
url.path,
url.params,
urlencode(query_params),
url.fragment))
开发者ID:Mognom,项目名称:wirecloud,代码行数:25,代码来源:pyoauth2_utils.py
示例16: get_redirect_url
def get_redirect_url(self, state="", redirect_uri=None, scope=None,
action=None, email=None, client_id=None,
code_challenge=None, code_challenge_method=None,
access_type=None, keys_jwk=None):
"""Get the URL to redirect to to initiate the oauth flow."""
if client_id is None:
client_id = self.client_id
params = {
"client_id": client_id,
"state": state,
}
if redirect_uri is not None:
params["redirect_uri"] = redirect_uri
if scope is not None:
params["scope"] = scope
if action is not None:
params["action"] = action
if email is not None:
params["email"] = email
if code_challenge is not None:
params["code_challenge"] = code_challenge
if code_challenge_method is not None:
params["code_challenge_method"] = code_challenge_method
if keys_jwk is not None:
params["keys_jwk"] = keys_jwk
if access_type is not None:
params["access_type"] = access_type
query_str = urlencode(params)
authorization_url = urlparse(self.server_url + "/authorization")
return urlunparse(authorization_url._replace(query=query_str))
开发者ID:mozilla,项目名称:PyFxA,代码行数:30,代码来源:oauth.py
示例17: check
def check(self, instance):
if not self.cadvisor_url:
cadvisor_url = instance.get("cadvisor_url", None)
detect_cadvisor_url = instance.get("kubernetes_detect_cadvisor", False)
if not cadvisor_url:
if detect_cadvisor_url:
kubernetes_connector = utils.KubernetesConnector(self.connection_timeout)
host = kubernetes_connector.get_agent_pod_host()
cadvisor_url = "http://{}:4194".format(host)
else:
exception_message = "Either cAdvisor url or kubernetes " \
"detect cAdvisor must be set when " \
"monitoring a Kubernetes Node."
self.log.error(exception_message)
raise Exception(exception_message)
self.cadvisor_url = "{}/{}".format(cadvisor_url, "api/v2.0/stats?count=1")
dimensions = self._set_dimensions(None, instance)
try:
host_metrics = requests.get(self.cadvisor_url, self.connection_timeout).json()
except Exception as e:
self.log.error("Error communicating with cAdvisor to collect data - {}".format(e))
else:
# Retrieve machine info only once
if not self.cadvisor_machine_url:
# Replace path in current cadvisor_url
result = urlparse(self.cadvisor_url)
self.cadvisor_machine_url = urlunparse(result._replace(path="api/v2.0/machine"))
try:
machine_info = requests.get(self.cadvisor_machine_url).json()
except Exception as ex:
self.log.error(
"Error communicating with cAdvisor to collect machine data - {}".format(ex))
else:
self._parse_machine_info(machine_info)
self._parse_send_metrics(host_metrics, dimensions)
开发者ID:openstack,项目名称:monasca-agent,代码行数:35,代码来源:cadvisor_host.py
示例18: _parseURL
def _parseURL(url):
try:
url = urinorm.urinorm(url)
except ValueError:
return None
proto, netloc, path, params, query, frag = urlparse(url)
if not path:
# Python <2.4 does not parse URLs with no path properly
if not query and '?' in netloc:
netloc, query = netloc.split('?', 1)
path = '/'
path = urlunparse(('', '', path, params, query, frag))
if ':' in netloc:
try:
host, port = netloc.split(':')
except ValueError:
return None
if not re.match(r'\d+$', port):
return None
else:
host = netloc
port = ''
host = host.lower()
if not host_segment_re.match(host):
return None
return proto, host, port, path
开发者ID:bjinwright,项目名称:python3-openid,代码行数:32,代码来源:trustroot.py
示例19: replace_query_parameters
def replace_query_parameters(request, replacements):
"""
Replace query parameters in request according to replacements. The
replacements should be a list of (key, value) pairs where the value can be
any of:
1. A simple replacement string value.
2. None to remove the given header.
3. A callable which accepts (key, value, request) and returns a string
value or None.
"""
query = request.query
new_query = []
replacements = dict(replacements)
for k, ov in query:
if k not in replacements:
new_query.append((k, ov))
else:
rv = replacements[k]
if callable(rv):
rv = rv(key=k, value=ov, request=request)
if rv is not None:
new_query.append((k, rv))
uri_parts = list(urlparse(request.uri))
uri_parts[4] = urlencode(new_query)
request.uri = urlunparse(uri_parts)
return request
开发者ID:adamchainz,项目名称:vcrpy,代码行数:26,代码来源:filters.py
示例20: get_sample_data
def get_sample_data(self, meter_name, parse_url, params, cache):
extractor = self._get_extractor(meter_name)
if extractor is None:
# The way to getting meter is not implemented in this driver or
# OpenDaylight REST API has not api to getting meter.
return None
iter = self._get_iter(meter_name)
if iter is None:
# The way to getting meter is not implemented in this driver or
# OpenDaylight REST API has not api to getting meter.
return None
parts = urlparse.ParseResult(
params.get("scheme", ["http"])[0], parse_url.netloc, parse_url.path, None, None, None
)
endpoint = urlparse.urlunparse(parts)
data = self._prepare_cache(endpoint, params, cache)
samples = []
for name, value in six.iteritems(data):
timestamp = value["timestamp"]
for sample in iter(extractor, value):
if sample is not None:
# set controller name and container name
# to resource_metadata
sample[2]["controller"] = "OpenDaylight"
sample[2]["container"] = name
samples.append(sample + (timestamp,))
return samples
开发者ID:m1093782566,项目名称:ceilometer,代码行数:34,代码来源:driver.py
注:本文中的six.moves.urllib.parse.urlunparse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论