本文整理汇总了Python中six.moves.urllib_parse.urljoin函数的典型用法代码示例。如果您正苦于以下问题:Python urljoin函数的具体用法?Python urljoin怎么用?Python urljoin使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urljoin函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: call_recommendation_server
def call_recommendation_server(server, client_id_or_guid, data, verb='get'):
"""Call taar `server` to get recommendations for a given
`client_id_or_guid`.
`data` is a dict containing either query parameters to be passed in the URL
if we're calling the server through GET, or the data we'll pass through
POST as json.
The HTTP verb to use is either "get" or "post", controlled through `verb`,
which defaults to "get"."""
request_kwargs = {
'timeout': settings.RECOMMENDATION_ENGINE_TIMEOUT
}
if verb == 'get':
params = OrderedDict(sorted(data.items(), key=lambda t: t[0]))
endpoint = urljoin(server, '%s/%s%s' % (
client_id_or_guid, '?' if params else '', urlencode(params)))
else:
endpoint = urljoin(server, '%s/' % client_id_or_guid)
request_kwargs['json'] = data
log.debug(u'Calling recommendation server: {0}'.format(endpoint))
try:
with statsd.timer('services.recommendations'):
response = getattr(requests, verb)(endpoint, **request_kwargs)
if response.status_code != 200:
raise requests.exceptions.RequestException()
except requests.exceptions.RequestException as e:
log.error(u'Calling recommendation engine failed: {0}'.format(e))
statsd.incr('services.recommendations.fail')
return None
else:
statsd.incr('services.recommendations.success')
return json.loads(response.content).get('results', None)
开发者ID:diox,项目名称:olympia,代码行数:33,代码来源:utils.py
示例2: send_confirm_notification
def send_confirm_notification(self, queue, subscription, conf,
project=None, expires=None,
api_version=None, is_unsubscribed=False):
# NOTE(flwang): If the confirmation feature isn't enabled, just do
# nothing. Here we're getting the require_confirmation from conf
# object instead of using self.require_confirmation, because the
# variable from self object really depends on the kwargs when
# initializing the NotifierDriver object. See bug 1655812 for more
# information.
if not conf.notification.require_confirmation:
return
key = conf.signed_url.secret_key
if not key:
LOG.error("Can't send confirm notification due to the value of"
" secret_key option is None")
return
url = "/%s/queues/%s/subscriptions/%s/confirm" % (api_version, queue,
subscription['id'])
pre_url = urls.create_signed_url(key, [url], project=project,
expires=expires, methods=['PUT'])
message = None
if is_unsubscribed:
message_type = MessageType.UnsubscribeConfirmation.name
message = ('You have unsubscribed successfully to the queue: %s, '
'you can resubscribe it by using confirmed=True.'
% queue)
else:
message_type = MessageType.SubscriptionConfirmation.name
message = 'You have chosen to subscribe to the queue: %s' % queue
messages = {}
endpoint_dict = auth.get_public_endpoint()
if endpoint_dict:
wsgi_endpoint = endpoint_dict.get('zaqar')
if wsgi_endpoint:
wsgi_subscribe_url = urllib_parse.urljoin(
wsgi_endpoint, url)
messages['WSGISubscribeURL'] = wsgi_subscribe_url
websocket_endpoint = endpoint_dict.get('zaqar-websocket')
if websocket_endpoint:
websocket_subscribe_url = urllib_parse.urljoin(
websocket_endpoint, url)
messages['WebSocketSubscribeURL'] = websocket_subscribe_url
messages.update({'Message_Type': message_type,
'Message': message,
'URL-Signature': pre_url['signature'],
'URL-Methods': pre_url['methods'][0],
'URL-Paths': pre_url['paths'][0],
'X-Project-ID': pre_url['project'],
'URL-Expires': pre_url['expires'],
'SubscribeBody': {'confirmed': True},
'UnsubscribeBody': {'confirmed': False}})
s_type = urllib_parse.urlparse(subscription['subscriber']).scheme
LOG.info('Begin to send %(type)s confirm/unsubscribe notification.'
' The request body is %(messages)s',
{'type': s_type, 'messages': messages})
self._execute(s_type, subscription, [messages], conf)
开发者ID:openstack,项目名称:zaqar,代码行数:59,代码来源:notifier.py
示例3: download_get_basefiles
def download_get_basefiles(self, url):
done = False
pagecnt = 1
# existing_cnt = 0
while not done:
self.log.info('Result page #%s (%s)' % (pagecnt, url))
resp = requests.get(url)
mainsoup = BeautifulSoup(resp.text)
for link in mainsoup.find_all(href=re.compile("/sb/d/108/a/")):
desc = link.find_next_sibling("span", "info").get_text(strip=True)
tmpurl = urljoin(url, link['href'])
# use a strict regex first, then a more forgiving
m = self.re_basefile_strict.search(desc)
if not m:
m = self.re_basefile_lax.search(desc)
if not m:
self.log.warning(
"Can't find Document ID from %s, forced to download doc page" % desc)
resp = requests.get(tmpurl)
subsoup = BeautifulSoup(resp.text)
for a in subsoup.find("div", "doc").find_all("li", "pdf"):
text = a.get_text(strip=True)
m = self.re_basefile_lax.search(text)
if m:
break
else:
self.log.error("Cannot possibly find docid for %s" % tmpurl)
continue
else:
self.log.warning(
"%s (%s) not using preferred form: '%s'" % (m.group(1), tmpurl, m.group(0)))
basefile = m.group(1)
# Extra checking -- sometimes ids like 2003/2004:45
# are used (should be 2003/04:45)
if (":" in basefile and "/" in basefile):
(y1, y2, o) = re.split("[:/]", basefile)
# 1999/2000:45 is a special case
if len(y2) == 4 and y1 != "1999":
self.log.warning(
"%s (%s) using incorrect year format, should be '%s/%s:%s'" %
(basefile, tmpurl, y1, y2[2:], o))
basefile = "%s/%s:%s" % (y1, y2[2:], o)
yield basefile, urljoin(url, link['href'])
pagecnt += 1
next = mainsoup.find("a", text="Nästa sida")
if next:
url = urljoin(url, next['href'])
else:
done = True
开发者ID:h4ck3rm1k3,项目名称:ferenda,代码行数:54,代码来源:regeringen.py
示例4: download_from_atom
def download_from_atom(self):
refresh = self.config.force
feed_url = self.start_url
ns = 'http://www.w3.org/2005/Atom'
done = False
biggraph = Graph()
biggraph.bind("dct", self.ns['dct'])
biggraph.bind("rpubl", self.ns['rpubl'])
while not done:
self.log.info("Feed: %s" % feed_url)
tree = etree.parse(requests.get(feed_url).text)
for entry in tree.findall('{%s}entry' % (ns)):
try:
self.log.info(" Examining entry")
rdf_url = None
for node in entry:
if (node.tag == "{%s}link" % ns and
node.get('type') == 'application/rdf+xml'):
rdf_url = urljoin(feed_url, node.get("href"))
elif (node.tag == "{%s}content" % ns and
node.get('type') == 'application/rdf+xml'):
rdf_url = urljoin(feed_url, node.get("src"))
if rdf_url:
self.log.info(" RDF: %s" % rdf_url)
g = Graph()
g.parse(requests.get(rdf_url).text)
for triple in g:
s, p, o = triple
if (not isinstance(o, URIRef) or
not str(o).startswith(self.config.url)):
g.remove(triple)
self.log.debug(" Adding %s triples" % len(g))
biggraph += g
except KeyboardInterrupt:
raise
except:
e = sys.exc_info()[1]
self.log.error("ERROR: %s" % e)
done = True
for link in list(tree.findall('{%s}link' % (ns))):
self.log.info(" Examining link")
if link.get('rel') == 'prev-archive':
feed_url = urljoin(feed_url, link.get("href"))
done = False
# done = True
self.log.info("Done downloading")
with self.store.open_downloaded("biggraph", "wb") as fp:
fp.write(biggraph.serialize(format="nt"))
开发者ID:h4ck3rm1k3,项目名称:ferenda,代码行数:53,代码来源:skeleton.py
示例5: download_resource
def download_resource(self, resource_location, location):
"""Download the resource in the specified location
:param resource_script:
Is relative to the /argus/resources/ directory.
:param location:
The location on the instance.
"""
base_resource = CONFIG.argus.resources
if not base_resource.endswith("/"):
base_resource = urlparse.urljoin(CONFIG.argus.resources,
"resources/")
uri = urlparse.urljoin(base_resource, resource_location)
self.download(uri, location)
开发者ID:micumatei,项目名称:cloudbase-init-ci,代码行数:14,代码来源:windows.py
示例6: verify_proxy_ticket
def verify_proxy_ticket(ticket, service):
"""Verifies CAS 2.0+ XML-based proxy ticket.
Returns username on success and None on failure.
"""
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
params = {'ticket': ticket, 'service': service}
url = (urljoin(settings.CAS_SERVER_URL, 'proxyValidate') + '?' +
urlencode(params))
page = urlopen(url)
try:
response = page.read()
tree = ElementTree.fromstring(response)
if tree[0].tag.endswith('authenticationSuccess'):
username = tree[0][0].text
proxies = []
if len(tree[0]) > 1:
for element in tree[0][1]:
proxies.append(element.text)
return {"username": username, "proxies": proxies}, None
else:
return None, None
finally:
page.close()
开发者ID:UGentPortaal,项目名称:django-cas,代码行数:32,代码来源:backends.py
示例7: get_api_data
def get_api_data(self, target_url=None, query_params=None):
"""retrieves the Jenkins API specific data from the specified URL
:param str target_url:
Full URL to the REST API endpoint to be queried. If not provided,
data will be loaded from the default 'url' for this object
:param str query_params:
optional set of query parameters to customize the returned data
:returns:
The set of Jenkins attributes, converted to Python objects,
associated with the given URL.
:rtype: :class:`dict`
"""
if target_url is None:
target_url = self.url
temp_url = urllib_parse.urljoin(target_url, "api/json")
if query_params is not None:
# TODO: Update this to pass 'params' key to get method
temp_url += "?" + query_params
req = requests.get(
temp_url,
auth=self._creds,
verify=self._ssl_cert)
req.raise_for_status()
retval = req.json()
self._log.debug(json.dumps(retval, indent=4))
return retval
开发者ID:TheFriendlyCoder,项目名称:pyjen,代码行数:30,代码来源:jenkins_api.py
示例8: make_fasta_dna_url
def make_fasta_dna_url(
ensembl_release,
species,
contig,
server=ENSEMBL_FTP_SERVER):
"""
Construct URL to FASTA file with full sequence of a particular chromosome.
Returns server_url/subdir and filename as tuple result.
"""
ensembl_release, species, reference_name = _normalize_release_properties(
ensembl_release, species)
subdir = _species_subdir(
ensembl_release,
species=species,
filetype="fasta",
server=server,)
server_subdir = urllib_parse.urljoin(server, subdir)
server_sequence_subdir = join(server_subdir, "dna")
filename = FASTA_DNA_CHROMOSOME_FILENAME_TEMPLATE % {
"Species": species.capitalize(),
"reference": reference_name,
"release": ensembl_release,
"sequence_type": "dna",
"contig": contig
}
return join(server_sequence_subdir, filename)
开发者ID:BioInfoTools,项目名称:pyensembl,代码行数:27,代码来源:ensembl_url_templates.py
示例9: _ComputePaths
def _ComputePaths(package, version, discovery_doc):
full_path = urllib_parse.urljoin(discovery_doc["rootUrl"], discovery_doc["servicePath"])
api_path_component = "/".join((package, version, ""))
if api_path_component not in full_path:
return full_path, ""
prefix, _, suffix = full_path.rpartition(api_path_component)
return prefix + api_path_component, suffix
开发者ID:rlugojr,项目名称:apitools,代码行数:7,代码来源:util.py
示例10: _register_services
def _register_services(self):
""" Check for any unregistered services and register them
Also check for changed services and update them
"""
base_url = self.bleemeo_base_url
registration_url = urllib_parse.urljoin(base_url, '/v1/service/')
for key, service_info in self.core.services.items():
(service_name, instance) = key
entry = {
'listen_addresses':
get_listen_addresses(service_info),
'label': service_name,
'exe_path': service_info.get('exe_path', ''),
}
if instance is not None:
entry['instance'] = instance
if key in self.services_uuid:
entry['uuid'] = self.services_uuid[key]['uuid']
# check for possible update
if self.services_uuid[key] == entry:
continue
method = requests.put
service_uuid = self.services_uuid[key]['uuid']
url = registration_url + str(service_uuid) + '/'
expected_code = 200
else:
method = requests.post
url = registration_url
expected_code = 201
payload = entry.copy()
payload.update({
'account': self.account_id,
'agent': self.agent_uuid,
})
response = method(
url,
data=json.dumps(payload),
auth=(self.agent_username, self.agent_password),
headers={
'X-Requested-With': 'XMLHttpRequest',
'Content-type': 'application/json',
},
)
if response.status_code != expected_code:
logging.debug(
'Service registration failed. Server response = %s',
response.content
)
continue
entry['uuid'] = response.json()['id']
self.services_uuid[key] = entry
self.core.state.set_complex_dict(
'services_uuid', self.services_uuid
)
开发者ID:bleemeo,项目名称:bleemeo-agent,代码行数:60,代码来源:bleemeo.py
示例11: notify_new_email
def notify_new_email(email, user):
""" Ask the user to confirm to the email belong to them.
"""
root_url = pagure_config.get("APP_URL", flask.request.url_root)
url = urljoin(
root_url or flask.request.url_root,
flask.url_for("ui_ns.confirm_email", token=email.token),
)
text = """Dear %(username)s,
You have registered a new email on pagure at %(root_url)s.
To finish your validate this registration, please click on the following
link or copy/paste it in your browser, this link will remain valid only 2 days:
%(url)s
The email will not be activated until you finish this step.
Sincerely,
Your pagure admin.
""" % (
{"username": user.username, "url": url, "root_url": root_url}
)
send_email(
text,
"Confirm new email",
email.email,
user_from=user.fullname or user.user,
)
开发者ID:pypingou,项目名称:pagure,代码行数:33,代码来源:notify.py
示例12: build_uri
def build_uri(self, base, matches):
if not base:
return None
if self.uriTemplate:
expanded = str(self.uriTemplate)
elif self.fragmentTemplate:
if "#" in base:
base += self.space.fragmentSeparator
else:
base += "#"
expanded = base + str(self.fragmentTemplate)
else:
return None
expanded = expanded.replace("{+base}", base)
for var, value in matches.items():
slug = self.transform_value(value)
expanded = expanded.replace("{%s}" % var, slug)
# if base is eg "http://localhost/res/" and expanded is a
# /-prefixed relative uri like "/sfs/9999:998", urljoin
# results in "http://localhost/sfs/9999:998/", not
# "http://localhost/res/" like you'd expect. So we work
# around.
if expanded[0] == "/":
expanded = expanded[1:]
if expanded.startswith("http://") or expanded.startswith("https://"):
return urljoin(base, expanded)
else:
# see the test integrationLegalURI.CustomCoinstruct.test_1845_50_s.1
return "%s/%s" % (base, expanded)
开发者ID:staffanm,项目名称:ferenda,代码行数:31,代码来源:coin.py
示例13: _generate_client_conf
def _generate_client_conf():
auth_strategy = os.environ.get('OS_AUTH_STRATEGY', 'noauth')
if auth_strategy == 'keystone':
args = _get_credential_args()
# FIXME(flwang): Now we're hardcode the keystone auth version, since
# there is a 'bug' with the osc-config which is returning the auth_url
# without version. This should be fixed as long as the bug is fixed.
parsed_url = urllib_parse.urlparse(args['auth_url'])
auth_url = args['auth_url']
if not parsed_url.path or parsed_url.path == '/':
auth_url = urllib_parse.urljoin(args['auth_url'], 'v2.0')
conf = {
'auth_opts': {
'backend': 'keystone',
'options': {
'os_username': args['username'],
'os_password': args['password'],
'os_project_name': args['project_name'],
'os_auth_url': auth_url,
'insecure': '',
},
},
}
else:
conf = {
'auth_opts': {
'backend': 'noauth',
'options': {
'os_project_id': 'my-lovely-benchmark',
},
},
}
print("Using '{0}' authentication method".format(conf['auth_opts']
['backend']))
return conf
开发者ID:AvnishPal,项目名称:zaqar,代码行数:35,代码来源:helpers.py
示例14: remove_user_from_uri
def remove_user_from_uri(apps, schema_editor):
Formula = apps.get_model('formulas', 'Formula')
for formula in Formula.objects.all():
url_bits = urlsplit(formula.uri)
# don't do it if it's an ssh formula
if 'ssh' in url_bits.scheme:
continue
if url_bits.username:
formula.git_username = url_bits.username
if url_bits.port:
new_netloc = '{}:{}'.format(url_bits.hostname, url_bits.port)
else:
new_netloc = url_bits.hostname
formula.uri = urljoin((
url_bits.scheme,
new_netloc,
url_bits.path,
url_bits.query,
url_bits.fragment,
))
formula.save()
开发者ID:clarkperkins,项目名称:stackdio,代码行数:27,代码来源:0002_0_8_0_migrations.py
示例15: paste
def paste(self, s):
"""Upload to pastebin via json interface."""
url = urljoin(self.url, '/json/new')
payload = {
'code': s,
'lexer': 'pycon',
'expiry': self.expiry
}
try:
response = requests.post(url, data=payload, verify=True)
response.raise_for_status()
except requests.exceptions.RequestException as exc:
raise PasteFailed(exc.message)
data = response.json()
paste_url_template = Template(self.show_url)
paste_id = urlquote(data['paste_id'])
paste_url = paste_url_template.safe_substitute(paste_id=paste_id)
removal_url_template = Template(self.removal_url)
removal_id = urlquote(data['removal_id'])
removal_url = removal_url_template.safe_substitute(
removal_id=removal_id)
return (paste_url, removal_url)
开发者ID:Caleb1994,项目名称:bpython,代码行数:28,代码来源:paste.py
示例16: get_file_urls
def get_file_urls(self, name):
query = (
select([
release_files.c.name,
release_files.c.filename,
release_files.c.python_version,
release_files.c.md5_digest,
])
.where(release_files.c.name == name)
.order_by(release_files.c.filename.desc())
)
with self.engine.connect() as conn:
results = conn.execute(query)
return [
FileURL(
filename=r["filename"],
url=urllib_parse.urljoin(
"/".join([
"../../packages",
r["python_version"],
r["name"][0],
r["name"],
r["filename"],
]),
"#md5={}".format(r["md5_digest"]),
),
)
for r in results
]
开发者ID:domenkozar,项目名称:warehouse,代码行数:31,代码来源:models.py
示例17: urls_from_urlset_or_sitemapindex
def urls_from_urlset_or_sitemapindex(response):
""" Yields URLs from ``<urlset>`` or ``<sitemapindex>`` elements as per
`sitemaps.org <http://www.sitemaps.org/protocol.html>`_.
"""
sitemap = URL(response.url).fragment_dict.get('sitemap')
content_subtypes = response.headers.get_content_subtype().split('+')
if not sitemap and not 'xml' in content_subtypes:
return
root = None
for _, elem in iterparse(decode(response)):
if root is None:
root = elem.getroottree().getroot()
if not (root.tag.endswith('}sitemapindex') or
root.tag.endswith('}urlset')):
# root element has wrong tag - give up
break
if elem.tag.endswith('}loc') and elem.text is not None:
text = elem.text.strip()
if text:
# http://www.sitemaps.org/protocol.html#locdef
url = URL(urljoin(response.url, text))
if elem.getparent().tag.endswith('}sitemap'):
# set sitemap=True to help downstream processing
url = url.update_fragment_dict(sitemap=True)
yield "url", url
if elem.getparent() is root:
# release memory for previous elements
while elem.getprevious() is not None:
del root[0]
开发者ID:justinvanwinkle,项目名称:wextracto,代码行数:34,代码来源:sitemaps.py
示例18: _get_application
def _get_application(self, app_id):
url = urllib_parse.urljoin(self._api_url + 'applications/', str(app_id) + '.json')
resp = requests.get(url, headers=self._headers).json()
if 'application' in resp:
# pick 1st application
return resp['application'] if resp['application'] else None
return None
开发者ID:AlexeyDeyneko,项目名称:st2contrib,代码行数:7,代码来源:new_relic_app_sensor.py
示例19: rename
def rename(self, new_job_name):
"""Changes the name of this job
:param str new_job_name:
new name to assign to this job
"""
args = {
"params": {
"newName": new_job_name
}
}
self._api.post(self._api.url + "doRename", args=args)
# NOTE: In order to properly support jobs that may contain nested
# jobs we have to do some URL manipulations to extrapolate the
# REST API endpoint for the parent object to which the cloned
# view is to be contained.
parts = urllib_parse.urlsplit(self._api.url).path.split("/")
parts = [cur_part for cur_part in parts if cur_part.strip()]
assert len(parts) >= 2
assert parts[-2] == "job"
new_url = urllib_parse.urljoin(
self._api.url, "/" + "/".join(parts[:-2]))
new_url += "/job/" + new_job_name
self._api = self._api.clone(new_url)
assert self.name == new_job_name
开发者ID:TheFriendlyCoder,项目名称:pyjen,代码行数:27,代码来源:job.py
示例20: get_proxy_ticket_for
def get_proxy_ticket_for(self, service):
"""Verifies CAS 2.0+ XML-based authentication ticket.
Returns username on success and None on failure.
"""
if not settings.CAS_PROXY_CALLBACK:
raise CasConfigException("No proxy callback set in settings")
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
params = {'pgt': self.tgt, 'targetService': service}
url = (urljoin(settings.CAS_SERVER_URL, 'proxy') + '?' +
urlencode(params))
page = urlopen(url)
try:
response = page.read()
tree = ElementTree.fromstring(response)
if tree[0].tag.endswith('proxySuccess'):
return tree[0][0].text
else:
raise CasTicketException("Failed to get proxy ticket")
finally:
page.close()
开发者ID:rlmv,项目名称:django-dartmouth-cas,代码行数:29,代码来源:models.py
注:本文中的six.moves.urllib_parse.urljoin函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论