• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python parse.urlsplit函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 six.moves.urllib.parse.urlsplit 函数的典型用法代码示例。如果您正苦于以下问题：Python urlsplit 函数的具体用法？Python urlsplit 怎么用？Python urlsplit 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 urlsplit 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: _parse_url

    def _parse_url(self, url):
        """Build the URL this test case will request.

        A ``url`` that already has a scheme is used as given. Otherwise it
        is expanded through ``utils.create_url`` with this case's host,
        port, prefix and the ``ssl`` flag from the test data.

        The scheme and netloc of the result are stored on ``self`` for
        later comparisons. Query parameters from the test data, if any,
        are merged in via ``self._update_query_params``. The returned URL
        never carries a fragment.
        """
        extra_params = self.test_data['query_parameters']
        use_ssl = self.test_data['ssl']

        parts = urlparse.urlsplit(url)
        if not parts.scheme:
            # Split the expanded URL again so scheme and netloc reflect it.
            parts = urlparse.urlsplit(
                utils.create_url(url, self.host, port=self.port,
                                 prefix=self.prefix, ssl=use_ssl))

        self.scheme, self.netloc = parts.scheme, parts.netloc

        query = parts.query
        if extra_params:
            query = self._update_query_params(query, extra_params)

        return urlparse.urlunsplit(
            (parts.scheme, parts.netloc, parts.path, query, ''))
开发者ID:cdent,项目名称:gabbi,代码行数:29,代码来源:case.py


示例2: copy_rec_files_s3

    def copy_rec_files_s3(self, user, collection, recording, warc_files):
        """Copy a recording's S3 files below the collection's S3 directory.

        ``warc_files`` maps a file name to its URL. Entries that are not
        ``s3://`` URLs, or whose source object cannot be HEAD-ed, are
        reported and skipped. The entry named ``recording.INDEX_FILE_KEY``
        is targeted at ``indexes/`` and stored as a recording property;
        all other entries are targeted at ``warcs/`` and registered in
        redis. ``self.dry_run`` only suppresses the actual S3 copy.

        Returns the summed ``ContentLength`` of the non-index entries.
        """
        warc_hash_key = recording.COLL_WARC_KEY.format(coll=collection.my_id)
        warc_set_key = recording.REC_WARC_KEY.format(rec=recording.my_id)

        # Copy WARCs
        copied_bytes = 0

        dest_root = self.s3_root + collection.get_dir_path() + '/'

        for name, src_url in warc_files.items():
            if not src_url.startswith('s3://'):
                print('Skipping: ' + src_url)
                continue

            source = urlsplit(src_url)
            src_bucket = source.netloc
            src_key = source.path.lstrip('/')

            try:
                head = self.s3.head_object(Bucket=src_bucket, Key=src_key)
            except Exception as e:
                print('Skipping: ' + src_url)
                print(e)
                continue

            size = head['ContentLength']

            if name == recording.INDEX_FILE_KEY:
                # The index keeps its original base name and is tracked as
                # a property of the recording, not in the WARC tables.
                dest_url = dest_root + 'indexes/' + os.path.basename(src_url)
                recording.set_prop(name, dest_url, update_ts=False)
            else:
                dest_url = dest_root + 'warcs/' + name

                self.redis.hset(warc_hash_key, name, dest_url)
                self.redis.sadd(warc_set_key, name)
                copied_bytes += size

            # target
            destination = urlsplit(dest_url)
            dest_bucket = destination.netloc
            dest_key = destination.path.lstrip('/')

            copy_args = {
                'Bucket': dest_bucket,
                'Key': dest_key,
                'CopySource': {'Bucket': src_bucket, 'Key': src_key},
            }

            print('    Copying:')
            print('      From: s3://' + src_bucket + '/' + src_key)
            print('      To: s3://' + dest_bucket + '/' + dest_key)
            print('      Size: ' + str(size))

            try:
                if not self.dry_run:
                    self.s3.copy_object(**copy_args)
            except Exception as e:
                print('    ERROR:')
                print(e)

        return copied_bytes
开发者ID:webrecorder,项目名称:webrecorder,代码行数:60,代码来源:migrate4.0.py


示例3: __init__

 def __init__(self, base, relative=None):
     """Set up the request URL from ``base`` and an optional ``relative`` part.

     ``base`` is either a URL string or another ``RequestBase``; in the
     latter case its URL parts, token flag, headers and config are taken
     over. When ``relative`` is given, its path is joined onto the base
     path with ``_join_plex``. Its query string is appended to the base
     query; if it has none, the base query is reduced to the
     ``X-Plex-Token`` entries.
     """
     self._has_token = False
     self._url = None
     self._loaded = False
     self._xml = None
     self._url_parts = None
     self._headers = None
     self._config = None

     if isinstance(base, six.string_types):
         self._url_parts = list(parse.urlsplit(base))
     elif isinstance(base, RequestBase):
         base_url = base.url
         self._has_token = base.has_token
         # Copy the list so changes below do not leak into ``base``.
         self._url_parts = base._url_parts[:]
         self._headers = base._headers
         self._config = base.config

     if relative:
         rel = parse.urlsplit(relative)
         if rel.path:
             self._url_parts[2] = _join_plex(self._url_parts[2], rel.path)

         base_pairs = parse.parse_qsl(self._url_parts[3])
         if rel.query:
             merged = base_pairs + parse.parse_qsl(rel.query)
         else:
             # Strip off all non-token parts
             merged = [(key, value) for key, value in base_pairs
                       if key == 'X-Plex-Token']
         self._url_parts[3] = parse.urlencode(merged)

     if not self._has_token:
         self._has_token = 'X-Plex-Token' in parse.parse_qs(self._url_parts[3])
     self._url = parse.urlunsplit(self._url_parts)
开发者ID:Xaroth,项目名称:plex-export,代码行数:32,代码来源:base.py


示例4: _update_link_prefix

 def _update_link_prefix(self, orig_url, prefix):
     if not prefix:
         return orig_url
     url_parts = list(parse.urlsplit(orig_url))
     prefix_parts = list(parse.urlsplit(prefix))
     url_parts[0:2] = prefix_parts[0:2]
     return parse.urlunsplit(url_parts)
开发者ID:openstack,项目名称:manila,代码行数:7,代码来源:common.py


示例5: test_authentication_disabled_app

def test_authentication_disabled_app(authentication_disabled_app):
    """Check that auth pages redirect to the root page when app.auth is False."""
    # app.auth should = false
    assert mg_globals
    assert mg_globals.app.auth is False

    def visit_and_expect_root(path):
        # Request ``path`` with a fresh template context, follow the
        # redirect, and check that it points at the root page.
        template.clear_test_template_context()
        response = authentication_disabled_app.get(path)
        response.follow()

        # Correct redirect?
        assert urlparse.urlsplit(response.location)[2] == '/'
        assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT

    # Try to visit register page
    visit_and_expect_root('/auth/register/')

    # Try to visit login page
    visit_and_expect_root('/auth/login/')

    ## Test check_login_simple should return None
    assert auth_tools.check_login_simple('test', 'simple') is None

    # Try to visit the forgot password page
    # NOTE(review): this step requests the register URL again rather than
    # a forgot-password URL -- confirm which path was intended.
    visit_and_expect_root('/auth/register/')
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:34,代码来源:test_auth.py


示例6: test_notify_alarm_with_batch_listener

 def test_notify_alarm_with_batch_listener(self, logger):
     """Send two alarm updates with batch_size=2 and check both notifications.

     The notifications must arrive in the order sent, and the first logger
     call must report a batch of two messages.
     """
     first = {
         "actions": ["test://"],
         "alarm_id": "foobar",
         "alarm_name": "testalarm",
         "severity": "critical",
         "previous": "OK",
         "current": "ALARM",
         "reason": "Everything is on fire",
         "reason_data": {"fire": "everywhere"},
     }
     second = {
         "actions": ["test://"],
         "alarm_id": "foobar2",
         "alarm_name": "testalarm2",
         "severity": "low",
         "previous": "ALARM",
         "current": "OK",
         "reason": "Everything is fine",
         "reason_data": {"fine": "fine"},
     }
     payloads = (first, second)

     self.service.terminate()
     self.CONF.set_override("batch_size", 2, "notifier")
     # Init a new service with new configuration
     self.svc = notifier.AlarmNotifierService(0, self.CONF)
     self.addCleanup(self.svc.terminate)
     for payload in payloads:
         self._msg_notifier.sample({}, "alarm.update", payload)
     time.sleep(1)

     received = self.svc.notifiers["test"].obj.notifications
     self.assertEqual(2, len(received))

     # Each notification is the parsed action URL followed by these fields.
     tail_fields = (
         "alarm_id",
         "alarm_name",
         "severity",
         "previous",
         "current",
         "reason",
         "reason_data",
     )
     for position, payload in enumerate(payloads):
         expected = (urlparse.urlsplit(payload["actions"][0]),) + tuple(
             payload[field] for field in tail_fields
         )
         self.assertEqual(expected, received[position])

     self.assertEqual(mock.call("Received %s messages in batch.", 2), logger.call_args_list[0])
开发者ID:openstack,项目名称:aodh,代码行数:58,代码来源:test_notifier.py


示例7: get_bottom_url

def get_bottom_url(t_ver, t_url, b_ver, b_endpoint):
    """Translate a top-service request URL into a bottom-service URL.

    The leading ``t_ver`` segment is stripped from the path of ``t_url``
    and the remainder is re-rooted under the scheme and netloc of
    ``b_endpoint``, prefixed with ``b_ver`` unless that is empty. The
    ``availability_zone`` query parameter is dropped; the fragment is kept.

    :param t_ver: version of top service
    :param t_url: request url to the top service
    :param b_ver: version of bottom service
    :param b_endpoint: endpoint registered in keystone for bottom service
    :return: request url to bottom service, or '' when the path of
             ``t_url`` does not start with ``t_ver``
    """
    top = urlparse.urlsplit(t_url)
    top_path = top.path

    # The version segment may appear with or without a leading slash.
    for version_prefix in ('/' + t_ver + '/', t_ver + '/'):
        if top_path.startswith(version_prefix):
            resource = top_path[len(version_prefix):]
            break
    else:
        # wrong t_url
        return ''

    bottom = urlparse.urlsplit(b_endpoint)
    if b_ver == '':
        bottom_path = '/' + resource
    else:
        bottom_path = '/' + b_ver + '/' + resource

    # Remove availability_zone filter since it is handled by VolumeController.
    # VolumeController will send GET request only to bottom pods whose AZ
    # is specified in availability_zone filter.
    kept_filters = [(key, value)
                    for key, value in urlparse.parse_qsl(top.query)
                    if key != 'availability_zone']

    return urlparse.urlunsplit((bottom.scheme,
                                bottom.netloc,
                                bottom_path,
                                urlparse.urlencode(kept_filters),
                                top.fragment))
开发者ID:LongXQ,项目名称:tricircle,代码行数:57,代码来源:httpclient.py


示例8: __clean_external_url

def __clean_external_url(url):
    """Return ``url`` if it is relative or targets the current request's host.

    An absolute or scheme-relative ``url`` is accepted only when its port
    equals the port of ``request.url`` and its hostname is the request's
    hostname or a subdomain of it. Anything else, including a URL that
    cannot be parsed, yields ``''``. URLs that do not start with
    ``http://``, ``https://`` or ``//`` are returned unchanged.
    """
    # Schemes are case-insensitive, so 'HTTP://...' must be checked too.
    if url.lower().startswith(('http://', 'https://', '//')):
        # Do the domains and ports match?
        try:
            pnext = urlsplit(url)
            next_port = pnext.port
        except ValueError:
            # Malformed netloc, or a non-numeric / out-of-range port.
            return ''
        preq = urlsplit(request.url)
        if next_port != preq.port:
            return ''
        next_host = pnext.hostname
        req_host = preq.hostname
        if next_host != req_host:
            # hostname is None for an empty netloc (e.g. 'http:///path');
            # such a URL can never be a subdomain of the request host.
            if not (next_host and req_host
                    and next_host.endswith('.' + req_host)):
                return ''
    return url
开发者ID:hasgeek,项目名称:coaster,代码行数:10,代码来源:misc.py


示例9: _test_edit_persona

        def _test_edit_persona():
            """Exercise the Persona edit view: failed deletes, add, then delete."""
            edit_template = 'mediagoblin/plugins/persona/edit.html'

            def email_errors_after_post(payload):
                # POST ``payload`` to the edit view with a fresh template
                # context and return the email errors of the rendered form.
                template.clear_test_template_context()
                persona_plugin_app.post('/edit/persona/', payload)

                assert edit_template in template.TEMPLATE_TEST_CONTEXT
                rendered = template.TEMPLATE_TEST_CONTEXT[edit_template]
                return rendered['form'].email.errors

            # Try and delete only Persona email address
            errors = email_errors_after_post({'email': '[email protected]'})
            assert errors == [u"You can't delete your only Persona email address unless you have a password set."]

            # Post without an email
            errors = email_errors_after_post({})
            assert errors == [u'This field is required.']

            # Try and delete Persona not owned by the user
            errors = email_errors_after_post({'email': '[email protected]'})
            assert errors == [u'That Persona email address is not registered to this account.']

            add_page = persona_plugin_app.get('/edit/persona/add/')

            assert urlparse.urlsplit(add_page.location)[2] == '/edit/persona/'

            # Add Persona email address
            template.clear_test_template_context()
            added = persona_plugin_app.post('/edit/persona/add/')
            added.follow()

            assert urlparse.urlsplit(added.location)[2] == '/edit/account/'

            # Delete a Persona
            deleted = persona_plugin_app.post(
                '/edit/persona/',
                {'email': '[email protected]'})
            deleted.follow()

            assert urlparse.urlsplit(deleted.location)[2] == '/edit/account/'
开发者ID:ausbin,项目名称:mediagoblin,代码行数:54,代码来源:test_persona.py


示例10: create_cookie

def create_cookie(trans, key_name, key, email, age=DEFAULT_BIOSTAR_COOKIE_AGE, override_never_authenticate=False):
    """Set the BioStar login cookie ``key_name`` on the response of ``trans``.

    The cookie value is ``"<email>:<hex HMAC of email under key>"``. Nothing
    is set when the ``biostar_never_authenticate`` config option is on and
    ``override_never_authenticate`` is false. The cookie domain is derived
    from the Galaxy and BioStar hostnames by ``determine_cookie_domain``.
    """
    never_authenticate = trans.app.config.biostar_never_authenticate
    if never_authenticate and not override_never_authenticate:
        log.debug('A BioStar link was clicked, but never authenticate has been enabled, so we will not create the login cookie.')
        return

    # NOTE(review): hmac.new is called without a digestmod; Python 3.8+
    # requires that argument -- confirm the runtime this targets.
    signature = hmac.new(key, email).hexdigest()
    cookie_value = "%s:%s" % (email, signature)
    trans.set_cookie(cookie_value, name=key_name, path='/', age=age, version='1')

    # We need to explicitly set the domain here, in order to allow for biostar in a subdomain to work
    cookie_domain = determine_cookie_domain(
        urlsplit(url_for('/', qualified=True)).hostname,
        urlsplit(trans.app.config.biostar_url).hostname)
    trans.response.cookies[key_name]['domain'] = cookie_domain
开发者ID:ImmPortDB,项目名称:immport-galaxy,代码行数:11,代码来源:biostar.py


示例11: prepare_url

def prepare_url(orig_url):
    """Swap ``network_link_prefix`` into a link when that option is set.

    The prefix supplies the scheme and netloc, and its path is prepended
    to the link's path. Trailing slashes are stripped from the result.
    Without a configured prefix ``orig_url`` is returned unchanged.
    """
    link_prefix = cfg.CONF.network_link_prefix
    # Copied directly from nova/api/openstack/common.py
    if not link_prefix:
        return orig_url
    original = parse.urlsplit(orig_url)
    replacement = parse.urlsplit(link_prefix)
    rebuilt = parse.urlunsplit((replacement.scheme,
                                replacement.netloc,
                                replacement.path + original.path,
                                original.query,
                                original.fragment))
    return rebuilt.rstrip('/')
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:11,代码来源:api_common.py


示例12: parse_host

    def parse_host(host):
        """Split a host specification into netloc, base URL and parameters.

        A ``host`` without an ``http://`` or ``https://`` prefix gets one
        chosen by ``settings.INTRACLUSTER_HTTPS``. Returns a dict with
        ``host`` (the netloc), ``url`` (scheme, netloc and path) and
        ``params`` (the query string as a dict; the last value wins for a
        repeated key).
        """
        if not host.startswith(('http://', 'https://')):
            default_scheme = 'https' if settings.INTRACLUSTER_HTTPS else 'http'
            host = default_scheme + '://' + host
        parts = urlsplit(host)

        params = {}
        for name, values in parse_qs(parts.query).items():
            params[name] = values[-1]

        return {
            'host': parts.netloc,
            'url': '%s://%s%s' % (parts.scheme, parts.netloc, parts.path),
            'params': params,
        }
开发者ID:cbowman0,项目名称:graphite-web,代码行数:12,代码来源:remote.py


示例13: extra_clean

    def extra_clean(self, doc):
        """Apply this cleaner's additional rules to ``doc`` in place.

        In order: elements whose ``href`` uses a disallowed scheme are
        dropped with ``drop_tag``; other URI attributes with a disallowed
        scheme (or any netloc, when external sources are not allowed) are
        removed, or the element dropped for ``src``; ``<a>`` elements
        without ``href`` are dropped if ``a_without_href`` is set;
        ``class`` attributes are filtered through ``allow_classes``; the
        DOM callbacks run; and finally the inline wrapping, ``<br>``
        splitting and empty-tag passes are applied as configured.
        """
        for el in doc.xpath('//*[@href]'):
            scheme, netloc, path, query, fragment = urlsplit(el.attrib['href'])
            if scheme and scheme not in self.allowed_protocols:
                el.drop_tag()

        for attr in self.attr_val_is_uri:
            if attr == 'href':
                # Already handled by the loop above.
                continue
            for el in doc.xpath('//*[@'+attr+']'):
                scheme, netloc, path, query, fragment = urlsplit(el.attrib[attr])
                scheme_fail = scheme and scheme not in self.allowed_protocols
                netloc_fail = not self.allow_external_src and netloc
                if scheme_fail or netloc_fail:
                    if attr == 'src':
                        el.drop_tag()
                    else:
                        el.attrib.pop(attr)

        if self.a_without_href:
            for link in doc.xpath('//a[not(@href)]'):
                link.drop_tag()

        if self.allow_classes is not None:
            for el in doc.xpath('//*[@class]'):
                # Build real lists: on Python 3 ``filter`` returns a lazy
                # iterator that is always truthy, so the emptiness check
                # below would never remove an emptied class attribute.
                classes = [cls for cls in el.attrib['class'].split() if cls]
                if el.tag in self.allow_classes:
                    allowed = self.allow_classes[el.tag]
                    # ``allowed`` is either a predicate or a container of
                    # permitted class names.
                    condition = allowed if callable(allowed) else \
                            (lambda cls: cls in allowed)
                    classes = [cls for cls in classes if condition(cls)]
                else:
                    classes = []

                if classes:
                    el.attrib['class'] = ' '.join(classes)
                else:
                    el.attrib.pop('class')


        for callback in self.dom_callbacks:
            callback(doc)

        if self.wrap_inline_tags is not False and self.tags_to_wrap:
            self.clean_top(doc)

        if self.split_paragraphs_by_br:
            self.remove_brs_from_pars(doc)

        for tag in self.drop_empty_tags:
            for el in doc.xpath('//'+tag):
                if not el.attrib and self.is_element_empty(el):
                    el.drop_tree()
开发者ID:SmartTeleMax,项目名称:iktomi,代码行数:53,代码来源:html.py


示例14: assert_valid_temp_url

    def assert_valid_temp_url(self, name):
        """Assert that the backend's URL for ``name`` looks like a temporary URL.

        It must share scheme and netloc with the non-temporary URL and its
        query string must contain a signature and an expiry.
        """
        temp_parts = urlparse.urlsplit(self.backend.url(name))
        temp_query = urlparse.parse_qs(temp_parts.query)
        plain_parts = urlparse.urlsplit(
            base_url(container=self.backend.container_name, path=name))

        # ensure scheme and netloc are same as to non-temporary URL
        # NOTE(review): the slice stops before the path (index 2), although
        # the original comment also mentioned the path -- confirm whether
        # the path was meant to be compared as well.
        self.assertEqual(plain_parts[:2], temp_parts[:2])

        # ensure query string contains signature and expiry
        for required in ('temp_url_sig', 'temp_url_expires'):
            self.assertIn(required, temp_query)
开发者ID:blacktorn,项目名称:django-storage-swift,代码行数:12,代码来源:tests.py


示例15: test_notify_alarm_with_batch_listener

 def test_notify_alarm_with_batch_listener(self, logger):
     """Send two alarm updates with batch_size=2 and check both notifications.

     The notifications must arrive in the order sent, and the first logger
     call must report a batch of two messages. The service started here is
     stopped even when one of the assertions fails.
     """
     data1 = {
         'actions': ['test://'],
         'alarm_id': 'foobar',
         'alarm_name': 'testalarm',
         'severity': 'critical',
         'previous': 'OK',
         'current': 'ALARM',
         'reason': 'Everything is on fire',
         'reason_data': {'fire': 'everywhere'}
     }
     data2 = {
         'actions': ['test://'],
         'alarm_id': 'foobar2',
         'alarm_name': 'testalarm2',
         'severity': 'low',
         'previous': 'ALARM',
         'current': 'OK',
         'reason': 'Everything is fine',
         'reason_data': {'fine': 'fine'}
     }
     self.service.stop()
     self.CONF.set_override("batch_size", 2, 'notifier')
     # Init a new service with new configuration
     self.svc = notifier.AlarmNotifierService(self.CONF)
     self.svc.start()
     try:
         self._msg_notifier.sample({}, 'alarm.update', data1)
         self._msg_notifier.sample({}, 'alarm.update', data2)
         time.sleep(1)
         notifications = self.svc.notifiers['test'].obj.notifications
         self.assertEqual(2, len(notifications))
         self.assertEqual((urlparse.urlsplit(data1['actions'][0]),
                           data1['alarm_id'],
                           data1['alarm_name'],
                           data1['severity'],
                           data1['previous'],
                           data1['current'],
                           data1['reason'],
                           data1['reason_data']),
                          notifications[0])
         self.assertEqual((urlparse.urlsplit(data2['actions'][0]),
                           data2['alarm_id'],
                           data2['alarm_name'],
                           data2['severity'],
                           data2['previous'],
                           data2['current'],
                           data2['reason'],
                           data2['reason_data']),
                          notifications[1])
         self.assertEqual(mock.call('Received %s messages in batch.', 2),
                          logger.call_args_list[0])
     finally:
         # A failing assertion must not leave the service running.
         self.svc.stop()
开发者ID:ISCAS-VDI,项目名称:aodh-base,代码行数:52,代码来源:test_notifier.py


示例16: UrlMustHaveSchemeAndPath

def UrlMustHaveSchemeAndPath( sURL ):
    """Return sURL with a scheme and a non-empty path.

    A URL whose scheme is not http, https or ftp gets 'http://' put in
    front of it, and an empty path becomes '/'.
    """
    tParts = urlsplit( sURL )
    #
    if tParts[0] not in ( 'http', 'https', 'ftp' ):
        # without a scheme urlsplit puts the host into the path,
        # so split again with an explicit scheme
        tParts = urlsplit( 'http://' + sURL )
    #
    sScheme, sLocation, sPath, sQuery, sFragmentID = tParts
    #
    return urlunsplit( ( sScheme, sLocation, sPath or '/', sQuery, sFragmentID ) )
开发者ID:netvigator,项目名称:myPyPacks,代码行数:14,代码来源:Address.py


示例17: _test_new_user

        def _test_new_user():
            """Register a user through OpenID, log out, and log back in."""
            openid_url = u'http://real.myopenid.com'
            register_template = 'mediagoblin/auth/register.html'

            openid_plugin_app.post('/auth/openid/login/', {'openid': openid_url})

            # Right place?
            assert register_template in template.TEMPLATE_TEST_CONTEXT
            register_form = template.TEMPLATE_TEST_CONTEXT[register_template]['register_form']

            # Register User
            signup = openid_plugin_app.post(
                '/auth/openid/register/',
                {
                    'openid': register_form.openid.data,
                    'username': u'chris',
                    'email': u'[email protected]',
                })
            signup.follow()

            # Correct place?
            assert urlparse.urlsplit(signup.location)[2] == '/u/chris/'
            assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT

            # No need to test if user is in logged in and verification email
            # awaits, since openid uses the register_user function which is
            # tested in test_auth

            # Logout User
            openid_plugin_app.get('/auth/logout')

            # Get user and detach from session
            user_query = mg_globals.database.LocalUser.query.filter(
                LocalUser.username==u'chris'
            )
            test_user = user_query.first()
            Session.expunge(test_user)

            # Log back in
            # Could not get it to work by 'POST'ing to /auth/openid/login/
            template.clear_test_template_context()
            login = openid_plugin_app.post(
                '/auth/openid/login/finish/', {'openid': openid_url})
            login.follow()

            assert urlparse.urlsplit(login.location)[2] == '/'
            assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT

            # Make sure user is in the session
            root_context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
            session = root_context['request'].session
            assert session['user_id'] == six.text_type(test_user.id)
开发者ID:ausbin,项目名称:mediagoblin,代码行数:50,代码来源:test_openid.py


示例18: _get_version_match

    def _get_version_match(self, endpoint, profile_version, service_type):
        """Return the endpoint URI for the best matching API version.

        Only versions sharing the profile's major version are considered.
        Each such version whose minor is at least the profile's minor
        replaces the current candidate with the path of its ``self`` link,
        and the scan stops once a version with exactly the profile's minor
        has been processed.

        The chosen path, minus any overlap with the path of
        ``endpoint.uri``, is joined onto ``endpoint.uri``; the project id
        is appended when the endpoint needs it.

        Raises ``exceptions.EndpointNotFound`` if no version matched.
        """
        best_path = None

        for candidate in endpoint.versions:
            parsed = self._parse_version(candidate["id"])
            if profile_version.major == parsed.major:
                if profile_version.minor <= parsed.minor:
                    for link in candidate["links"]:
                        if link["rel"] == "self":
                            best_path = parse.urlsplit(link['href']).path

                # Only break out of the loop on an exact match,
                # otherwise keep trying.
                if profile_version.minor == parsed.minor:
                    break

        if best_path is None:
            raise exceptions.EndpointNotFound(
                "Unable to determine endpoint for %s" % service_type)

        # Make sure the root endpoint has no overlap with the matched path
        root_path = parse.urlsplit(endpoint.uri).path
        relative_version = best_path.replace(root_path, "", 1)
        matched = utils.urljoin(endpoint.uri, relative_version)

        # For services that require the project id in the request URI,
        # add them in here.
        if endpoint.needs_project_id:
            matched = utils.urljoin(matched, endpoint.project_id)

        return matched
开发者ID:briancurtin,项目名称:python-openstacksdk,代码行数:48,代码来源:session.py


示例19: remove_version_from_href

def remove_version_from_href(href):
    """Removes the first api version from the href.

    Given: 'http://www.nova.com/v1.1/123'
    Returns: 'http://www.nova.com/123'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    :param href: URL whose first path segment is expected to be a version
    :returns: the URL with that version segment removed
    :raises ValueError: if the first path segment is not a version; this
                        includes an href with an empty path
    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.split('/', 2)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    # An empty path splits into a single element, so check the length
    # before indexing; such an href then takes the ValueError path below.
    if len(url_parts) > 1 and expression.match(url_parts[1]):
        del url_parts[1]

    new_path = '/'.join(url_parts)

    if new_path == parsed_url.path:
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    parsed_url = list(parsed_url)
    parsed_url[2] = new_path
    return urlparse.urlunsplit(parsed_url)
开发者ID:anjoah,项目名称:nova,代码行数:27,代码来源:common.py


示例20: save

    def save(self, *args, **kwargs):
        """Save the speaker, first caching a copy of the remote image if any.

        When ``self.image`` is set, it is downloaded with ``urlretrieve``
        and stored in ``image_cache`` under a file name truncated to fit
        the field's ``max_length``. An ``HTTPError`` during the download
        is ignored, so the model is still saved without a cached image.
        """
        if self.image:
            try:
                tmp_filename, _headers = urlretrieve(self.image)

                max_filename_length = self.image_cache.field.max_length  # Usually 100
                template_needs = len(self.image_cache_file_path_template % '')

                # We'll use the actual instance label size rather than the maximum that it could be
                instance_label_length = len(self.instance.label)
                truncate_to = max_filename_length - template_needs - instance_label_length - 8

                image_filename = os.path.basename(urlsplit(self.image).path)
                image_filename = url_to_unicode(image_filename)
                filename_root, extension = os.path.splitext(image_filename)

                truncated_filename = filename_root[:(truncate_to - len(extension))] + extension

                # Close the downloaded file once it has been stored, so the
                # handle is not leaked.
                with open(tmp_filename, 'rb') as image_file:
                    self.image_cache.save(
                        truncated_filename,
                        File(image_file),

                        # Calling save on a FieldFile causes a save on the model instance
                        # we don't need that as we're about to do it below (without this
                        # we get an infinite recursion).
                        save=False,
                        )
            except HTTPError:
                # Best effort: a failed download must not block the save.
                pass

        return super(Speaker, self).save(*args, **kwargs)
开发者ID:johnfelipe,项目名称:sayit,代码行数:32,代码来源:models.py



注:本文中的six.moves.urllib.parse.urlsplit函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python parse.urlunparse函数代码示例发布时间:2022-05-27
下一篇:
Python parse.urlquote函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap