• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python parse.urljoin函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中six.moves.urllib.parse.urljoin函数的典型用法代码示例。如果您正苦于以下问题:Python urljoin函数的具体用法?Python urljoin怎么用?Python urljoin使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了urljoin函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

    def __init__(self):

        self.verify_https = os.environ.get('OAUTHLIB_INSECURE_TRANSPORT', '') == ""
        if self.verify_https and os.environ.get("REQUESTS_CA_BUNDLE", "").strip() != "":
            self.verify_https = os.environ["REQUESTS_CA_BUNDLE"].strip()

        self.jwt_enable = six.text_type(os.environ.get('CKAN_OAUTH2_JWT_ENABLE', toolkit.config.get('ckan.oauth2.jwt.enable',''))).strip().lower() in ("true", "1", "on")

        self.legacy_idm = six.text_type(os.environ.get('CKAN_OAUTH2_LEGACY_IDM', toolkit.config.get('ckan.oauth2.legacy_idm', ''))).strip().lower() in ("true", "1", "on")
        self.authorization_endpoint = six.text_type(os.environ.get('CKAN_OAUTH2_AUTHORIZATION_ENDPOINT', toolkit.config.get('ckan.oauth2.authorization_endpoint', ''))).strip()
        self.token_endpoint = six.text_type(os.environ.get('CKAN_OAUTH2_TOKEN_ENDPOINT', toolkit.config.get('ckan.oauth2.token_endpoint', ''))).strip()
        self.profile_api_url = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_URL', toolkit.config.get('ckan.oauth2.profile_api_url', ''))).strip()
        self.client_id = six.text_type(os.environ.get('CKAN_OAUTH2_CLIENT_ID', toolkit.config.get('ckan.oauth2.client_id', ''))).strip()
        self.client_secret = six.text_type(os.environ.get('CKAN_OAUTH2_CLIENT_SECRET', toolkit.config.get('ckan.oauth2.client_secret', ''))).strip()
        self.scope = six.text_type(os.environ.get('CKAN_OAUTH2_SCOPE', toolkit.config.get('ckan.oauth2.scope', ''))).strip()
        self.rememberer_name = six.text_type(os.environ.get('CKAN_OAUTH2_REMEMBER_NAME', toolkit.config.get('ckan.oauth2.rememberer_name', 'auth_tkt'))).strip()
        self.profile_api_user_field = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_USER_FIELD', toolkit.config.get('ckan.oauth2.profile_api_user_field', ''))).strip()
        self.profile_api_fullname_field = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_FULLNAME_FIELD', toolkit.config.get('ckan.oauth2.profile_api_fullname_field', ''))).strip()
        self.profile_api_mail_field = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_MAIL_FIELD', toolkit.config.get('ckan.oauth2.profile_api_mail_field', ''))).strip()
        self.profile_api_groupmembership_field = six.text_type(os.environ.get('CKAN_OAUTH2_PROFILE_API_GROUPMEMBERSHIP_FIELD', toolkit.config.get('ckan.oauth2.profile_api_groupmembership_field', ''))).strip()
        self.sysadmin_group_name = six.text_type(os.environ.get('CKAN_OAUTH2_SYSADMIN_GROUP_NAME', toolkit.config.get('ckan.oauth2.sysadmin_group_name', ''))).strip()

        self.redirect_uri = urljoin(urljoin(toolkit.config.get('ckan.site_url', 'http://localhost:5000'), toolkit.config.get('ckan.root_path')), constants.REDIRECT_URL)

        # Init db
        db.init_db(model)

        missing = [key for key in REQUIRED_CONF if getattr(self, key, "") == ""]
        if missing:
            raise ValueError("Missing required oauth2 conf: %s" % ", ".join(missing))
        elif self.scope == "":
            self.scope = None
开发者ID:conwetlab,项目名称:ckanext-oauth2,代码行数:32,代码来源:oauth2.py


示例2: startElementNS

 def startElementNS(self, name, qname, attrs):
     stack = self.stack
     stack.append(ElementHandler())
     current = self.current
     parent = self.parent
     base = attrs.get(BASE, None)
     if base is not None:
         base, frag = urldefrag(base)
         if parent and parent.base:
             base = urljoin(parent.base, base)
         else:
             systemId = self.locator.getPublicId() \
                 or self.locator.getSystemId()
             if systemId:
                 base = urljoin(systemId, base)
     else:
         if parent:
             base = parent.base
         if base is None:
             systemId = self.locator.getPublicId() \
                 or self.locator.getSystemId()
             if systemId:
                 base, frag = urldefrag(systemId)
     current.base = base
     language = attrs.get(LANG, None)
     if language is None:
         if parent:
             language = parent.language
     current.language = language
     current.start(name, qname, attrs)
开发者ID:drewp,项目名称:rdflib,代码行数:30,代码来源:rdfxml.py


示例3: process_response

    def process_response(self, request, response, spider):
        if request.meta.get('dont_redirect', False):
            return response

        if request.method == 'HEAD':
            if response.status in [301, 302, 303, 307] and 'Location' in response.headers:
                redirected_url = urljoin(request.url, response.headers['location'])
                redirected = request.replace(url=redirected_url)
                return self._redirect(redirected, request, spider, response.status)
            else:
                return response

        referer = request.url

        if response.status in [302, 303] and 'Location' in response.headers:
            redirected_url = urljoin(request.url, response.headers['location'])
            redirected = self._redirect_request_using_get(request, redirected_url)
            redirected = self._redirect(redirected, request, spider, response.status)
            redirected.headers['Referer'] = referer
            return redirected

        referer = request.url

        if response.status in [301, 307] and 'Location' in response.headers:
            redirected_url = urljoin(request.url, response.headers['location'])
            redirected = request.replace(url=redirected_url)
            redirected = self._redirect(redirected, request, spider, response.status)
            redirected.headers['Referer'] = referer
            return redirected

        return response
开发者ID:ICCV,项目名称:chaos,代码行数:31,代码来源:__init__.py


示例4: open_in_browser

def open_in_browser(config_obj, jql_query, query_log_path, new_tab=False):
    """
    Open browser in JIRA with the retrieved keys from Stash as url-params

    :param config_obj:
    :param jql_query:
    :return:
    """

    jira_url = config_obj.jira_url
    if len(jql_query) < OPEN_IN_BROWSER_BELOW:
        params = {
            'jql': jql_query
        }
        b_url = urljoin(urljoin(jira_url, 'issues/'), '?' + urlencode(params))
        if new_tab:
            webbrowser.open(b_url, new=2)
        else:
            webbrowser.open(b_url, new=0)
    else:
        click.echo("Too much data to open in browser.")

    click.echo("Query saved to " + query_log_path)
    with open(query_log_path, 'a') as f:
        f.write(jql_query + "\n\n")
开发者ID:enrico2828,项目名称:Stash2Jira,代码行数:25,代码来源:cli.py


示例5: _send_batch

    def _send_batch(self, destination, events):
        ''' Makes a single batch API request with the given list of events. The
        `destination` argument contains the write key, API host and dataset
        name used to build the request.'''
        start = time.time()
        status_code = 0
        try:
            url = urljoin(urljoin(destination.api_host, "/1/batch/"),
                          destination.dataset)
            payload = []
            for ev in events:
                event_time = ev.created_at.isoformat()
                if ev.created_at.tzinfo is None:
                    event_time += "Z"
                payload.append({
                    "time": event_time,
                    "samplerate": ev.sample_rate,
                    "data": ev.fields()})

            self.log("firing batch, size = %d", len(payload))
            resp = self.session.post(
                url,
                headers={"X-Honeycomb-Team": destination.writekey, "Content-Type": "application/json"},
                data=json.dumps(payload, default=json_default_handler),
                timeout=10.0,
            )
            status_code = resp.status_code
            resp.raise_for_status()
            statuses = [{"status": d.get("status"), "error": d.get("error")} for d in resp.json()]
            for ev, status in zip(events, statuses):
                self._enqueue_response(status.get("status"), "", status.get("error"), start, ev.metadata)

        except Exception as e:
            # Catch all exceptions and hand them to the responses queue.
            self._enqueue_errors(status_code, e, start, events)
开发者ID:honeycombio,项目名称:libhoney-py,代码行数:35,代码来源:transmission.py


示例6: buildDiscover

def buildDiscover(base_url, out_dir):
    """Convert all files in a directory to apache mod_asis files in
    another directory."""
    test_data = discoverdata.readTests(discoverdata.default_test_file)

    def writeTestFile(test_name):
        template = test_data[test_name]

        data = discoverdata.fillTemplate(
            test_name, template, base_url, discoverdata.example_xrds)

        out_file_name = os.path.join(out_dir, test_name)
        out_file = open(out_file_name, 'w')
        out_file.write(data)

    manifest = [manifest_header]
    for success, input_name, id_name, result_name in discoverdata.testlist:
        if not success:
            continue
        writeTestFile(input_name)

        input_url = urljoin(base_url, input_name)
        id_url = urljoin(base_url, id_name)
        result_url = urljoin(base_url, result_name)

        manifest.append('\t'.join((input_url, id_url, result_url)))
        manifest.append('\n')

    manifest_file_name = os.path.join(out_dir, 'manifest.txt')
    manifest_file = open(manifest_file_name, 'w')
    for chunk in manifest:
        manifest_file.write(chunk)
    manifest_file.close()
开发者ID:ziima,项目名称:python-openid,代码行数:33,代码来源:builddiscover.py


示例7: add_absolute_urls

def add_absolute_urls(results, request=None):

    for hit in results:
        base_url = get_template_url(hit['vendor'], hit['name'], hit['version'], hit['template_uri'], request=request)
        hit['uri'] = "/".join((hit['vendor'], hit['name'], hit['version']))
        hit['image'] = urljoin(base_url, hit['image'])
        hit['smartphoneimage'] = urljoin(base_url, hit['smartphoneimage'])
开发者ID:Mognom,项目名称:wirecloud,代码行数:7,代码来源:models.py


示例8: process_response

    def process_response(self, request, response, spider):
        if request.meta.get('dont_redirect', False):
            return response

        if request.method == 'HEAD':
            if response.status in [301, 302, 303, 307] and 'Location' in response.headers:
                redirected_url = urljoin(request.url, response.headers['location'])
                redirected = request.replace(url=redirected_url)
                return self._redirect(redirected, request, spider, response.status)
            else:
                return response

        if response.status in [302, 303] and 'Location' in response.headers:
            if (response.headers['Location'] == "http://store.steampowered.com/") or \
               (response.headers['Location'] == "http://store.steampowered.com") or \
               ('video' in response.headers['Location']):
                # log.msg("Ignored home page / video redirect!")
                raise IgnoreRequest()
            redirected_url = urljoin(request.url, response.headers['location'])
            redirected = self._redirect_request_using_get(request, redirected_url)
            return self._redirect(redirected, request, spider, response.status)

        if response.status in [301, 307] and 'Location' in response.headers:
            redirected_url = urljoin(request.url, response.headers['location'])
            redirected = request.replace(url=redirected_url)
            return self._redirect(redirected, request, spider, response.status)

        return response
开发者ID:casperchia,项目名称:steamGameRatings2,代码行数:28,代码来源:middlewares.py


示例9: open

 def open(self, filename=None):
     if filename is None:
         filename = self._base_uri
     else:
         if self._file_type == 's3':
             filename = urljoin(self._base_uri.replace(
                 's3://', 'http://'), filename.replace('\\', '/')).replace('http://', 's3://')
         elif self._file_type == 'http':
             filename = urljoin(self._base_uri, filename.replace('\\', '/'))
         else:
             filename = os.path.abspath(os.path.join(os.path.dirname(
                 self._base_uri.replace('\\', '/')), filename.replace('\\', '/')))
     f = None
     if self._file_type == 's3':
         uri_header, uri_body = filename.split('://', 1)
         us = uri_body.split('/')
         bucketname = us.pop(0)
         key = '/'.join(us)
         logger.info('Opening {}'.format(key))
         f = StringIO(self._s3_bucket.Object(key).get()['Body'].read())
     elif self._file_type == 'http':
         f = request.urlopen(filename)
     else:
         f = open(filename, 'rb')
     yield f
     f.close()
开发者ID:zwsong,项目名称:nnabla,代码行数:26,代码来源:data_source_loader.py


示例10: request_raw

def request_raw(method, path, params=None, body=None, headers=None,
                handle_errors=True, auto_retry=True):
    kwargs = {
        'params': params,
        'data': body,
        'headers': headers,
        'verify': analyzere.tls_verify,
    }

    username = analyzere.username
    password = analyzere.password
    if username and password:
        kwargs['auth'] = (username, password)

    resp = requests.request(method, urljoin(analyzere.base_url, path),
                            **kwargs)

    # Handle HTTP 503 with the Retry-After header by automatically retrying
    # request after sleeping for the recommended amount of time
    retry_after = resp.headers.get('Retry-After')
    while auto_retry and (resp.status_code == 503 and retry_after):
        time.sleep(float(retry_after))
        # Repeat original request after Retry-After time has elapsed.
        resp = requests.request(method, urljoin(analyzere.base_url, path),
                                **kwargs)
        retry_after = resp.headers.get('Retry-After')

    if handle_errors and (not 200 <= resp.status_code < 300):
        handle_api_error(resp, resp.status_code)

    return resp
开发者ID:analyzere,项目名称:analyzere-python,代码行数:31,代码来源:requestor.py


示例11: _extract_links

    def _extract_links(self, selector, response_url, response_encoding, base_url):
        '''
        Pretty much the same function, just added 'ignore' to url.encode
        '''
        links = []
        # hacky way to get the underlying lxml parsed document
        for el, attr, attr_val in self._iter_links(selector.root):
            # pseudo lxml.html.HtmlElement.make_links_absolute(base_url)
            try:
                attr_val = urljoin(base_url, attr_val)
            except ValueError:
                continue  # skipping bogus links
            else:
                url = self.process_attr(attr_val)
                if url is None:
                    continue
            if isinstance(url, unicode):
                # add 'ignore' to encoding errors
                url = url.encode(response_encoding, 'ignore')
            # to fix relative links after process_value
            url = urljoin(response_url, url)
            link = Link(url, _collect_string_content(el) or u'',
                        nofollow=True if el.get('rel') == 'nofollow' else False)
            links.append(link)

        return unique_list(links, key=lambda link: link.url) \
                if self.unique else links
开发者ID:animalmatsuzawa,项目名称:scrapy-cluster,代码行数:27,代码来源:lxmlhtml.py


示例12: get_subscriptions

def get_subscriptions(address):
    url = urljoin(MAILMAN_INSTANCE,
                  "3.1/members/find?subscriber={}".format(address))
    response = requests.get(url, auth=MAILMAN_AUTH)
    if response.status_code >= 300:
        log.error("Could not get URL %s: %d %s",
                  url, response.status_code, response.reason)
        return []
    result = response.json()
    subscriptions = []
    for entry in result.get("entries", []):
        subscription = {
            "list_id": entry["list_id"],
            "role": entry["role"],
            "delivery_mode": entry["delivery_mode"],
        }
        # Get the subscription's preferences
        member_id = entry["member_id"]
        pref_url = urljoin(MAILMAN_INSTANCE,
                           "3.1/members/{}/preferences".format(member_id))
        pref_response = requests.get(pref_url, auth=MAILMAN_AUTH)
        pref_result = pref_response.json()
        if pref_response.status_code >= 300:
            log.error("Could not get URL %s: %d %s",
                      pref_url, pref_response.status_code,
                      pref_response.reason)
        else:
            subscription["preferences"] = dict([
                (key, pref_result[key]) for key in pref_result
                if key not in ("http_etag", "self_link")
            ])
        subscriptions.append(subscription)
    return subscriptions
开发者ID:henrysher,项目名称:fedora-infra-ansible,代码行数:33,代码来源:mailman-sar.py


示例13: get_journal_about_page_url

def get_journal_about_page_url(about_page_id=0, auth=True):
    """
    Return url to journal about page.
    If auth=True, the url will redirect through the journals service log in page
    which will prevent the "purchase now" button being shown.
    If auth=False, the url will point to Journal About Page with purchase button shown

    Arguments:
        about_page_id (int): id of Journal About Page as found in Discovery
        auth (boolen): authorization flag, if true will force login to journal service
        and redirect to last visited page in Journal after login. If false, this method
        will return direct url to journal about page.

    Returns:
        url (str): url pointing to Journals Service login, w/ a redirect to last visited journal page
        or url pointing directly to journal about page.
    """
    if not auth:
        return urljoin(get_journals_frontend_url(), '{id}/about'.format(id=about_page_id))

    # by providing just the about_page_id in the url, the user will be redirected
    # to the last page viewed after logging in
    about_page_url = urljoin(get_journals_frontend_url(), '{id}'.format(id=about_page_id))
    login_url = urljoin(get_journals_root_url(), 'require_auth')
    query = 'forward={next_url}'.format(next_url=about_page_url)

    split_url = urlsplit(login_url)
    url = urlunsplit((
        split_url.scheme,
        split_url.netloc,
        split_url.path,
        query,
        split_url.fragment,
    ))
    return url
开发者ID:digitalsatori,项目名称:edx-platform,代码行数:35,代码来源:api.py


示例14: export

    def export(self, ds, requestor, notify):
        """
        This function exports data as FITS files. To do this, the function binds metadata (keywords) to images (arrays) to create FITS files and then serves the FITS files at jsoc.stanford.edu.
        Written by Monica Bobra and Art Amezcua
        19 July 2016

        Parameters
        ----------
        requestor: string
        	Username of requestor.
        notify   : string
        	E-mail address of requestor.
        ds       : string
            Name of the data series.

        Returns
        -------
        supath : list
            List containing paths to all the requested FITS files.
		"""
		# test to see if the user's e-mail address is registered with jsoc.stanford.edu
        test_email_query = 'http://jsoc.stanford.edu/cgi-bin/ajax/checkAddress.sh?address='+quote_plus(notify)+'&checkonly=1'
        response = urlopen(test_email_query)
        data = json.loads(response.read())
        if (data['status'] == 4):
		    raise RuntimeError('User e-mail address is not registered with jsoc.stanford.edu')
        query = '?' + urlencode({'op': 'exp_request', 'protocol': 'fits', 'format': 'json', 'method': 'url', 'requestor': requestor, 'notify': notify, 'ds': ds})
        req = self._json_request(self._url_jsoc_fetch + query)
        # waiting for the request to be ready
        if (int(req.data['status']) == 1 or int(req.data['status']) == 2):
            if 'requestid' in req.data:
                query = '?' + urlencode({'op': 'exp_status', 'requestid': req.data['requestid']})
                supath = []
                print('Waiting for the request to be ready. Please allow at least 20 seconds.')
                time.sleep(15)
                while True :  
                    req = self._json_request(self._url_jsoc_fetch + query)
                    if (int(req.data['status']) == 1 or int(req.data['status']) == 2 or int(req.data['status']) == 6):
                        time.sleep(5)
                    elif (int(req.data['status']) == 0):
                        dir = req.data['dir']
                        for dataobj in (req.data['data']):
                            supath.append(urljoin(self.baseurl,os.path.join(req.data['dir'],dataobj['filename'])))
                        break
                    else:
                        print(type(req.data['status']))
                        if (req.data['status'] == 3):
                            raise RuntimeError('DRMS Query failed, request size is too large, status=%s' % req.data['status'])
                        if (req.data['status'] == 4):
                            raise RuntimeError('DRMS Query failed, request not formed correctly, status=%s' % req.data['status'])
                        if (req.data['status'] == 5):
                            raise RuntimeError('DRMS Query failed, export request expired, status=%s' % req.data['status'])
                            
            else:
                raise RuntimeError('DRMS Query failed, there is no requestid, status=%s' % req.data['status'])
        else:
            raise RuntimeError('DRMS Query failed, series is not a valid series, status=%s' % req.data['status'])
        print("All the data are available at:")
        print(str(urljoin(self.baseurl,req.data['dir'])))
        return supath
