• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python parse.parse_qs函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中six.moves.urllib.parse.parse_qs函数的典型用法代码示例。如果您正苦于以下问题：Python parse_qs函数的具体用法？Python parse_qs怎么用？Python parse_qs使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了parse_qs函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_add_extras

    def test_add_extras(self):
        """Verify ``utils.add_extras`` copies analytics settings into the context.

        After the call the context must contain ``APP_PREFIX`` and ``env``,
        expose the configured Google and DAP identifiers under ``ANALYTICS``,
        and carry a ``DAP_URL_PARAMS`` query string that encodes the agency
        and sub-agency.
        """
        context = {}
        # NOTE(review): this overwrites settings.ANALYTICS and never restores
        # it, so the value may leak into later tests -- confirm whether the
        # suite resets settings elsewhere.
        settings.ANALYTICS = {
            'GOOGLE': {
                'GTM_SITE_ID': 'gtm-site-id',
                'GA_SITE_ID': 'ga-site-id',
            },
            'DAP': {
                'AGENCY': 'agency',
                'SUBAGENCY': 'sub-agency',
            },
        }

        utils.add_extras(context)

        # assertIn reports the missing key and the container on failure,
        # unlike assertTrue('x' in y).
        self.assertIn('APP_PREFIX', context)
        self.assertIn('env', context)

        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual('gtm-site-id',
                         context['ANALYTICS']['GOOGLE']['GTM_SITE_ID'])
        self.assertEqual('ga-site-id',
                         context['ANALYTICS']['GOOGLE']['GA_SITE_ID'])
        self.assertEqual('agency', context['ANALYTICS']['DAP']['AGENCY'])
        self.assertEqual('sub-agency',
                         context['ANALYTICS']['DAP']['SUBAGENCY'])
        # Compare parsed query strings so parameter order does not matter.
        self.assertEqual(
            parse_qs('agency=agency&subagency=sub-agency'),
            parse_qs(context['ANALYTICS']['DAP']['DAP_URL_PARAMS']),
        )
开发者ID:adborden,项目名称:regulations-site,代码行数:29,代码来源:views_utils_test.py


示例2: test_list_buckets_non_empty

    def test_list_buckets_non_empty(self):
        """Listing buckets yields the single bucket returned by the stubbed API.

        Also checks that the recorded request was a GET whose URI starts with
        the storage base URI and whose query parameters match ``project`` and
        ``projection=noAcl``.
        """
        from six.moves.urllib.parse import parse_qs
        from six.moves.urllib.parse import urlencode
        from six.moves.urllib.parse import urlparse

        project = 'PROJECT'
        bucket_name = 'bucket-name'
        credentials = _Credentials()
        client = self._makeOne(project=project, credentials=credentials)

        # URI the client is expected to request.
        expected_query = urlencode({'project': project, 'projection': 'noAcl'})
        base_uri = '/'.join((
            client.connection.API_BASE_URL,
            'storage',
            client.connection.API_VERSION,
        ))
        expected_uri = '/'.join((base_uri, 'b?%s' % (expected_query,)))

        # Install a stub transport that answers with one bucket.
        payload = '{{"items": [{{"name": "{0}"}}]}}'.format(bucket_name)
        stub_http = _Http(
            {'status': '200', 'content-type': 'application/json'},
            payload.encode('utf-8'),
        )
        client.connection._http = stub_http

        found = list(client.list_buckets())

        self.assertEqual(len(found), 1)
        self.assertEqual(found[0].name, bucket_name)

        recorded = stub_http._called_with
        self.assertEqual(recorded['method'], 'GET')
        self.assertTrue(recorded['uri'].startswith(base_uri))
        # Compare parsed queries so parameter ordering is irrelevant.
        actual_params = parse_qs(urlparse(recorded['uri']).query)
        expected_params = parse_qs(urlparse(expected_uri).query)
        self.assertEqual(actual_params, expected_params)
开发者ID:jonparrott,项目名称:gcloud-python,代码行数:28,代码来源:test_client.py


示例3: _list_buckets_non_empty_helper

    def _list_buckets_non_empty_helper(self, project, use_default=False):
        """Run the function under test against a stubbed one-bucket response.

        :param project: Project identifier sent as the ``project`` parameter.
        :param use_default: When true, ``project`` and the connection are
            supplied through the monkey-patched defaults and the function
            under test is called with no arguments; otherwise both are
            passed explicitly.
        """
        from six.moves.urllib.parse import parse_qs
        from six.moves.urllib.parse import urlencode
        from six.moves.urllib.parse import urlparse
        from gcloud._testing import _monkey_defaults as _base_monkey_defaults
        from gcloud.storage._testing import _monkey_defaults
        from gcloud.storage.connection import Connection

        bucket_name = "bucket-name"
        connection = Connection()

        # URI the call is expected to hit.
        expected_query = urlencode({"project": project, "projection": "noAcl"})
        base_uri = "/".join(
            (connection.API_BASE_URL, "storage", connection.API_VERSION))
        expected_uri = "/".join((base_uri, "b?%s" % (expected_query,)))

        # Stub transport answering with a single bucket.
        payload = '{{"items": [{{"name": "{0}"}}]}}'.format(bucket_name)
        stub_http = Http(
            {"status": "200", "content-type": "application/json"},
            payload.encode("utf-8"),
        )
        connection._http = stub_http

        if not use_default:
            found = list(self._callFUT(project=project, connection=connection))
        else:
            with _base_monkey_defaults(project=project):
                with _monkey_defaults(connection=connection):
                    found = list(self._callFUT())

        self.assertEqual(len(found), 1)
        self.assertEqual(found[0].name, bucket_name)

        recorded = stub_http._called_with
        self.assertEqual(recorded["method"], "GET")
        self.assertTrue(recorded["uri"].startswith(base_uri))
        # Compare parsed queries so parameter ordering is irrelevant.
        self.assertEqual(
            parse_qs(urlparse(recorded["uri"]).query),
            parse_qs(urlparse(expected_uri).query),
        )
开发者ID:nlokare,项目名称:gcloud-python,代码行数:30,代码来源:test_api.py


示例4: test_authorize_view

    def test_authorize_view(self):
        """Check the redirect issued by the ``/oauth2authorize`` view.

        Covers three requests: the default one (state carries the CSRF token
        and a ``/`` return URL, the client secret is not leaked), one with an
        explicit ``return_url``, and one with an extra query parameter that
        must be forwarded in the redirect location.
        """
        with self.app.test_client() as c:
            rv = c.get('/oauth2authorize')
            location = rv.headers['Location']
            # Everything after the first '?' is the redirect's query string.
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])

            # assertIn/assertNotIn give a useful failure message, unlike
            # assertTrue/assertFalse on an ``in`` expression.
            self.assertIn(GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, q['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        with self.app.test_client() as c:
            rv = c.get('/oauth2authorize?return_url=/test')
            location = rv.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])
            self.assertEqual(state['return_url'], '/test')

        with self.app.test_client() as c:
            rv = c.get('/oauth2authorize?extra_param=test')
            location = rv.headers['Location']
            self.assertIn('extra_param=test', location)
开发者ID:JasOXIII,项目名称:oauth2client,代码行数:25,代码来源:test_flask_util.py


示例5: assertRedirectsNoFollow

    def assertRedirectsNoFollow(self, response, expected_url, use_params=True,
                                status_code=302):
        """Checks response redirect without loading the destination page.

        Django's assertRedirects method loads the destination page, which
        requires that the page be renderable in the current test context
        (possibly requiring additional, unrelated setup).

        :param response: Response whose ``status_code`` and ``location``
            header are inspected.
        :param expected_url: URL the response should redirect to; may be a
            lazy object, it is forced to text before comparison.
        :param use_params: When true, the GET parameters of both URLs are
            compared as well (order-insensitively).
        :param status_code: Expected redirect status code.
        """
        # Assert that the response has the correct redirect code.
        self.assertEqual(
            response.status_code, status_code,
            "Response didn't redirect as expected: Response code was {0} "
            "(expected {1})".format(response.status_code, status_code))

        # Assert that the response redirects to the correct base URL.
        # Use force_text to force evaluation of anything created by
        # reverse_lazy.
        response_url = force_text(response['location'])
        expected_url = force_text(expected_url)
        parsed1 = urlparse(response_url)
        parsed2 = urlparse(expected_url)
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(
            parsed1.path, parsed2.path,
            "Response did not redirect to the expected URL: Redirect "
            "location was {0} (expected {1})".format(parsed1.path, parsed2.path))

        # Optionally assert that the response redirect URL has the correct
        # GET parameters.
        if use_params:
            self.assertDictEqual(
                parse_qs(parsed1.query), parse_qs(parsed2.query),
                "Response did not have the GET parameters expected: GET "
                "parameters were {0} (expected "
                "{1})".format(parsed1.query or {}, parsed2.query or {}))
开发者ID:Cashiuus,项目名称:django-timepiece,代码行数:34,代码来源:base.py


示例6: _unicode_parse_qs

def _unicode_parse_qs(qs, **kwargs):
    """
    A wrapper around ``urlparse.parse_qs`` that converts unicode strings to
    UTF-8 to prevent ``urlparse.unquote`` from performing its default decoding
    to latin-1 see http://hg.python.org/cpython/file/2.7/Lib/urlparse.py

    When ``PY3`` is true, or ``qs`` is already ``bytes``, the call is passed
    straight through to ``parse_qs``. Otherwise ``qs`` is encoded to UTF-8,
    parsed, and every key and value is decoded back from UTF-8.

    :param qs:       Percent-encoded query string to be parsed.
    :type qs:        ``str``

    :param kwargs:   Other keyword args passed onto ``parse_qs``.
    """
    if not PY3 and not isinstance(qs, bytes):
        parsed = parse_qs(qs.encode('utf-8', 'ignore'), **kwargs)
        decoded = {}
        for raw_key, raw_values in parsed.items():
            text_key = raw_key.decode('utf-8', 'ignore')
            # Decode errors are ignored and only UTF-8 is supported, so a key
            # can decode to an empty string; such keys are dropped.
            if text_key != '':
                decoded[text_key] = [
                    value.decode('utf-8', 'ignore') for value in raw_values
                ]
        return decoded

    # Nothing to convert: delegate directly.
    return parse_qs(qs, **kwargs)
开发者ID:digitalstrategy,项目名称:serpextract,代码行数:26,代码来源:serpextract.py


示例7: test_collection_ok_by_state

    def test_collection_ok_by_state(
            self, f_users, f_coprs,
            f_mock_chroots_many,
            f_build_many_chroots,
            f_db,
            f_users_api):
        """Filtering build tasks by state returns exactly the matching chroots.

        For every status value, the collection endpoint is queried with that
        state and the response is checked for: HTTP 200, the expected number
        of tasks, the expected chroot names, and a ``self`` link whose query
        parameters match the request.
        """
        self.db.session.commit()

        for status in StatusEnum.vals.values():
            # Chroots recorded by the fixture as being in this status.
            wanted_chroots = {
                chroot
                for chroot, recorded_status in self.status_by_chroot.items()
                if recorded_status == status
            }

            href = "/api_2/build_tasks?state={}&limit=50".format(StatusEnum(status))
            response = self.tc.get(href)
            assert response.status_code == 200

            payload = json.loads(response.data.decode("utf-8"))
            tasks = payload["build_tasks"]
            assert len(tasks) == len(wanted_chroots)

            returned_chroots = set(
                task["build_task"]["chroot_name"] for task in tasks)
            assert returned_chroots == wanted_chroots

            # Compare parsed queries so parameter ordering is irrelevant.
            self_query = parse_qs(urlparse(payload["_links"]["self"]["href"]).query)
            assert self_query == parse_qs(urlparse(href).query)
开发者ID:danvratil,项目名称:copr,代码行数:27,代码来源:test_build_task_r.py


示例8: test_signature_values

    def test_signature_values(self):
        """Test signature generation and update"""

        body = """
        {"response": {
            "user_info": {}, "new_key": "yes", "result": "Success"}
        }
        """

        responses.add(responses.POST, self.url, body=body, status=200,
                      content_type="application/json")

        # First call is signed with the original secret key; the second with
        # the updated key: (1000000000 * 16807) % 2147483647 = 792978578
        self.api.user_get_info()
        self.api.user_get_info()

        # The signature sent in each request body must match the expected
        # value recorded for that call.
        for call_index in (0, 1):
            sent_params = parse_qs(responses.calls[call_index].request.body)
            self.assertEqual(sent_params['signature'][0],
                             CALL_SIGNATURES[call_index])
开发者ID:aaejohani9,项目名称:mediafire-python-open-sdk,代码行数:25,代码来源:test_signature.py


示例9: test_authorize_view

    def test_authorize_view(self):
        """Check the redirect issued by the ``/oauth2authorize`` view.

        Covers the default request, a request with an explicit
        ``return_url``, and a request with an extra query parameter.
        """
        def redirect_params(url):
            # Everything after the first '?' is the redirect's query string.
            return urlparse.parse_qs(url.split('?', 1)[1])

        # Default request: state carries the CSRF token and '/' return URL,
        # and the client secret must not appear in the redirect.
        with self.app.test_client() as client:
            location = client.get('/oauth2authorize').headers['Location']
            params = redirect_params(location)
            state = json.loads(params['state'][0])

            self.assertIn(GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, params['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        # Explicit return_url is echoed back through the state parameter.
        with self.app.test_client() as client:
            location = client.get(
                '/oauth2authorize?return_url=/test').headers['Location']
            state = json.loads(redirect_params(location)['state'][0])
            self.assertEqual(state['return_url'], '/test')

        # Unknown parameters are forwarded in the redirect location.
        with self.app.test_client() as client:
            location = client.get(
                '/oauth2authorize?extra_param=test').headers['Location']
            self.assertIn('extra_param=test', location)
开发者ID:MaloneQQ,项目名称:oauth2client,代码行数:25,代码来源:test_flask_util.py


示例10: _match_url

    def _match_url(self, request):
        """Return True when ``request`` targets the URL this matcher expects.

        ``ANY`` matches everything and an object with a ``search`` method is
        applied to ``request.url``. Otherwise scheme and netloc are compared
        only when the matcher specifies them, the path is compared with an
        empty path treated as ``/``, and every query value the matcher lists
        must be present in the request. When ``self._complete_qs`` is set,
        the request may not carry any additional query values.
        """
        if self._url is ANY:
            return True

        # regular expression matching
        if hasattr(self._url, 'search'):
            return self._url.search(request.url) is not None

        expected = self._url_parts

        if expected.scheme and request.scheme != expected.scheme:
            return False

        if expected.netloc and request.netloc != expected.netloc:
            return False

        if (request.path or '/') != (expected.path or '/'):
            return False

        # Build a private copy of the request's query values; matched values
        # are removed from it so that only unmatched ones remain.
        remaining = urlparse.parse_qs(request.query)
        required = urlparse.parse_qs(expected.query)

        for key, wanted_values in six.iteritems(required):
            available = remaining.get(key, [])
            for wanted in wanted_values:
                if wanted not in available:
                    return False
                available.remove(wanted)

        # In complete mode any leftover request value is a mismatch.
        return not (self._complete_qs and any(six.itervalues(remaining)))
开发者ID:humitos,项目名称:requests-mock,代码行数:34,代码来源:adapter.py


示例11: new_websocket_client

    def new_websocket_client(self):
        """Called after a new WebSocket connection has been established.

        Reads ``token``, ``uuid`` and ``exec_id`` from the query string of
        ``self.path``, looks up the container by UUID (or by name when the
        value is not UUID-like), and hands over to the exec client when an
        ``exec_id`` was supplied or to the plain websocket client otherwise.

        :raises exception.ZunException: if the path uses a scheme other than
            http/https on Python older than 2.7.4.
        """
        # Reopen the eventlet hub to make sure we don't share an epoll
        # fd with parent and/or siblings, which would be bad
        from eventlet import hubs
        hubs.use_hub()

        # The zun expected behavior is to have token
        # passed to the method GET of the request
        parse = urlparse.urlparse(self.path)
        # From a bug in urlparse in Python < 2.7.4 we cannot support
        # special schemes (cf: https://bugs.python.org/issue9374)
        if (parse.scheme not in ('http', 'https') and
                sys.version_info < (2, 7, 4)):
            raise exception.ZunException(
                _("We do not support scheme '%s' under Python < 2.7.4, "
                  "please use http or https") % parse.scheme)

        # Parse the query string once and read all parameters from the same
        # result instead of re-parsing it for every parameter. When a
        # parameter is repeated, pop() picks its last value.
        params = urlparse.parse_qs(parse.query)
        token = params.get("token", [""]).pop()
        uuid = params.get("uuid", [""]).pop()
        exec_id = params.get("exec_id", [""]).pop()

        ctx = context.get_admin_context(all_projects=True)

        if uuidutils.is_uuid_like(uuid):
            container = objects.Container.get_by_uuid(ctx, uuid)
        else:
            # Not UUID-like: the value is used as a container name instead.
            container = objects.Container.get_by_name(ctx, uuid)

        if exec_id:
            self._new_exec_client(container, token, uuid, exec_id)
        else:
            self._new_websocket_client(container, token, uuid)
开发者ID:openstack,项目名称:higgins,代码行数:34,代码来源:websocketproxy.py


示例12: get_msg

def get_msg(hinfo, binding, response=False):
    """Extract the message carried by ``hinfo`` for the given binding.

    :param hinfo: Mapping describing the HTTP exchange; depending on the
        binding its ``data``, ``url`` or ``headers`` entry is read.
    :param binding: One of the ``BINDING_*`` constants; any value other than
        SOAP, HTTP-POST and HTTP-Artifact is handled as HTTP-Redirect.
    :param response: Not used by this function.
    :return: The extracted message.
    """
    def _from_form(form_text):
        # The value starts one character after the TAG1 marker and runs up
        # to the next double quote.
        start = form_text.find(TAG1) + len(TAG1) + 1
        end = form_text.find('"', start)
        return form_text[start:end]

    def _from_query(url, field):
        # First value of ``field`` in the URL's query string.
        return parse_qs(urlparse(url).query)[field][0]

    if binding == BINDING_SOAP:
        return hinfo["data"]

    if binding == BINDING_HTTP_POST:
        return _from_form(hinfo["data"][3])

    if binding == BINDING_HTTP_ARTIFACT:
        # either by POST or by redirect
        if hinfo["data"]:
            return _from_form(hinfo["data"][3])
        return _from_query(hinfo["url"], "SAMLart")

    # BINDING_HTTP_REDIRECT
    return _from_query(hinfo["headers"][0][1], "SAMLRequest")
开发者ID:Amli,项目名称:pysaml2,代码行数:25,代码来源:test_64_artifact.py


示例13: discharge

 def discharge(url, request):
     """Answer a discharge request from its form-encoded body.

     When the body carries a ``token64`` field, every field's first value is
     passed to ``httpbakery.discharge`` and the resulting macaroon is
     returned with status 200. Otherwise a 401 "interaction required"
     response advertising the agent login URL is returned.
     """
     form = parse_qs(request.body)

     if form.get('token64') is not None:
         # Keep only the first value of each submitted field.
         fields = {name: form[name][0] for name in form}
         macaroon = httpbakery.discharge(checkers.AuthContext(), fields,
                                         discharge_key, None, alwaysOK3rd)
         return {
             'status_code': 200,
             'content': {
                 'Macaroon': macaroon.to_dict()
             }
         }

     interaction_required = {
         'Code': httpbakery.ERR_INTERACTION_REQUIRED,
         'Message': 'interaction required',
         'Info': {
             'InteractionMethods': {
                 'agent': {'login-url': '/login'},
             },
         },
     }
     return response(
         status_code=401,
         content=interaction_required,
         headers={'Content-Type': 'application/json'})
开发者ID:go-macaroon-bakery,项目名称:py-macaroon-bakery,代码行数:26,代码来源:test_agent.py


示例14: _list_buckets_non_empty_helper

    def _list_buckets_non_empty_helper(self, project, use_default=False):
        """Run the function under test against a stubbed one-bucket response.

        :param project: Project identifier sent as the ``project`` parameter.
        :param use_default: When true, ``project`` and the connection are
            supplied through the monkey-patched defaults and the function
            under test is called with no arguments; otherwise both are
            passed explicitly.
        """
        from six.moves.urllib.parse import parse_qs
        from six.moves.urllib.parse import urlencode
        from six.moves.urllib.parse import urlparse
        from gcloud._testing import _monkey_defaults as _base_monkey_defaults
        from gcloud.storage._testing import _monkey_defaults
        from gcloud.storage.connection import Connection

        bucket_name = 'bucket-name'
        connection = Connection()

        # URI the call is expected to hit.
        expected_query = urlencode({'project': project, 'projection': 'noAcl'})
        base_uri = '/'.join((
            connection.API_BASE_URL,
            'storage',
            connection.API_VERSION,
        ))
        expected_uri = '/'.join((base_uri, 'b?%s' % (expected_query,)))

        # Stub transport answering with a single bucket.
        payload = '{{"items": [{{"name": "{0}"}}]}}'.format(bucket_name)
        stub_http = Http(
            {'status': '200', 'content-type': 'application/json'},
            payload.encode('utf-8'),
        )
        connection._http = stub_http

        if not use_default:
            found = list(self._callFUT(project=project, connection=connection))
        else:
            with _base_monkey_defaults(project=project):
                with _monkey_defaults(connection=connection):
                    found = list(self._callFUT())

        self.assertEqual(len(found), 1)
        self.assertEqual(found[0].name, bucket_name)

        recorded = stub_http._called_with
        self.assertEqual(recorded['method'], 'GET')
        self.assertTrue(recorded['uri'].startswith(base_uri))
        # Compare parsed queries so parameter ordering is irrelevant.
        self.assertEqual(
            parse_qs(urlparse(recorded['uri']).query),
            parse_qs(urlparse(expected_uri).query),
        )
开发者ID:SimKennedy,项目名称:gcloud-python,代码行数:35,代码来源:test_api.py


示例15: test_build_ga_params_for_campaign_tracking_params

    def test_build_ga_params_for_campaign_tracking_params(self):
        '''
        Test that the GA campaign tracking params present in the request
        are carried over into the generated ``utm_url``, and that campaign
        params absent from the request are left out.
        '''
        request = self.make_fake_request(
            '/somewhere/?utm_campaign=campaign name&utm_term=campaign keyword')
        ga_dict_with_campaign_params = build_ga_params(
            request, 'ua-test-id', '/compaign/path/')

        # Parse the generated URL once and reuse it for every assertion
        # instead of re-parsing the same string four times.
        utm_params = parse_qs(ga_dict_with_campaign_params.get('utm_url'))

        self.assertEqual(utm_params.get('cn'), ['campaign name'])
        self.assertEqual(utm_params.get('ck'), ['campaign keyword'])

        # params that aren't in the request should be excluded from the utm_url
        self.assertIsNone(utm_params.get('cs'))
        self.assertIsNone(utm_params.get('cm'))
开发者ID:praekelt,项目名称:django-google-analytics,代码行数:25,代码来源:test_ga.py


示例16: test_build_ga_params_for_direct_referals

    def test_build_ga_params_for_direct_referals(self):
        """Check how the referer is reported in the ``dr`` param of ``utm_url``.

        Three cases are covered: no referer given, a referer on another
        host, and a referer on the same host as the request.
        """
        headers = {'HTTP_HOST': 'localhost:8000'}
        request = self.make_fake_request('/somewhere/', headers)

        ga_dict_no_referer = build_ga_params(
            request, 'ua-test-id', '/some/path/')
        ga_dict_external_referer = build_ga_params(
            request, 'ua-test-id', '/some/path/',
            referer='http://test.com/some/path/')
        ga_dict_same_host_referer = build_ga_params(
            request, 'ua-test-id', '/some/path/',
            referer='http://localhost:8000/some/path/')

        # No referer supplied: the 'dr' parameter is absent.
        self.assertIsNone(
            parse_qs(ga_dict_no_referer.get('utm_url')).get('dr'))
        # Referer on another host: reported in full.
        self.assertEqual(
            parse_qs(ga_dict_external_referer.get('utm_url')).get('dr'),
            ['http://test.com/some/path/'])
        # Referer on the request's own host: only the path is reported.
        self.assertEqual(
            parse_qs(ga_dict_same_host_referer.get('utm_url')).get('dr'),
            ['/some/path/'])
开发者ID:praekelt,项目名称:django-google-analytics,代码行数:26,代码来源:test_ga.py


示例17: test_publish_to_http

def test_publish_to_http():
    """Publishing to a topic with an HTTP subscriber POSTs a notification.

    The form-encoded body of the last intercepted request must contain the
    notification fields, including the message id returned by ``publish``.
    """
    endpoint = "http://example.com/foobar"
    httpretty.HTTPretty.register_uri(
        method="POST",
        uri="http://example.com/foobar",
    )

    conn = boto.connect_sns()
    conn.create_topic("some-topic")
    listing = conn.get_all_topics()
    topic_arn = listing["ListTopicsResponse"]["ListTopicsResult"]["Topics"][0]['TopicArn']

    conn.subscribe(topic_arn, "http", endpoint)

    publish_result = conn.publish(
        topic=topic_arn, message="my message", subject="my subject")
    message_id = publish_result['PublishResponse']['PublishResult']['MessageId']

    delivered = httpretty.last_request()
    delivered.method.should.equal("POST")

    delivered_fields = parse_qs(delivered.body.decode('utf-8'))
    delivered_fields.should.equal({
        "Type": ["Notification"],
        "MessageId": [message_id],
        "TopicArn": ["arn:aws:sns:us-east-1:123456789012:some-topic"],
        "Subject": ["my subject"],
        "Message": ["my message"],
        "Timestamp": ["2013-01-01T00:00:00Z"],
        "SignatureVersion": ["1"],
        "Signature": ["EXAMPLElDMXvB8r9R83tGoNn0ecwd5UjllzsvSvbItzfaMpN2nk5HVSw7XnOn/49IkxDKz8YrlH2qJXj2iZB0Zo2O71c4qQk1fMUDi3LGpij7RCW7AW9vYYsSqIKRnFS94ilu7NFhUzLiieYr4BKHpdTmdD6c0esKEYBpabxDSc="],
        "SigningCertURL": ["https://sns.us-east-1.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem"],
        "UnsubscribeURL": ["https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:some-topic:2bcfbf39-05c3-41de-beaa-fcfcc21c8f55"],
    })
开发者ID:mob-akiyama,项目名称:moto,代码行数:30,代码来源:test_publishing.py


示例18: assert_setup_flow

    def assert_setup_flow(self, team_id='TXXXXXXX1', authorizing_user_id='UXXXXXXX1'):
        """Drive the Slack setup flow end to end and assert on each step.

        Checks the initial redirect to Slack's authorize endpoint, registers
        stubbed token and team-info responses, completes the setup request
        and verifies the token-exchange request that was sent.

        :param team_id: Team id returned by the stubbed token response.
        :param authorizing_user_id: User id returned by the stubbed token
            response.
        """
        responses.reset()

        # Step 1: the init view must redirect to Slack's authorize endpoint.
        init_response = self.client.get(self.init_path)
        assert init_response.status_code == 302
        target = urlparse(init_response['Location'])
        assert target.scheme == 'https'
        assert target.netloc == 'slack.com'
        assert target.path == '/oauth/authorize'

        query = parse_qs(target.query)
        assert query['scope'] == [' '.join(self.provider.identity_oauth_scopes)]
        assert query['state']
        assert query['redirect_uri'] == ['http://testserver/extensions/slack/setup/']
        assert query['response_type'] == ['code']
        assert query['client_id'] == ['slack-client-id']
        # once we've asserted on it, switch to a singular values to make life
        # easier
        authorize_params = {
            name: values[0] for name, values in six.iteritems(query)
        }

        # Step 2: stub the Slack API calls made while completing setup.
        token_payload = {
            'ok': True,
            'access_token': 'xoxp-xxxxxxxxx-xxxxxxxxxx-xxxxxxxxxxxx',
            'team_id': team_id,
            'team_name': 'Example',
            'authorizing_user_id': authorizing_user_id,
        }
        responses.add(
            responses.POST, 'https://slack.com/api/oauth.token',
            json=token_payload,
        )

        team_payload = {
            'ok': True,
            'team': {
                'domain': 'test-slack-workspace',
                'icon': {'image_132': 'http://example.com/ws_icon.jpg'},
            },
        }
        responses.add(
            responses.GET, 'https://slack.com/api/team.info',
            json=team_payload,
        )

        # Step 3: come back from Slack with a code and the original state.
        callback_query = urlencode({
            'code': 'oauth-code',
            'state': authorize_params['state'],
        })
        setup_response = self.client.get(
            u'{}?{}'.format(self.setup_path, callback_query))

        # Step 4: the first outgoing call is the token exchange.
        token_request = responses.calls[0].request
        sent = parse_qs(token_request.body)
        assert sent['grant_type'] == ['authorization_code']
        assert sent['code'] == ['oauth-code']
        assert sent['redirect_uri'] == ['http://testserver/extensions/slack/setup/']
        assert sent['client_id'] == ['slack-client-id']
        assert sent['client_secret'] == ['slack-client-secret']

        assert setup_response.status_code == 200
        self.assertDialogSuccess(setup_response)
开发者ID:yaoqi,项目名称:sentry,代码行数:59,代码来源:test_integration.py


示例19: setup_class

    def setup_class(self, request, full_url, headers):
        """Populate this object's request state from an incoming call.

        Builds ``self.querystring`` from the first source that yields any
        parameters, tried in this order: the form data (only when ``request``
        has no ``body`` attribute), the query string of ``full_url``, the
        request body (flattened JSON or a form-encoded string), and finally
        the ``headers`` argument. Also records the body, URI, path, HTTP
        method, region and request headers on ``self``.

        :param request: Incoming request; exposes either ``body`` or
            ``data``/``form``, plus ``headers`` and ``method``.
        :param full_url: Full URL of the request.
        :param headers: Passed to ``dict.update`` on the querystring as a
            last resort when no other source produced parameters.
        """
        querystring = {}
        if hasattr(request, 'body'):
            # Boto
            self.body = request.body
        else:
            # Flask server

            # FIXME: At least in Flask==0.10.1, request.data is an empty string
            # and the information we want is in request.form. Keeping self.body
            # definition for back-compatibility
            self.body = request.data

            # NOTE(review): redundant re-initialisation; querystring is
            # already an empty dict at this point.
            querystring = {}
            for key, value in request.form.items():
                # Wrap each value in a list, the same shape parse_qs returns.
                querystring[key] = [value, ]

        # Keep the undecoded body for parse_qs below; self.body itself is
        # decoded to text when it arrived as bytes.
        raw_body = self.body
        if isinstance(self.body, six.binary_type):
            self.body = self.body.decode('utf-8')

        # Fallback 1: parameters from the URL's query string.
        if not querystring:
            querystring.update(
                parse_qs(urlparse(full_url).query, keep_blank_values=True))
        # Fallback 2: parameters from the request body.
        if not querystring:
            # The [] default makes the ``in`` test False when the header is
            # missing.
            if 'json' in request.headers.get('content-type', []) and self.aws_service_spec:
                decoded = json.loads(self.body)

                # NOTE(review): if neither header is present ``target`` is
                # None and the split below raises AttributeError -- confirm
                # callers always send it for JSON requests.
                target = request.headers.get(
                    'x-amz-target') or request.headers.get('X-Amz-Target')
                # ``service`` is unpacked but not used afterwards.
                service, method = target.split('.')
                input_spec = self.aws_service_spec.input_spec(method)
                flat = flatten_json_request_body('', decoded, input_spec)
                for key, value in flat.items():
                    querystring[key] = [value]
            elif self.body:
                try:
                    querystring.update(parse_qs(raw_body, keep_blank_values=True))
                except UnicodeEncodeError:
                    pass  # ignore encoding errors, as the body may not contain a legitimate querystring
        # Fallback 3: use the ``headers`` argument itself.
        if not querystring:
            querystring.update(headers)

        # Presumably normalises keys/values to text; _decode_dict is defined
        # elsewhere -- verify against its implementation.
        try:
            querystring = _decode_dict(querystring)
        except UnicodeDecodeError:
            pass  # ignore decoding errors, as the body may not contain a legitimate querystring

        self.uri = full_url
        self.path = urlparse(full_url).path
        self.querystring = querystring
        self.method = request.method
        self.region = self.get_region_from_url(request, full_url)
        self.uri_match = None

        self.headers = request.headers
        # Fill in a host header from the URL when the request has none. This
        # assigns into request.headers, since self.headers is the same object.
        if 'host' not in self.headers:
            self.headers['host'] = urlparse(full_url).netloc
        self.response_headers = {"server": "amazon.com"}
开发者ID:spulec,项目名称:moto,代码行数:59,代码来源:responses.py


示例20: equals

    def equals(self, rhs):
        """Return whether ``rhs`` is the same URL as ``self.lhs``.

        Scheme, network location and path must match exactly; query strings
        are compared after parsing, so parameter order does not matter.
        """
        expected = urlparse.urlparse(self.lhs)
        actual = urlparse.urlparse(rhs)

        expected_location = (expected.scheme, expected.netloc, expected.path)
        actual_location = (actual.scheme, actual.netloc, actual.path)
        if expected_location != actual_location:
            return False

        expected_params = urlparse.parse_qs(expected.query)
        actual_params = urlparse.parse_qs(actual.query)
        return expected_params == actual_params
开发者ID:gocloudxyz,项目名称:rackspace-python-neutronclient,代码行数:8,代码来源:test_cli20.py



注:本文中的six.moves.urllib.parse.parse_qs函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python parse.parse_qsl函数代码示例 发布时间：2022-05-27
下一篇:
Python tkinter_messagebox.showerror函数代码示例 发布时间：2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap