• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python parse.parse_qsl函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中urllib.parse.parse_qsl函数的典型用法代码示例。如果您正苦于以下问题:Python parse_qsl函数的具体用法?Python parse_qsl怎么用?Python parse_qsl使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了parse_qsl函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: add_preserved_filters

def add_preserved_filters(context, url, popup=False):
    opts = context.get('opts')
    preserved_filters = context.get('preserved_filters')

    parsed_url = list(urlparse(url))
    parsed_qs = dict(parse_qsl(parsed_url[4]))
    merged_qs = dict()

    if opts and preserved_filters:
        preserved_filters = dict(parse_qsl(preserved_filters))

        try:
            match = resolve(url)
        except Resolver404:
            pass
        else:
            current_url = '%s:%s' % (match.namespace, match.url_name)
            changelist_url = 'admin:%s_%s_changelist' % (opts.app_label, opts.model_name)
            if changelist_url == current_url and '_changelist_filters' in preserved_filters:
                preserved_filters = dict(parse_qsl(preserved_filters['_changelist_filters']))

        merged_qs.update(preserved_filters)

    if popup:
        from django.contrib.admin.options import IS_POPUP_VAR
        merged_qs[IS_POPUP_VAR] = 1

    merged_qs.update(parsed_qs)

    parsed_url[4] = urlencode(merged_qs)
    return urlunparse(parsed_url)
开发者ID:1ngr1d,项目名称:django,代码行数:31,代码来源:admin_urls.py


示例2: twitter

def twitter():
    request_token_url = 'https://api.twitter.com/oauth/request_token'
    access_token_url = 'https://api.twitter.com/oauth/access_token'
    authenticate_url = 'https://api.twitter.com/oauth/authenticate'

    if request.args.get('oauth_token') and request.args.get('oauth_verifier'):
        auth = OAuth1(app.config['TWITTER_CONSUMER_KEY'],
                      client_secret=app.config['TWITTER_CONSUMER_SECRET'],
                      resource_owner_key=request.args.get('oauth_token'),
                      verifier=request.args.get('oauth_verifier'))
        r = requests.post(access_token_url, auth=auth)
        profile = dict(parse_qsl(r.text))

        user = User.query.filter_by(twitter=profile['user_id']).first()
        if user:
            token = create_jwt_token(user)
            return jsonify(token=token)
        u = User(twitter=profile['user_id'],
                 first_name=profile['screen_name'])
        db.session.add(u)
        db.session.commit()
        token = create_jwt_token(u)
        return jsonify(token=token)
    else:
        oauth = OAuth1(app.config['TWITTER_CONSUMER_KEY'],
                       client_secret=app.config['TWITTER_CONSUMER_SECRET'],
                       callback_uri=app.config['TWITTER_CALLBACK_URL'])
        r = requests.post(request_token_url, auth=oauth)
        oauth_token = dict(parse_qsl(r.text))
        qs = urlencode(dict(oauth_token=oauth_token['oauth_token']))
        return redirect(authenticate_url + '?' + qs)
开发者ID:Amit-Kolambikar,项目名称:satellizer,代码行数:31,代码来源:app.py


示例3: _build_url

    def _build_url(self):
        """Build resource url

        Parsing ``self._url``, add ``self._params`` to query string if need

        :return self._url: resource url
        """
        scheme, netloc, path, params, query, fragment = urlparse(self._url)

        # IDN domains support
        netloc = to_unicode(netloc)
        # removed idna encode as it was causing python3 urlunparse to error
        # print(repr(netloc), repr(netloc.encode('idna')))

        if not netloc:
            raise ValueError("Invalid url")
        elif not scheme:
            scheme = "http"

        tmp = []
        if self._params is not None:
            for param, value in self._params:
                if isinstance(value, tuple):
                    for i in value:
                        tmp.append((param, i))
                elif isinstance(value, str):
                    tmp.append((param, value))

        if tmp:
            tmp = parse_qsl(query, keep_blank_values=True) + tmp
        else:
            try:
                tmp = parse_qsl(query, keep_blank_values=True, strict_parsing=True)
            except ValueError:
                tmp = query

        if isinstance(tmp, str):
            encode = quote_plus
            noencode = lambda result: result
        else:
            encode = urlencode
            noencode = urlnoencode

        if self._encode_query:
            query = encode(tmp)
        else:
            query = noencode(tmp)

        del tmp
        # print(repr([scheme, netloc, path, query, fragment]))
        url_unparse_list = [
            scheme.encode('utf8'),
            netloc.encode('idna'),
            path.encode('utf8'),
            params.encode('utf8'),
            query.encode('utf8'),
            fragment.encode('utf8')]

        self._url = urlunparse(url_unparse_list)
        return self._url
开发者ID:budlight,项目名称:human_curl,代码行数:60,代码来源:core.py


示例4: append_qs

def append_qs(url, query_string):
    """Append query_string values to an existing URL and return it as a string.

    query_string can be:
        * an encoded string: 'test3=val1&test3=val2'
        * a dict of strings: {'test3': 'val'}
        * a dict of lists of strings: {'test3': ['val1', 'val2']}
        * a list of tuples: [('test3', 'val1'), ('test3', 'val2')]

    """
    parsed_url = urlparse.urlsplit(url)
    parsed_qs = urlparse.parse_qsl(parsed_url.query, True)

    if isstr(query_string):
        parsed_qs += urlparse.parse_qsl(query_string)
    elif isdict(query_string):
        for item in query_string.items():
            if islist(item[1]):
                for val in item[1]:
                    parsed_qs.append((item[0], val))
            else:
                parsed_qs.append(item)
    elif islist(query_string):
        parsed_qs += query_string
    else:
        raise TypeError('Unexpected query_string type')

    return urlparse.urlunsplit((
        parsed_url.scheme,
        parsed_url.netloc,
        parsed_url.path,
        urlencode_unicode(parsed_qs),
        parsed_url.fragment,
    ))
开发者ID:JoProvost,项目名称:python-ubersmith,代码行数:34,代码来源:utils.py


示例5: build_url

def build_url(base, query_params={}, fragment={}):
    """Construct a URL based off of base containing all parameters in
    the query portion of base plus any additional parameters.
    Taken from https://github.com/NateFerrero/oauth2lib/blob/master/oauth2lib/utils.py and extended to allow
    paramenters as fragment
    :param base: Base URL
    :type base: str
    ::param query_params: Additional query parameters to include.
    :type query_params: dict
    ::param fragment: Additional parameters to include in the fragment section of the url
    :type fragment: dict
    :rtype: str
    """
    url = urlparse.urlparse(base)
    query_params.update(urlparse.parse_qsl(url.query, True))
    query_params = {k: v for k, v in query_params.iteritems() if v is not None}

    fragment.update(urlparse.parse_qsl(url.fragment, True))
    fragment = {k: v for k, v in fragment.iteritems() if v is not None}

    return urlparse.urlunparse((url.scheme,
                                url.netloc,
                                url.path,
                                url.params,
                                urllib.urlencode(query_params),
                                urllib.urlencode(fragment)))
开发者ID:shell-labs,项目名称:flask-rest-boilerplate,代码行数:26,代码来源:util.py


示例6: add_preserved_filters

def add_preserved_filters(context, url, popup=False, to_field=None):
    opts = context.get('opts')
    preserved_filters = context.get('preserved_filters')

    parsed_url = list(urlparse(url))
    parsed_qs = dict(parse_qsl(parsed_url[4]))
    merged_qs = {}

    if opts and preserved_filters:
        preserved_filters = dict(parse_qsl(preserved_filters))

        match_url = '/%s' % url.partition(get_script_prefix())[2]
        try:
            match = resolve(match_url)
        except Resolver404:
            pass
        else:
            current_url = '%s:%s' % (match.app_name, match.url_name)
            changelist_url = 'admin:%s_%s_changelist' % (opts.app_label, opts.model_name)
            if changelist_url == current_url and '_changelist_filters' in preserved_filters:
                preserved_filters = dict(parse_qsl(preserved_filters['_changelist_filters']))

        merged_qs.update(preserved_filters)

    if popup:
        from django.contrib.admin.options import IS_POPUP_VAR
        merged_qs[IS_POPUP_VAR] = 1
    if to_field:
        from django.contrib.admin.options import TO_FIELD_VAR
        merged_qs[TO_FIELD_VAR] = to_field

    merged_qs.update(parsed_qs)

    parsed_url[4] = urlencode(merged_qs)
    return urlunparse(parsed_url)