开发者ID:mbobra,项目名称:drms_json,代码行数:60,代码来源:drms_json.py


示例15: list_directory

def list_directory(urlpath, filepath):
    """Helper to produce a directory listing (absent index.html).

    Return value is either a file object, or None (indicating an
    wsgierror).  In either case, the headers are sent, making the
    interface the same as for send_head().
    """
    path = urlpath.rstrip('/') + '/'
    listdir = os.listdir(filepath)
    dirlist = []
    filelist = []

    for file in listdir:
        if os.path.isdir(os.path.join(path, file)):
            dirlist.append(file)
        else:
            filelist.append(file)

    dirlist.sort()
    filelist.sort()

    res = '<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">\n'
    res += '<html><head><title>{0}</title></head><body>\n'.format(path)
    res += '<big><strong>Listing %s</strong></big><br>\n' % (path)
    if path != '/':
        item = '..'
        res += 'D <a href=%s>%s</a><br/>\n' % (urljoin(path, item), item)
    for item in dirlist:
        res += 'D <a href=%s>%s</a><br/>\n' % (urljoin(path, item), item)
    for item in filelist:
        res += 'F <a href=%s>%s</a><br/>\n' % (urljoin(path, item), item)
    res += '</body></html>'
    return str(res)
开发者ID:jaraco,项目名称:pycoreutils,代码行数:33,代码来源:httpd.py


示例16: __init__

    def __init__(self, username=None, serverloc=None, userapikey="nokey"):
        # This logic is based on ContinuumModelsClient.__init__ and
        # mpl.PlotClient.__init__.  There is some merged functionality here
        # since a Session is meant to capture the little bit of lower-level
        # logic in PlotClient (i.e. avoiding handling of things like
        # _newxyplot()), but also build in the functionality of the
        # ContinuumModelsClient.

        self.username = username
        self.root_url = serverloc
        self.http_session = requests.session()
        self.http_session.headers.update(
            {"content-type": "application/json", "BOKEHUSER-API-KEY": userapikey, "BOKEHUSER": username}
        )

        if self.root_url:
            url = urljoin(self.root_url, "/bokeh/userinfo/")
            self.userinfo = utils.get_json(self.http_session.get(url, verify=False))
        else:
            logger.info("Not using a server, plots will only work in embedded mode")
            self.userinfo = None

        self.docid = None
        self.plotcontext = None
        self.apikey = None
        self.bbclient = None  # reference to a ContinuumModelsClient
        self.base_url = urljoin(self.root_url, "/bokeh/bb/")
        self.raw_js_objs = []
        super(PlotServerSession, self).__init__()
开发者ID:JaakkoTulkki,项目名称:bokeh,代码行数:29,代码来源:session.py


示例17: _get_form_url

def _get_form_url(form, url):
    if url is None:
        action = form.get('action')
        if action is None:
            return form.base_url
        return urljoin(form.base_url, strip_html5_whitespace(action))
    return urljoin(form.base_url, url)
开发者ID:ArturGaspar,项目名称:scrapy,代码行数:7,代码来源:form.py


示例18: process_response

    def process_response(self, request, response, spider):
        if (request.meta.get('dont_redirect', False) or
               response.status in getattr(spider, 'handle_httpstatus_list', []) or
               response.status in request.meta.get('handle_httpstatus_list', []) or
               request.meta.get('handle_httpstatus_all', False)):
            return response

        if request.method == 'HEAD':
            if response.status in [301, 302, 303, 307] and 'Location' in response.headers:
                redirected_url = urljoin(request.url, response.headers['location'])
                redirected = request.replace(url=redirected_url)
                return self._redirect(redirected, request, spider, response.status)
            else:
                return response

        if response.status in [302, 303] and 'Location' in response.headers:
            redirected_url = urljoin(request.url, response.headers['location'])
            redirected = self._redirect_request_using_get(request, redirected_url)
            return self._redirect(redirected, request, spider, response.status)

        if response.status in [301, 307] and 'Location' in response.headers:
            redirected_url = urljoin(request.url, response.headers['location'])
            redirected = request.replace(url=redirected_url)
            return self._redirect(redirected, request, spider, response.status)

        return response
开发者ID:cdingding,项目名称:scrapy,代码行数:26,代码来源:redirect.py


示例19: fix_auth_url_version

def fix_auth_url_version(auth_url):
    """Fix up the auth url if an invalid or no version prefix was given.

    People still give a v2 auth_url even when they specify that they want v3
    authentication. Fix the URL to say v3 in this case and add version if it is
    missing entirely. This should be smarter and use discovery.
    """

    # Check for empty path component in endpoint URL and add keystone version
    # to endpoint: as of Kilo, the identity URLs returned by Keystone might no
    # longer contain API versions, leaving the version choice up to the user.
    if urlparse.urlparse(auth_url).path.rstrip('/') == '':
        if get_keystone_version() >= 3:
            auth_url = urlparse.urljoin(auth_url, 'v3')
        else:
            auth_url = urlparse.urljoin(auth_url, 'v2.0')

    if get_keystone_version() >= 3:
        if has_in_url_path(auth_url, "/v2.0"):
            LOG.warning("The settings.py file points to a v2.0 keystone "
                        "endpoint, but v3 is specified as the API version "
                        "to use. Using v3 endpoint for authentication.")
            auth_url = url_path_replace(auth_url, "/v2.0", "/v3", 1)

    return auth_url
开发者ID:ArslanRafique,项目名称:django_openstack_auth,代码行数:25,代码来源:utils.py


示例20: test_href_template

    def test_href_template(self):
        self.headers = {
            'Client-ID': uuidutils.generate_uuid(),
            'X-Project-ID': '8383830383'
        }
        body = self.simulate_get(self.url_prefix, headers=self.headers)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        resp = jsonutils.loads(body[0])
        queue_href_template = resp['resources']['rel/queue']['href-template']
        path_1 = 'https://zaqar.example.com' + self.url_prefix
        path_2 = 'https://zaqar.example.com' + self.url_prefix + '/'

        # Verify all the href template start with the correct version prefix
        def get_href_or_template(resource):
            return resource.get('href-template', '') or resource['href']

        for resource in list(resp['resources']):
            self.assertTrue(
                get_href_or_template(resp['resources'][resource]).
                startswith(self.url_prefix))

        url = urlparse.urljoin(path_1, queue_href_template)
        expected = ('https://zaqar.example.com' + self.url_prefix +
                    '/queues/foo')
        self.assertEqual(expected, url.format(queue_name='foo'))

        url = urlparse.urljoin(path_2, queue_href_template)
        self.assertEqual(expected, url.format(queue_name='foo'))
开发者ID:openstack,项目名称:zaqar,代码行数:28,代码来源:test_home.py



注:本文中的six.moves.urllib.parse.urljoin函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python parse.urlparse函数代码示例发布时间:2022-05-27
下一篇:
Python parse.urlencode函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap