本文整理汇总了Python中six.moves.urllib.parse.urljoin函数的典型用法代码示例。如果您正苦于以下问题:Python urljoin函数的具体用法?Python urljoin怎么用?Python urljoin使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urljoin函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self):
self.verify_https = os.environ.get('OAUTHLIB_INSECURE_TRANSPORT', '') == ""
if self.verify_https and os.environ.get("REQUESTS_CA_BUNDLE", "").strip() != "":
self.verify_https = os.environ["REQUESTS_CA_BUNDLE"].strip()
self.jwt_enable = six.text_type(os.environ.get('CKAN_OAUTH2_JWT_ENABLE', toolkit.config.get('ckan.oauth2.jwt.enable',''))).strip().lower() in ("true", "1", "on")
self.legacy_idm = six.text_type(os.environ.get('CKAN_OAUTH2_LEGACY_IDM', toolkit.config.get('ckan.oauth2.legacy_idm', ''))).strip().lower() in ("true", "1", "on")
self.authorization_endpoint = six.text_type(os.environ.get('CKAN_OAUTH2_AUTHORIZATION_ENDPOINT', toolkit.config.get('ckan.oauth2.authorization_endpoint', ''))).strip()
self.token_endpoint = six.text_type(os.environ.get('CKAN_OAUTH2_TOKEN_ENDPOINT', toolkit.config.get('ckan.oauth2.token_endpoint', ''))).strip()
self.profile_api_url = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_URL', toolkit.config.get('ckan.oauth2.profile_api_url', ''))).strip()
self.client_id = six.text_type(os.environ.get('CKAN_OAUTH2_CLIENT_ID', toolkit.config.get('ckan.oauth2.client_id', ''))).strip()
self.client_secret = six.text_type(os.environ.get('CKAN_OAUTH2_CLIENT_SECRET', toolkit.config.get('ckan.oauth2.client_secret', ''))).strip()
self.scope = six.text_type(os.environ.get('CKAN_OAUTH2_SCOPE', toolkit.config.get('ckan.oauth2.scope', ''))).strip()
self.rememberer_name = six.text_type(os.environ.get('CKAN_OAUTH2_REMEMBER_NAME', toolkit.config.get('ckan.oauth2.rememberer_name', 'auth_tkt'))).strip()
self.profile_api_user_field = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_USER_FIELD', toolkit.config.get('ckan.oauth2.profile_api_user_field', ''))).strip()
self.profile_api_fullname_field = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_FULLNAME_FIELD', toolkit.config.get('ckan.oauth2.profile_api_fullname_field', ''))).strip()
self.profile_api_mail_field = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_MAIL_FIELD', toolkit.config.get('ckan.oauth2.profile_api_mail_field', ''))).strip()
self.profile_api_groupmembership_field = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_GROUPMEMBERSHIP_FIELD', toolkit.config.get('ckan.oauth2.profile_api_groupmembership_field', ''))).strip()
self.sysadmin_group_name = six.text_type(os.environ.get('CKAN_OAUTH2_SYSADMIN_GROUP_NAME', toolkit.config.get('ckan.oauth2.sysadmin_group_name', ''))).strip()
self.redirect_uri = urljoin(urljoin(toolkit.config.get('ckan.site_url', 'http://localhost:5000'), toolkit.config.get('ckan.root_path')), constants.REDIRECT_URL)
# Init db
db.init_db(model)
missing = [key for key in REQUIRED_CONF if getattr(self, key, "") == ""]
if missing:
raise ValueError("Missing required oauth2 conf: %s" % ", ".join(missing))
elif self.scope == "":
self.scope = None
开发者ID:conwetlab,项目名称:ckanext-oauth2,代码行数:32,代码来源:oauth2.py
示例2: startElementNS
def startElementNS(self, name, qname, attrs):
stack = self.stack
stack.append(ElementHandler())
current = self.current
parent = self.parent
base = attrs.get(BASE, None)
if base is not None:
base, frag = urldefrag(base)
if parent and parent.base:
base = urljoin(parent.base, base)
else:
systemId = self.locator.getPublicId() \
or self.locator.getSystemId()
if systemId:
base = urljoin(systemId, base)
else:
if parent:
base = parent.base
if base is None:
systemId = self.locator.getPublicId() \
or self.locator.getSystemId()
if systemId:
base, frag = urldefrag(systemId)
current.base = base
language = attrs.get(LANG, None)
if language is None:
if parent:
language = parent.language
current.language = language
current.start(name, qname, attrs)
开发者ID:drewp,项目名称:rdflib,代码行数:30,代码来源:rdfxml.py
示例3: process_response
def process_response(self, request, response, spider):
if request.meta.get('dont_redirect', False):
return response
if request.method == 'HEAD':
if response.status in [301, 302, 303, 307] and 'Location' in response.headers:
redirected_url = urljoin(request.url, response.headers['location'])
redirected = request.replace(url=redirected_url)
return self._redirect(redirected, request, spider, response.status)
else:
return response
referer = request.url
if response.status in [302, 303] and 'Location' in response.headers:
redirected_url = urljoin(request.url, response.headers['location'])
redirected = self._redirect_request_using_get(request, redirected_url)
redirected = self._redirect(redirected, request, spider, response.status)
redirected.headers['Referer'] = referer
return redirected
referer = request.url
if response.status in [301, 307] and 'Location' in response.headers:
redirected_url = urljoin(request.url, response.headers['location'])
redirected = request.replace(url=redirected_url)
redirected = self._redirect(redirected, request, spider, response.status)
redirected.headers['Referer'] = referer
return redirected
return response
开发者ID:ICCV,项目名称:chaos,代码行数:31,代码来源:__init__.py
示例4: open_in_browser
def open_in_browser(config_obj, jql_query, query_log_path, new_tab=False):
"""
Open browser in JIRA with the retrieved keys from Stash as url-params
:param config_obj:
:param jql_query:
:return:
"""
jira_url = config_obj.jira_url
if len(jql_query) < OPEN_IN_BROWSER_BELOW:
params = {
'jql': jql_query
}
b_url = urljoin(urljoin(jira_url, 'issues/'), '?' + urlencode(params))
if new_tab:
webbrowser.open(b_url, new=2)
else:
webbrowser.open(b_url, new=0)
else:
click.echo("Too much data to open in browser.")
click.echo("Query saved to " + query_log_path)
with open(query_log_path, 'a') as f:
f.write(jql_query + "\n\n")
开发者ID:enrico2828,项目名称:Stash2Jira,代码行数:25,代码来源:cli.py
示例5: _send_batch
def _send_batch(self, destination, events):
''' Makes a single batch API request with the given list of events. The
`destination` argument contains the write key, API host and dataset
name used to build the request.'''
start = time.time()
status_code = 0
try:
url = urljoin(urljoin(destination.api_host, "/1/batch/"),
destination.dataset)
payload = []
for ev in events:
event_time = ev.created_at.isoformat()
if ev.created_at.tzinfo is None:
event_time += "Z"
payload.append({
"time": event_time,
"samplerate": ev.sample_rate,
"data": ev.fields()})
self.log("firing batch, size = %d", len(payload))
resp = self.session.post(
url,
headers={"X-Honeycomb-Team": destination.writekey, "Content-Type": "application/json"},
data=json.dumps(payload, default=json_default_handler),
timeout=10.0,
)
status_code = resp.status_code
resp.raise_for_status()
statuses = [{"status": d.get("status"), "error": d.get("error")} for d in resp.json()]
for ev, status in zip(events, statuses):
self._enqueue_response(status.get("status"), "", status.get("error"), start, ev.metadata)
except Exception as e:
# Catch all exceptions and hand them to the responses queue.
self._enqueue_errors(status_code, e, start, events)
开发者ID:honeycombio,项目名称:libhoney-py,代码行数:35,代码来源:transmission.py
示例6: buildDiscover
def buildDiscover(base_url, out_dir):
"""Convert all files in a directory to apache mod_asis files in
another directory."""
test_data = discoverdata.readTests(discoverdata.default_test_file)
def writeTestFile(test_name):
template = test_data[test_name]
data = discoverdata.fillTemplate(
test_name, template, base_url, discoverdata.example_xrds)
out_file_name = os.path.join(out_dir, test_name)
out_file = open(out_file_name, 'w')
out_file.write(data)
manifest = [manifest_header]
for success, input_name, id_name, result_name in discoverdata.testlist:
if not success:
continue
writeTestFile(input_name)
input_url = urljoin(base_url, input_name)
id_url = urljoin(base_url, id_name)
result_url = urljoin(base_url, result_name)
manifest.append('\t'.join((input_url, id_url, result_url)))
manifest.append('\n')
manifest_file_name = os.path.join(out_dir, 'manifest.txt')
manifest_file = open(manifest_file_name, 'w')
for chunk in manifest:
manifest_file.write(chunk)
manifest_file.close()
开发者ID:ziima,项目名称:python-openid,代码行数:33,代码来源:builddiscover.py
示例7: add_absolute_urls
def add_absolute_urls(results, request=None):
for hit in results:
base_url = get_template_url(hit['vendor'], hit['name'], hit['version'], hit['template_uri'], request=request)
hit['uri'] = "/".join((hit['vendor'], hit['name'], hit['version']))
hit['image'] = urljoin(base_url, hit['image'])
hit['smartphoneimage'] = urljoin(base_url, hit['smartphoneimage'])
开发者ID:Mognom,项目名称:wirecloud,代码行数:7,代码来源:models.py
示例8: process_response
def process_response(self, request, response, spider):
if request.meta.get('dont_redirect', False):
return response
if request.method == 'HEAD':
if response.status in [301, 302, 303, 307] and 'Location' in response.headers:
redirected_url = urljoin(request.url, response.headers['location'])
redirected = request.replace(url=redirected_url)
return self._redirect(redirected, request, spider, response.status)
else:
return response
if response.status in [302, 303] and 'Location' in response.headers:
if (response.headers['Location'] == "http://store.steampowered.com/") or \
(response.headers['Location'] == "http://store.steampowered.com") or \
('video' in response.headers['Location']):
# log.msg("Ignored home page / video redirect!")
raise IgnoreRequest()
redirected_url = urljoin(request.url, response.headers['location'])
redirected = self._redirect_request_using_get(request, redirected_url)
return self._redirect(redirected, request, spider, response.status)
if response.status in [301, 307] and 'Location' in response.headers:
redirected_url = urljoin(request.url, response.headers['location'])
redirected = request.replace(url=redirected_url)
return self._redirect(redirected, request, spider, response.status)
return response
开发者ID:casperchia,项目名称:steamGameRatings2,代码行数:28,代码来源:middlewares.py
示例9: open
def open(self, filename=None):
if filename is None:
filename = self._base_uri
else:
if self._file_type == 's3':
filename = urljoin(self._base_uri.replace(
's3://', 'http://'), filename.replace('\\', '/')).replace('http://', 's3://')
elif self._file_type == 'http':
filename = urljoin(self._base_uri, filename.replace('\\', '/'))
else:
filename = os.path.abspath(os.path.join(os.path.dirname(
self._base_uri.replace('\\', '/')), filename.replace('\\', '/')))
f = None
if self._file_type == 's3':
uri_header, uri_body = filename.split('://', 1)
us = uri_body.split('/')
bucketname = us.pop(0)
key = '/'.join(us)
logger.info('Opening {}'.format(key))
f = StringIO(self._s3_bucket.Object(key).get()['Body'].read())
elif self._file_type == 'http':
f = request.urlopen(filename)
else:
f = open(filename, 'rb')
yield f
f.close()
开发者ID:zwsong,项目名称:nnabla,代码行数:26,代码来源:data_source_loader.py
示例10: request_raw
def request_raw(method, path, params=None, body=None, headers=None,
handle_errors=True, auto_retry=True):
kwargs = {
'params': params,
'data': body,
'headers': headers,
'verify': analyzere.tls_verify,
}
username = analyzere.username
password = analyzere.password
if username and password:
kwargs['auth'] = (username, password)
resp = requests.request(method, urljoin(analyzere.base_url, path),
**kwargs)
# Handle HTTP 503 with the Retry-After header by automatically retrying
# request after sleeping for the recommended amount of time
retry_after = resp.headers.get('Retry-After')
while auto_retry and (resp.status_code == 503 and retry_after):
time.sleep(float(retry_after))
# Repeat original request after Retry-After time has elapsed.
resp = requests.request(method, urljoin(analyzere.base_url, path),
**kwargs)
retry_after = resp.headers.get('Retry-After')
if handle_errors and (not 200 <= resp.status_code < 300):
handle_api_error(resp, resp.status_code)
return resp
开发者ID:analyzere,项目名称:analyzere-python,代码行数:31,代码来源:requestor.py
示例11: _extract_links
def _extract_links(self, selector, response_url, response_encoding, base_url):
'''
Pretty much the same function, just added 'ignore' to url.encode
'''
links = []
# hacky way to get the underlying lxml parsed document
for el, attr, attr_val in self._iter_links(selector.root):
# pseudo lxml.html.HtmlElement.make_links_absolute(base_url)
try:
attr_val = urljoin(base_url, attr_val)
except ValueError:
continue # skipping bogus links
else:
url = self.process_attr(attr_val)
if url is None:
continue
if isinstance(url, unicode):
# add 'ignore' to encoding errors
url = url.encode(response_encoding, 'ignore')
# to fix relative links after process_value
url = urljoin(response_url, url)
link = Link(url, _collect_string_content(el) or u'',
nofollow=True if el.get('rel') == 'nofollow' else False)
links.append(link)
return unique_list(links, key=lambda link: link.url) \
if self.unique else links
开发者ID:animalmatsuzawa,项目名称:scrapy-cluster,代码行数:27,代码来源:lxmlhtml.py
示例12: get_subscriptions
def get_subscriptions(address):
url = urljoin(MAILMAN_INSTANCE,
"3.1/members/find?subscriber={}".format(address))
response = requests.get(url, auth=MAILMAN_AUTH)
if response.status_code >= 300:
log.error("Could not get URL %s: %d %s",
url, response.status_code, response.reason)
return []
result = response.json()
subscriptions = []
for entry in result.get("entries", []):
subscription = {
"list_id": entry["list_id"],
"role": entry["role"],
"delivery_mode": entry["delivery_mode"],
}
# Get the subscription's preferences
member_id = entry["member_id"]
pref_url = urljoin(MAILMAN_INSTANCE,
"3.1/members/{}/preferences".format(member_id))
pref_response = requests.get(pref_url, auth=MAILMAN_AUTH)
pref_result = pref_response.json()
if pref_response.status_code >= 300:
log.error("Could not get URL %s: %d %s",
pref_url, pref_response.status_code,
pref_response.reason)
else:
subscription["preferences"] = dict([
(key, pref_result[key]) for key in pref_result
if key not in ("http_etag", "self_link")
])
subscriptions.append(subscription)
return subscriptions
开发者ID:henrysher,项目名称:fedora-infra-ansible,代码行数:33,代码来源:mailman-sar.py
示例13: get_journal_about_page_url
def get_journal_about_page_url(about_page_id=0, auth=True):
"""
Return url to journal about page.
If auth=True, the url will redirect through the journals service log in page
which will prevent the "purchase now" button being shown.
If auth=False, the url will point to Journal About Page with purchase button shown
Arguments:
about_page_id (int): id of Journal About Page as found in Discovery
auth (boolen): authorization flag, if true will force login to journal service
and redirect to last visited page in Journal after login. If false, this method
will return direct url to journal about page.
Returns:
url (str): url pointing to Journals Service login, w/ a redirect to last visited journal page
or url pointing directly to journal about page.
"""
if not auth:
return urljoin(get_journals_frontend_url(), '{id}/about'.format(id=about_page_id))
# by providing just the about_page_id in the url, the user will be redirected
# to the last page viewed after logging in
about_page_url = urljoin(get_journals_frontend_url(), '{id}'.format(id=about_page_id))
login_url = urljoin(get_journals_root_url(), 'require_auth')
query = 'forward={next_url}'.format(next_url=about_page_url)
split_url = urlsplit(login_url)
url = urlunsplit((
split_url.scheme,
split_url.netloc,
split_url.path,
query,
split_url.fragment,
))
return url
开发者ID:digitalsatori,项目名称:edx-platform,代码行数:35,代码来源:api.py
示例14: export
def export(self, ds, requestor, notify):
"""
This function exports data as FITS files. To do this, the function binds metadata (keywords) to images (arrays) to create FITS files and then serves the FITS files at jsoc.stanford.edu.
Written by Monica Bobra and Art Amezcua
19 July 2016
Parameters
----------
requestor: string
Username of requestor.
notify : string
E-mail address of requestor.
ds : string
Name of the data series.
Returns
-------
supath : list
List containing paths to all the requested FITS files.
"""
# test to see if the user's e-mail address is registered with jsoc.stanford.edu
test_email_query = 'http://jsoc.stanford.edu/cgi-bin/ajax/checkAddress.sh?address='+quote_plus(notify)+'&checkonly=1'
response = urlopen(test_email_query)
data = json.loads(response.read())
if (data['status'] == 4):
raise RuntimeError('User e-mail address is not registered with jsoc.stanford.edu')
query = '?' + urlencode({'op': 'exp_request', 'protocol': 'fits', 'format': 'json', 'method': 'url', 'requestor': requestor, 'notify': notify, 'ds': ds})
req = self._json_request(self._url_jsoc_fetch + query)
# waiting for the request to be ready
if (int(req.data['status']) == 1 or int(req.data['status']) == 2):
if 'requestid' in req.data:
query = '?' + urlencode({'op': 'exp_status', 'requestid': req.data['requestid']})
supath = []
print('Waiting for the request to be ready. Please allow at least 20 seconds.')
time.sleep(15)
while True :
req = self._json_request(self._url_jsoc_fetch + query)
if (int(req.data['status']) == 1 or int(req.data['status']) == 2 or int(req.data['status']) == 6):
time.sleep(5)
elif (int(req.data['status']) == 0):
dir = req.data['dir']
for dataobj in (req.data['data']):
supath.append(urljoin(self.baseurl,os.path.join(req.data['dir'],dataobj['filename'])))
break
else:
print(type(req.data['status']))
if (req.data['status'] == 3):
raise RuntimeError('DRMS Query failed, request size is too large, status=%s' % req.data['status'])
if (req.data['status'] == 4):
raise RuntimeError('DRMS Query failed, request not formed correctly, status=%s' % req.data['status'])
if (req.data['status'] == 5):
raise RuntimeError('DRMS Query failed, export request expired, status=%s' % req.data['status'])
else:
raise RuntimeError('DRMS Query failed, there is no requestid, status=%s' % req.data['status'])
else:
raise RuntimeError('DRMS Query failed, series is not a valid series, status=%s' % req.data['status'])
print("All the data are available at:")
print(str(urljoin(self.baseurl,req.data['dir'])))
return supath
开发者ID:mbobra,项目名称:drms_json,代码行数:60,代码来源:drms_json.py
示例15: list_directory
def list_directory(urlpath, filepath):
"""Helper to produce a directory listing (absent index.html).
Return value is either a file object, or None (indicating an
wsgierror). In either case, the headers are sent, making the
interface the same as for send_head().
"""
path = urlpath.rstrip('/') + '/'
listdir = os.listdir(filepath)
dirlist = []
filelist = []
for file in listdir:
if os.path.isdir(os.path.join(path, file)):
dirlist.append(file)
else:
filelist.append(file)
dirlist.sort()
filelist.sort()
res = '<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">\n'
res += '<html><head><title>{0}</title></head><body>\n'.format(path)
res += '<big><strong>Listing %s</strong></big><br>\n' % (path)
if path != '/':
item = '..'
res += 'D <a href=%s>%s</a><br/>\n' % (urljoin(path, item), item)
for item in dirlist:
res += 'D <a href=%s>%s</a><br/>\n' % (urljoin(path, item), item)
for item in filelist:
res += 'F <a href=%s>%s</a><br/>\n' % (urljoin(path, item), item)
res += '</body></html>'
return str(res)
开发者ID:jaraco,项目名称:pycoreutils,代码行数:33,代码来源:httpd.py
示例16: __init__
def __init__(self, username=None, serverloc=None, userapikey="nokey"):
# This logic is based on ContinuumModelsClient.__init__ and
# mpl.PlotClient.__init__. There is some merged functionality here
# since a Session is meant to capture the little bit of lower-level
# logic in PlotClient (i.e. avoiding handling of things like
# _newxyplot()), but also build in the functionality of the
# ContinuumModelsClient.
self.username = username
self.root_url = serverloc
self.http_session = requests.session()
self.http_session.headers.update(
{"content-type": "application/json", "BOKEHUSER-API-KEY": userapikey, "BOKEHUSER": username}
)
if self.root_url:
url = urljoin(self.root_url, "/bokeh/userinfo/")
self.userinfo = utils.get_json(self.http_session.get(url, verify=False))
else:
logger.info("Not using a server, plots will only work in embedded mode")
self.userinfo = None
self.docid = None
self.plotcontext = None
self.apikey = None
self.bbclient = None # reference to a ContinuumModelsClient
self.base_url = urljoin(self.root_url, "/bokeh/bb/")
self.raw_js_objs = []
super(PlotServerSession, self).__init__()
开发者ID:JaakkoTulkki,项目名称:bokeh,代码行数:29,代码来源:session.py
示例17: _get_form_url
def _get_form_url(form, url):
if url is None:
action = form.get('action')
if action is None:
return form.base_url
return urljoin(form.base_url, strip_html5_whitespace(action))
return urljoin(form.base_url, url)
开发者ID:ArturGaspar,项目名称:scrapy,代码行数:7,代码来源:form.py
示例18: process_response
def process_response(self, request, response, spider):
if (request.meta.get('dont_redirect', False) or
response.status in getattr(spider, 'handle_httpstatus_list', []) or
response.status in request.meta.get('handle_httpstatus_list', []) or
request.meta.get('handle_httpstatus_all', False)):
return response
if request.method == 'HEAD':
if response.status in [301, 302, 303, 307] and 'Location' in response.headers:
redirected_url = urljoin(request.url, response.headers['location'])
redirected = request.replace(url=redirected_url)
return self._redirect(redirected, request, spider, response.status)
else:
return response
if response.status in [302, 303] and 'Location' in response.headers:
redirected_url = urljoin(request.url, response.headers['location'])
redirected = self._redirect_request_using_get(request, redirected_url)
return self._redirect(redirected, request, spider, response.status)
if response.status in [301, 307] and 'Location' in response.headers:
redirected_url = urljoin(request.url, response.headers['location'])
redirected = request.replace(url=redirected_url)
return self._redirect(redirected, request, spider, response.status)
return response
开发者ID:cdingding,项目名称:scrapy,代码行数:26,代码来源:redirect.py
示例19: fix_auth_url_version
def fix_auth_url_version(auth_url):
"""Fix up the auth url if an invalid or no version prefix was given.
People still give a v2 auth_url even when they specify that they want v3
authentication. Fix the URL to say v3 in this case and add version if it is
missing entirely. This should be smarter and use discovery.
"""
# Check for empty path component in endpoint URL and add keystone version
# to endpoint: as of Kilo, the identity URLs returned by Keystone might no
# longer contain API versions, leaving the version choice up to the user.
if urlparse.urlparse(auth_url).path.rstrip('/') == '':
if get_keystone_version() >= 3:
auth_url = urlparse.urljoin(auth_url, 'v3')
else:
auth_url = urlparse.urljoin(auth_url, 'v2.0')
if get_keystone_version() >= 3:
if has_in_url_path(auth_url, "/v2.0"):
LOG.warning("The settings.py file points to a v2.0 keystone "
"endpoint, but v3 is specified as the API version "
"to use. Using v3 endpoint for authentication.")
auth_url = url_path_replace(auth_url, "/v2.0", "/v3", 1)
return auth_url
开发者ID:ArslanRafique,项目名称:django_openstack_auth,代码行数:25,代码来源:utils.py
示例20: test_href_template
def test_href_template(self):
self.headers = {
'Client-ID': uuidutils.generate_uuid(),
'X-Project-ID': '8383830383'
}
body = self.simulate_get(self.url_prefix, headers=self.headers)
self.assertEqual(falcon.HTTP_200, self.srmock.status)
resp = jsonutils.loads(body[0])
queue_href_template = resp['resources']['rel/queue']['href-template']
path_1 = 'https://zaqar.example.com' + self.url_prefix
path_2 = 'https://zaqar.example.com' + self.url_prefix + '/'
# Verify all the href template start with the correct version prefix
def get_href_or_template(resource):
return resource.get('href-template', '') or resource['href']
for resource in list(resp['resources']):
self.assertTrue(
get_href_or_template(resp['resources'][resource]).
startswith(self.url_prefix))
url = urlparse.urljoin(path_1, queue_href_template)
expected = ('https://zaqar.example.com' + self.url_prefix +
'/queues/foo')
self.assertEqual(expected, url.format(queue_name='foo'))
url = urlparse.urljoin(path_2, queue_href_template)
self.assertEqual(expected, url.format(queue_name='foo'))
开发者ID:openstack,项目名称:zaqar,代码行数:28,代码来源:test_home.py
注:本文中的six.moves.urllib.parse.urljoin函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论