开发者ID:100448facens,项目名称:django-example-jn,代码行数:35,代码来源:admin_urls.py


示例7: respond_all

	def respond_all(environ, start_response):
	    status = '200 OK' # HTTP Status
	    headers = [('Content-type', 'text/plain; charset=utf-8')] # HTTP Headers
	    start_response(status, headers)

	    my_dict = {'method_name': environ['REQUEST_METHOD'], 'path_info': environ['PATH_INFO']}

	    if len(environ['CONTENT_LENGTH'])>0:
	    	my_dict['content_length'] = environ['CONTENT_LENGTH']
	    	request_body = environ['wsgi.input'].read(int(environ['CONTENT_LENGTH'])).decode('utf-8')
	    	request_body = request_body.strip('[] ')
	    	body_dict = parse.parse_qsl(request_body)
	    	my_dict.update(body_dict)
	    else:
	    	pass

	    if len(environ['QUERY_STRING'])>0:
	    	qs = environ['QUERY_STRING']
	    	query_dict = parse.parse_qsl(qs)
	    	my_dict.update(query_dict)
	    else:
	    	pass

	    json_response = json.dumps(my_dict, sort_keys=True, indent=4, separators=(',',': '))
	    
	    return [json_response.encode('utf-8')]
开发者ID:jay-batavia,项目名称:1.CP341-Web_Services,代码行数:26,代码来源:http_parser.py


示例8: cleanup

def cleanup(url):
    url = _follow(url)
    # remove trackers params
    try:
        urlp = urlparse(url)
        # cleanup query param
        query = parse_qsl(urlp.query)
        # only if query is non empty and we manage to parse fragment as
        # key/value
        if urlp.query and query:
            for annoying in ANNOYING_PARAMS:
                query = [(x, y) for x, y in query if not x.startswith(annoying)]
            urlp = urlp._replace(
                query=urlencode(query),
            )

        # cleanup fragment param
        fragment = parse_qsl(urlp.fragment)
        # only if fragments is non empty and we manage to parse fragment as
        # key/value
        if urlp.fragment and fragment:
            for annoying in ANNOYING_PARAMS:
                fragment = [(x, y) for x, y in fragment if not x.startswith(annoying)]
            urlp = urlp._replace(
                fragment=urlencode(fragment),
            )
        url = urlp.geturl()
    except Exception:
        app.logger.exception("Problem cleaning url %s", url)

    app.logger.info("Final url %s", url)
    return url
开发者ID:jcsaaddupuy,项目名称:dereferer,代码行数:32,代码来源:app.py


示例9: twitter

def twitter():
    request_token_url = 'https://api.twitter.com/oauth/request_token'
    access_token_url = 'https://api.twitter.com/oauth/access_token'

    if request.json.get('oauth_token') and request.json.get('oauth_verifier'):
        auth = OAuth1(app.config['OAUTH2_CLIENT_ID'],
                      client_secret=app.config['OAUTH2_CLIENT_SECRET'],
                      resource_owner_key=request.json.get('oauth_token'),
                      verifier=request.json.get('oauth_verifier'))
        r = requests.post(access_token_url, auth=auth)
        profile = dict(parse_qsl(r.text))

        login = profile['screen_name']
        if app.config['AUTH_REQUIRED'] and not db.is_user_valid(login=login):
            return jsonify(status="error", message="User %s is not authorized" % login), 403

        token = create_token(profile['user_id'], '@'+login, login, provider='twitter')
        return jsonify(token=token)
    else:
        oauth = OAuth1(app.config['OAUTH2_CLIENT_ID'],
                       client_secret=app.config['OAUTH2_CLIENT_SECRET'],
                       callback_uri=app.config.get('TWITTER_CALLBACK_URL', request.headers.get('Referer', ''))
        )
        r = requests.post(request_token_url, auth=oauth)
        oauth_token = dict(parse_qsl(r.text))
        return jsonify(oauth_token)
开发者ID:iselu,项目名称:alerta,代码行数:26,代码来源:auth.py


示例10: get_redirect_to

    def get_redirect_to(self):
        assert self.is_redirect()

        if hasattr(self.request, 'response_mode'):
            response_mode = self.request.response_mode
        else:
            response_mode = self.request.get('response_mode')

        if response_mode:
            is_fragment = response_mode == 'fragment'
        else:
            response_types = set(self.request.response_type.split())
            is_fragment = 'token' in response_types

        parsed = urlparse(self.redirect_uri)
        if is_fragment:
            query = parsed.query
            fragment = self.to_query_string()
        else:
            query = parse_qsl(parsed.query)
            query.extend(parse_qsl(self.to_query_string()))
            query = urlencode(query)
            fragment = parsed.fragment

        return urlunparse(parsed[:4] + (query, fragment))
开发者ID:GehirnInc,项目名称:py3oauth2,代码行数:25,代码来源:message.py


示例11: url_concat

def url_concat(url, args):
    """Concatenate url and arguments regardless of whether
    url has existing query parameters.

    ``args`` may be either a dictionary or a list of key-value pairs
    (the latter allows for multiple values with the same key.

    >>> url_concat("http://example.com/foo", dict(c="d"))
    'http://example.com/foo?c=d'
    >>> url_concat("http://example.com/foo?a=b", dict(c="d"))
    'http://example.com/foo?a=b&c=d'
    >>> url_concat("http://example.com/foo?a=b", [("c", "d"), ("c", "d2")])
    'http://example.com/foo?a=b&c=d&c=d2'
    """
    parsed_url = urlparse(url)
    if isinstance(args, dict):
        parsed_query = parse_qsl(parsed_url.query, keep_blank_values=True)
        parsed_query.extend(args.items())
    elif isinstance(args, list) or isinstance(args, tuple):
        parsed_query = parse_qsl(parsed_url.query, keep_blank_values=True)
        parsed_query.extend(args)
    else:
        err = "'args' parameter should be dict, list or tuple. Not {0}".format(
            type(args))
        raise TypeError(err)
    final_query = urlencode(parsed_query)
    url = urlunparse((
        parsed_url[0],
        parsed_url[1],
        parsed_url[2],
        parsed_url[3],
        final_query,
        parsed_url[5]))
    return url
开发者ID:mr-ping,项目名称:tornado,代码行数:34,代码来源:httputil.py


示例12: legacy_arguments

 def legacy_arguments(self):
     arguments = LegacyMultiDict()
     if hasattr(self, 'uri'):
         arguments.update(parse_qsl(self.parsed_uri.query))
     body = getattr(self, 'body', None)
     if body and getattr(self, 'content_type', None) == _FORM_CTYPE:
         arguments.update(parse_qsl(self.body.decode('ascii')))
     return arguments
开发者ID:dragon788,项目名称:zorro,代码行数:8,代码来源:web.py


示例13: form_arguments

 def form_arguments(self):
     arguments = {}
     if hasattr(self, 'uri'):
         arguments.update(parse_qsl(self.parsed_uri.query))
     body = getattr(self, 'body', None)
     if body and getattr(self, 'content_type', None) == FORM_CTYPE:
         arguments.update(parse_qsl(self.body.decode('ascii')))
     return arguments
开发者ID:dragon788,项目名称:zorro,代码行数:8,代码来源:zerogw.py


示例14: assertURLEqual

    def assertURLEqual(self, first, second, msg=None):
        """Check that two arguments are equivalent URLs. Ignores the order of
        query arguments.
        """
        first_parsed = urlparse(first)
        second_parsed = urlparse(second)
        self.assertEqual(first_parsed[:3], second_parsed[:3], msg)

        first_qsl = sorted(parse_qsl(first_parsed.query))
        second_qsl = sorted(parse_qsl(second_parsed.query))
        self.assertEqual(first_qsl, second_qsl, msg)
开发者ID:raphacosta27,项目名称:Insper,代码行数:11,代码来源:untitled5.py


示例15: _encode_url

 def _encode_url(self, data):
     query = self.query
     if data:
         data = native_str(data)
         if isinstance(data, str):
             data = parse_qsl(data)
         else:
             data = mapping_iterator(data)
         query = parse_qsl(query)
         query.extend(data)
         query = urlencode(query)
     self.query = query
开发者ID:successtest9,项目名称:pulsar,代码行数:12,代码来源:__init__.py


示例16: test_full_flow

    def test_full_flow(self, satosa_config_dict, oidc_frontend_config, saml_backend_config, idp_conf):
        user_id = "testuser1"

        # proxy config
        satosa_config_dict["FRONTEND_MODULES"] = [oidc_frontend_config]
        satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config]
        satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"openid": [attr_name],
                                                                               "saml": [attr_name]}
                                                                   for attr_name in USERS[user_id]}
        _, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

        # application
        test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)

        # get frontend OP config info
        provider_config = json.loads(test_client.get("/.well-known/openid-configuration").data.decode("utf-8"))

        # create auth req
        claims_request = ClaimsRequest(id_token=Claims(**{k: None for k in USERS[user_id]}))
        req_args = {"scope": "openid", "response_type": "id_token", "client_id": CLIENT_ID,
                    "redirect_uri": REDIRECT_URI, "nonce": "nonce",
                    "claims": claims_request.to_json()}
        auth_req = urlparse(provider_config["authorization_endpoint"]).path + "?" + urlencode(req_args)

        # make auth req to proxy
        proxied_auth_req = test_client.get(auth_req)
        assert proxied_auth_req.status == "303 See Other"

        # config test IdP
        backend_metadata_str = str(backend_metadata[saml_backend_config["name"]][0])
        idp_conf["metadata"]["inline"].append(backend_metadata_str)
        fakeidp = FakeIdP(USERS, config=IdPConfig().load(idp_conf, metadata_construction=False))

        # create auth resp
        req_params = dict(parse_qsl(urlparse(proxied_auth_req.data.decode("utf-8")).query))
        url, authn_resp = fakeidp.handle_auth_req(
            req_params["SAMLRequest"],
            req_params["RelayState"],
            BINDING_HTTP_REDIRECT,
            user_id,
            response_binding=BINDING_HTTP_REDIRECT)

        # make auth resp to proxy
        authn_resp_req = urlparse(url).path + "?" + urlencode(authn_resp)
        authn_resp = test_client.get("/" + authn_resp_req)
        assert authn_resp.status == "303 See Other"

        # verify auth resp from proxy
        resp_dict = dict(parse_qsl(urlparse(authn_resp.data.decode("utf-8")).fragment))
        signing_key = RSAKey(key=rsa_load(oidc_frontend_config["config"]["signing_key_path"]),
                             use="sig", alg="RS256")
        id_token_claims = JWS().verify_compact(resp_dict["id_token"], keys=[signing_key])
        assert all((k, v[0]) in id_token_claims.items() for k, v in USERS[user_id].items())
开发者ID:its-dirg,项目名称:SATOSA,代码行数:53,代码来源:test_oidc-saml.py


示例17: on_get

    def on_get(self, req, res):

        """Create Twitter JWT token
        """
        request_token_url = 'https://api.twitter.com/oauth/request_token'
        access_token_url = 'https://api.twitter.com/oauth/access_token'
        authenticate_url = 'https://api.twitter.com/oauth/authenticate'

        if req.get_param('oauth_token') and req.get_param('oauth_verifier'):
            auth = OAuth1(settings.TWITTER_KEY,
                          client_secret=settings.TWITTER_SECRET,
                          resource_owner_key=req.get_param('oauth_token'),
                          verifier=req.get_param('oauth_verifier'))
            logger.debug("Twitter OAuth: Got auth session.")
            r = requests.post(access_token_url, auth=auth)
            profile = dict(parse_qsl(r.text))
            logger.debug("Twitter OAuth: User profile retrieved")

            try:
                user = User.select().where(User.twitter == profile['user_id'] |
                                           User.username == profile['screen_name']).get()
            except:
                user = User.create(twitter=profile['user_id'],
                                   username=profile['screen_name'])
                user.save()

            token = utils.create_jwt_token(user)
            res.body = json.dumps({"token": token})
            res.status = falcon.HTTP_200
        else:
            oauth = OAuth1(settings.TWITTER_KEY,
                           client_secret=settings.TWITTER_SECRET,
                           callback_uri=settings.TWITTER_CALLBACK_URI)
            logger.debug("Twitter OAuth: Got auth session.")
            r = requests.post(request_token_url, auth=oauth)
            oauth_token = dict(parse_qsl(r.text))
            logger.debug("Twitter OAuth: User profile retrieved")
            qs = urlencode(dict(oauth_token=oauth_token['oauth_token']))

            # Falcon doesn't support redirects, so we have to fake it
            # this implementation has been taken from werkzeug
            final_url = authenticate_url + '?' + qs
            res.body = (
                '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n'
                '<title>Redirecting...</title>\n'
                '<h1>Redirecting...</h1>\n'
                '<p>You should be redirected automatically to target URL: '
                '<a href="{0}">{0}</a>.  If not click the link.'.format(final_url)
            )
            res.location = final_url
            res.status = falcon.HTTP_301
开发者ID:sikrvault,项目名称:sikr,代码行数:51,代码来源:twitter.py


示例18: on_user_web_access

    def on_user_web_access(self, user_id, get_array, post_array):
        TWITTER_KEY = "1"
        db = a.get_db()

        if 'connect' in get_array and get_array['connect'] == 'twitter':
            consumer = oauth.Consumer(consumer_key, consumer_secret)
            client = oauth.Client(consumer)

            resp, content = client.request(request_token_url, "GET")
            if resp['status'] != '200':
                raise Exception("Invalid response %s." % resp['status'])

            request_token = dict(parse.parse_qsl(content.decode()))

            db.sql("INSERT INTO request_tokens(user_id, token, token_secret) VALUES(%s, '"+request_token['oauth_token']+"', '"+request_token['oauth_token_secret']+"')", (str(user_id),))
            db.commit()


            a.p('<p>Connexion a Twitter requise.</p>')
            a.p('<a href="'+authorize_url+'?oauth_token='+(request_token['oauth_token'])+'">Continuer sur Twitter</a>')

        elif 'twitter' in get_array and get_array["twitter"] == TWITTER_KEY:
            consumer = oauth.Consumer(consumer_key, consumer_secret)
            client = oauth.Client(consumer)

            oauth_token = get_array['oauth_token']
            oauth_verifier = get_array['oauth_verifier']

            request_token = db.sql('SELECT * FROM request_tokens WHERE user_id=%s', (str(user_id),))[0]

            token = oauth.Token(request_token[1],
                request_token[2])
            token.set_verifier(oauth_verifier)
            client = oauth.Client(consumer, token)

            resp, content = client.request(access_token_url, "POST")

            if resp['status'] != '200':
                raise Exception("Invalid response %s." % resp['status'])

            access_token = dict(parse.parse_qsl(content.decode()))

            db.sql("INSERT INTO usr(usr_id, usr_token, usr_token_secret) VALUES(%s, '"+access_token['oauth_token']+"', '"+access_token['oauth_token_secret']+"')", (str(user_id),))
            db.sql('DELETE FROM request_tokens WHERE user_id = %s', (str(user_id),))
            db.commit()

            a.p('<p>Twitter pairing successful</p>')
        else:

            a.p('<a href="'+a.get_url()+'&connect=twitter">Connect with Twitter</a>')
开发者ID:Axce,项目名称:bobcat,代码行数:50,代码来源:main.py


示例19: run_test

    def run_test(self, satosa_config_dict, sp_conf, oidc_backend_config, frontend_config):
        subject_id = "testuser1"
        # proxy config
        satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
        satosa_config_dict["BACKEND_MODULES"] = [oidc_backend_config]
        satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"openid": [attr_name],
                                                                               "saml": [attr_name]}
                                                                   for attr_name in USERS[subject_id]}
        frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

        # application
        test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)

        # config test SP
        frontend_metadata_str = str(frontend_metadata[frontend_config["name"]][0])
        sp_conf["metadata"]["inline"].append(frontend_metadata_str)
        fakesp = FakeSP(SPConfig().load(sp_conf, metadata_construction=False))

        # create auth req
        destination, req_args = fakesp.make_auth_req(frontend_metadata[frontend_config["name"]][0].entity_id)
        auth_req = urlparse(destination).path + "?" + urlencode(req_args)

        # make auth req to proxy
        proxied_auth_req = test_client.get(auth_req)
        assert proxied_auth_req.status == "302 Found"
        parsed_auth_req = dict(parse_qsl(urlparse(proxied_auth_req.data.decode("utf-8")).query))

        # create auth resp
        id_token_claims = {k: v[0] for k, v in USERS[subject_id].items()}
        id_token_claims["sub"] = subject_id
        id_token_claims["iat"] = time.time()
        id_token_claims["exp"] = time.time() + 3600
        id_token_claims["iss"] = "https://op.example.com"
        id_token_claims["aud"] = oidc_backend_config["config"]["client"]["client_metadata"]["client_id"]
        id_token_claims["nonce"] = parsed_auth_req["nonce"]
        id_token = IdToken(**id_token_claims).to_jwt()
        authn_resp = {"state": parsed_auth_req["state"], "id_token": id_token}

        # make auth resp to proxy
        redirect_uri_path = urlparse(
            oidc_backend_config["config"]["client"]["client_metadata"]["redirect_uris"][0]).path
        authn_resp_req = redirect_uri_path + "?" + urlencode(authn_resp)
        authn_resp = test_client.get(authn_resp_req)
        assert authn_resp.status == "303 See Other"

        # verify auth resp from proxy
        resp_dict = dict(parse_qsl(urlparse(authn_resp.data.decode("utf-8")).query))
        auth_resp = fakesp.parse_authn_request_response(resp_dict["SAMLResponse"], BINDING_HTTP_REDIRECT)
        assert auth_resp.ava == USERS[subject_id]
开发者ID:SUNET,项目名称:SATOSA,代码行数:49,代码来源:test_saml-oidc.py


示例20: run_test

    def run_test(self, satosa_config_dict, sp_conf, idp_conf, saml_backend_config, frontend_config):
        user_id = "testuser1"
        # proxy config
        satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
        satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config]
        satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"saml": [attr_name]} for attr_name in
                                                                   USERS[user_id]}
        frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

        # application
        test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)

        # config test SP
        frontend_metadata_str = str(frontend_metadata[frontend_config["name"]][0])
        sp_conf["metadata"]["inline"].append(frontend_metadata_str)
        fakesp = FakeSP(SPConfig().load(sp_conf, metadata_construction=False))

        # create auth req
        destination, req_args = fakesp.make_auth_req(frontend_metadata[frontend_config["name"]][0].entity_id)
        auth_req = urlparse(destination).path + "?" + urlencode(req_args)

        # make auth req to proxy
        proxied_auth_req = test_client.get(auth_req)
        assert proxied_auth_req.status == "303 See Other"

        # config test IdP
        backend_metadata_str = str(backend_metadata[saml_backend_config["name"]][0])
        idp_conf["metadata"]["inline"].append(backend_metadata_str)
        fakeidp = FakeIdP(USERS, config=IdPConfig().load(idp_conf, metadata_construction=False))

        # create auth resp
        req_params = dict(parse_qsl(urlparse(proxied_auth_req.data.decode("utf-8")).query))
        url, authn_resp = fakeidp.handle_auth_req(
            req_params["SAMLRequest"],
            req_params["RelayState"],
            BINDING_HTTP_REDIRECT,
            user_id,
            response_binding=BINDING_HTTP_REDIRECT)

        # make auth resp to proxy
        authn_resp_req = urlparse(url).path + "?" + urlencode(authn_resp)
        authn_resp = test_client.get("/" + authn_resp_req)
        assert authn_resp.status == "303 See Other"

        # verify auth resp from proxy
        resp_dict = dict(parse_qsl(urlparse(authn_resp.data.decode("utf-8")).query))
        auth_resp = fakesp.parse_authn_request_response(resp_dict["SAMLResponse"], BINDING_HTTP_REDIRECT)
        assert auth_resp.ava == USERS[user_id]
开发者ID:its-dirg,项目名称:SATOSA,代码行数:48,代码来源:test_saml-saml.py



注:本文中的urllib.parse.parse_qsl函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python urllib2.install_opener函数代码示例发布时间:2022-05-27
下一篇:
Python urllib.error函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap