• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python escape.parse_qs_bytes函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tornado.escape.parse_qs_bytes函数的典型用法代码示例。如果您正苦于以下问题:Python parse_qs_bytes函数的具体用法?Python parse_qs_bytes怎么用?Python parse_qs_bytes使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了parse_qs_bytes函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

    def __init__(self, environ):
        """Parses the given WSGI environ to construct the request.

        Populates ``method``, ``path``, ``uri``, ``query``, ``arguments``,
        ``headers``, ``body``, ``protocol``, ``remote_ip``, ``host`` and
        ``files`` from ``environ``.  Bodies sent as
        ``application/x-www-form-urlencoded`` or ``multipart/form-data``
        are parsed into ``self.arguments`` / ``self.files``.

        Raises ``KeyError`` if a mandatory key (``REQUEST_METHOD``,
        ``wsgi.url_scheme``, or ``SERVER_NAME`` when ``HTTP_HOST`` is
        missing) is absent from ``environ``.
        """
        self.method = environ["REQUEST_METHOD"]
        self.path = urllib.quote(from_wsgi_str(environ.get("SCRIPT_NAME", "")))
        self.path += urllib.quote(from_wsgi_str(environ.get("PATH_INFO", "")))
        self.uri = self.path
        self.arguments = {}
        self.query = environ.get("QUERY_STRING", "")
        if self.query:
            self.uri += "?" + self.query
            arguments = parse_qs_bytes(native_str(self.query))
            for name, values in arguments.iteritems():
                # Drop empty values; a name left with no values is omitted.
                values = [v for v in values if v]
                if values:
                    self.arguments[name] = values
        self.version = "HTTP/1.1"
        self.headers = httputil.HTTPHeaders()
        if environ.get("CONTENT_TYPE"):
            self.headers["Content-Type"] = environ["CONTENT_TYPE"]
        if environ.get("CONTENT_LENGTH"):
            self.headers["Content-Length"] = environ["CONTENT_LENGTH"]
        for key in environ:
            # "HTTP_FOO_BAR" environ keys become "FOO-BAR" header names.
            if key.startswith("HTTP_"):
                self.headers[key[5:].replace("_", "-")] = environ[key]
        if self.headers.get("Content-Length"):
            self.body = environ["wsgi.input"].read(
                int(self.headers["Content-Length"]))
        else:
            self.body = ""
        self.protocol = environ["wsgi.url_scheme"]
        self.remote_ip = environ.get("REMOTE_ADDR", "")
        if environ.get("HTTP_HOST"):
            self.host = environ["HTTP_HOST"]
        else:
            self.host = environ["SERVER_NAME"]

        # Parse request body
        self.files = {}
        content_type = self.headers.get("Content-Type", "")
        if content_type.startswith("application/x-www-form-urlencoded"):
            for name, values in parse_qs_bytes(native_str(self.body)).iteritems():
                self.arguments.setdefault(name, []).extend(values)
        elif content_type.startswith("multipart/form-data"):
            # Inspect each ";"-separated parameter so that parameters
            # following the boundary (e.g. "; charset=utf-8") do not end up
            # inside the boundary string.
            for field in content_type.split(";"):
                k, sep, v = field.strip().partition("=")
                if k == "boundary" and v:
                    httputil.parse_multipart_form_data(
                        utf8(v), self.body, self.arguments, self.files)
                    break
            else:
                # No usable boundary parameter (missing or empty).
                logging.warning("Invalid multipart/form-data")

        self._start_time = time.time()
        self._finish_time = None
开发者ID:jamescasbon,项目名称:tornado,代码行数:53,代码来源:wsgi.py


示例2: parse_request

def parse_request(data):
    """Decode a serialized request record into its components.

    ``data`` is evaluated with ``ast.literal_eval`` and must yield a mapping
    with ``'uri'``, ``'body'`` and ``'headers'`` entries, where the headers
    contain ``'Host'``.

    Returns a ``(headers, host, path, get_arguments, post_arguments)`` tuple,
    or ``None`` when a ``_BadRequestException`` is raised while decoding.
    """
    # NOTE(review): ast.literal_eval raises ValueError/SyntaxError on
    # malformed input and missing keys raise KeyError; unless
    # _BadRequestException covers those, they propagate -- confirm intent.
    try:
        request = ast.literal_eval(data)
        uri_parts = request['uri'].partition('?')
        path = uri_parts[0]
        get_arguments = parse_qs_bytes(uri_parts[2], keep_blank_values=True)
        post_arguments = parse_qs_bytes(request['body'],
                                        keep_blank_values=True)
        headers = request['headers']
        host = headers['Host']
    except _BadRequestException as e:
        gen_log.info("Malformed HTTP request:%s", e)
        return None
    return headers, host, path, get_arguments, post_arguments
开发者ID:yy1455412617,项目名称:NoSpiderScanner,代码行数:12,代码来源:collect_reqs.py


示例3: parse_body_arguments

def parse_body_arguments(content_type, body, arguments, files, headers=None):
    """Parse a form request body into ``arguments`` and ``files``.

    Handles ``application/x-www-form-urlencoded`` and
    ``multipart/form-data`` content types; any other content type is
    ignored.  ``arguments`` and ``files`` are dictionaries that are updated
    in place.  If ``headers`` carries a ``Content-Encoding`` entry the body
    is not parsed and a warning is logged instead.
    """
    if headers and 'Content-Encoding' in headers:
        gen_log.warning("Unsupported Content-Encoding: %s",
                        headers['Content-Encoding'])
        return

    if content_type.startswith("application/x-www-form-urlencoded"):
        try:
            parsed = parse_qs_bytes(native_str(body), keep_blank_values=True)
        except Exception as e:
            # A body that cannot be parsed is logged and treated as empty.
            gen_log.warning('Invalid x-www-form-urlencoded body: %s', e)
            parsed = {}
        for key, value_list in parsed.items():
            if not value_list:
                continue
            arguments.setdefault(key, []).extend(value_list)
        return

    if not content_type.startswith("multipart/form-data"):
        return

    # Locate the first non-empty "boundary" parameter of the content type.
    boundary = None
    for part in content_type.split(";"):
        key, _, value = part.strip().partition("=")
        if key == "boundary" and value:
            boundary = value
            break
    if boundary is None:
        gen_log.warning("Invalid multipart/form-data")
        return
    parse_multipart_form_data(utf8(boundary), body, arguments, files)
开发者ID:1487quantum,项目名称:fyp-autonomous-bot,代码行数:31,代码来源:httputil.py


示例4: parse_body_arguments

def parse_body_arguments(content_type, body, arguments, files, headers=None):
    """Parse a form request body into ``arguments`` and ``files``.

    Handles ``application/x-www-form-urlencoded`` and
    ``multipart/form-data``; other content types are ignored.  Problems
    (an unsupported ``Content-Encoding`` header, an unparsable body, a
    missing boundary) are reported with ``print`` rather than raised.
    ``arguments`` and ``files`` are updated in place.
    """
    if headers and 'Content-Encoding' in headers:
        print("Unsupported Content-Encoding: %s" % headers['Content-Encoding'])
        return

    if content_type.startswith("application/x-www-form-urlencoded"):
        try:
            parsed = parse_qs_bytes(native_str(body), keep_blank_values=True)
        except Exception as e:
            print('Invalid x-www-form-urlencoded body: %s' % e)
            parsed = {}
        for key, value_list in parsed.items():
            if not value_list:
                continue
            arguments.setdefault(key, []).extend(value_list)
        return

    if not content_type.startswith("multipart/form-data"):
        return

    # Everything below stays inside one try block so that both a missing
    # boundary and a failure while parsing the multipart body are reported
    # the same way.
    try:
        boundary = None
        for part in content_type.split(";"):
            key, _, value = part.strip().partition("=")
            if key == "boundary" and value:
                boundary = value
                break
        if boundary is None:
            raise ValueError("multipart boundary not found")
        parse_multipart_form_data(utf8(boundary), body, arguments, files)
    except Exception as e:
        print("Invalid multipart/form-data: %s" % e)
开发者ID:confucianzuoyuan,项目名称:tinytornado,代码行数:25,代码来源:httputil.py


示例5: _on_request_body

 def _on_request_body(self, data):
     """Attach the received body to the request, parse it and dispatch.

     For POST and PUT requests, form-encoded and multipart bodies are
     merged into ``self._request.arguments`` / ``self._request.files``.
     The request is then handed to ``self.request_callback``.

     If a ``BadRequestException`` is raised along the way, the request is
     logged, keep-alive is disabled and a ``400`` response is written to
     the stream (when it is still open).
     """
     try:
         self._request.body = data
         content_type = self._request.headers.get("Content-Type", "")
         if self._request.method in ("POST", "PUT"):
             if content_type.startswith("application/x-www-form-urlencoded"):
                 arguments = parse_qs_bytes(native_str(self._request.body),
                                            keep_blank_values=True)
                 # items() instead of the Python 2-only iteritems().
                 for name, values in arguments.items():
                     values = [v for v in values if v is not None]
                     if values:
                         self._request.arguments.setdefault(name, []).extend(
                             values)
             elif content_type.startswith("multipart/form-data"):
                 fields = content_type.split(";")
                 for field in fields:
                     k, sep, v = field.strip().partition("=")
                     if k == "boundary" and v:
                         httputil.parse_multipart_form_data(
                             utf8(v), data,
                             self._request.arguments,
                             self._request.files)
                         break
                 else:
                     # No non-empty boundary parameter was found.
                     logging.warning("Invalid multipart/form-data")
         self.request_callback(self._request)
     except BadRequestException as e:
         # "except X as e" replaces the Python 2-only "except X, e" form,
         # which is a SyntaxError on Python 3.
         logging.info("Malformed HTTP request from %s: %s",
                      self.address[0], e)
         logging.info('Request:\n%s', data)
         self.no_keep_alive = True
         if not self.stream.closed():
             # Bytes literal: identical to the plain str on Python 2.
             self.stream.write(b"HTTP/1.1 400 Bad request\r\n\r\n",
                               self._finish_request)
         return
开发者ID:hhru,项目名称:hh-tornado-old,代码行数:33,代码来源:httpserver.py


示例6: _on_request_body

 def _on_request_body(self, data):
     """Attach the received body to the request, parse it and dispatch.

     Resets the connection timeout, stores ``data`` as the request body
     and, for POST and PUT requests, merges form-encoded or multipart
     content into the request's ``arguments`` / ``files``.  Finally the
     request is passed to ``self.request_callback``.
     """
     self.reset_connection_timeout()
     self._request.body = data
     content_type = self._request.headers.get("Content-Type", "")
     is_form_method = self._request.method in ("POST", "PUT")

     if is_form_method and content_type.startswith(
             "application/x-www-form-urlencoded"):
         parsed = parse_qs_bytes(native_str(self._request.body))
         for key, raw_values in parsed.iteritems():
             # Keep only non-empty values; skip names left without any.
             non_empty = [value for value in raw_values if value]
             if not non_empty:
                 continue
             self._request.arguments.setdefault(key, []).extend(non_empty)
     elif is_form_method and content_type.startswith("multipart/form-data"):
         # Use the first non-empty "boundary" parameter of the content type.
         boundary = None
         for part in content_type.split(";"):
             param, _, value = part.strip().partition("=")
             if param == "boundary" and value:
                 boundary = value
                 break
         if boundary is None:
             logging.warning("Invalid multipart/form-data")
         else:
             httputil.parse_multipart_form_data(
                 utf8(boundary), data,
                 self._request.arguments,
                 self._request.files)

     self.request_callback(self._request)
开发者ID:flodiebold,项目名称:tornado,代码行数:25,代码来源:httpserver.py


示例7: __init__

    def __init__(self, method, uri, version="HTTP/1.0", headers=None,
                 body=None, remote_ip=None, protocol=None, host=None,
                 files=None, connection=None):
        """Initialise the request from its start line, headers and body.

        ``remote_ip`` and ``protocol`` start from the given values (the
        protocol falls back to "https" for an SSL stream, else "http") and
        may be overridden from proxy headers when ``connection.xheaders`` is
        set.  The URI is split into ``path`` and ``query``, and the query
        string is parsed into ``arguments`` / ``query_arguments``.

        When ``options.server_trace`` is enabled on the xheaders path, a
        trace is created from the incoming ``X-B3-*`` headers and the
        request URL, headers and server-receive event are recorded on it.
        """
        self.method = method
        self.uri = uri
        self.version = version
        self.headers = headers or httputil.HTTPHeaders()
        self.body = body or ""
        self.trace = None

        # set remote IP and protocol
        self.remote_ip = remote_ip
        if protocol:
            self.protocol = protocol
        elif connection and isinstance(connection.stream,
                                       iostream.SSLIOStream):
            self.protocol = "https"
        else:
            self.protocol = "http"

        # xheaders can override the defaults
        if connection and connection.xheaders:
            # Squid uses X-Forwarded-For, others use X-Real-Ip
            ip = self.headers.get("X-Forwarded-For", self.remote_ip)
            ip = ip.split(',')[-1].strip()
            ip = self.headers.get(
                "X-Real-Ip", ip)
            if netutil.is_valid_ip(ip):
                self.remote_ip = ip
            # AWS uses X-Forwarded-Proto
            proto = self.headers.get(
                "X-Scheme", self.headers.get("X-Forwarded-Proto", self.protocol))
            if proto in ("http", "https"):
                self.protocol = proto
            # Zipkin tracing: build the trace from the incoming B3 headers.
            if options.server_trace:
                parent_span_id = self.headers.get("X-B3-ParentSpanId", None)
                trace_id = self.headers.get("X-B3-TraceId", None)
                span_id = self.headers.get("X-B3-SpanId", None)
                name = method
                # NOTE(review): ``port`` and ``service_name`` are not defined
                # in this method; presumably module-level names -- confirm.
                endpoint = Endpoint(
                    ipv4=socket.gethostbyname(socket.gethostname()),
                    port=port, service_name=service_name)

                self.trace = Trace(name=name, trace_id=trace_id,
                                   span_id=span_id,
                                   parent_span_id=parent_span_id)
                self.trace.set_endpoint(endpoint)

        self.host = host or self.headers.get("Host") or "127.0.0.1"
        self.files = files or {}
        self.connection = connection
        self._start_time = time.time()
        self._finish_time = None

        self.path, sep, self.query = uri.partition('?')
        self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
        self.query_arguments = copy.deepcopy(self.arguments)
        self.body_arguments = {}

        # The trace is only created on the xheaders path above; without this
        # guard a request on any other path would call .record() on None.
        if options.server_trace and self.trace is not None:
            self.trace.record(Annotation.string('Url', uri))
            self.trace.record(Annotation.string('Header', self.headers))
            self.trace.record(Annotation.server_recv())
开发者ID:iambocai,项目名称:tornado,代码行数:60,代码来源:httpserver.py


示例8: decorated

    def decorated(self, *args, **kwargs):
        """Run the wrapped method, then re-parse the query string.

        After ``original_method`` returns, ``self.query`` is parsed again
        with blank values kept, and every parsed name overwrites the
        matching entry in ``self.arguments``.  The wrapped method's return
        value is discarded.
        """
        original_method(self, *args, **kwargs)

        reparsed = parse_qs_bytes(self.query, keep_blank_values=True)
        target = self.arguments
        for key, value_list in reparsed.iteritems():
            target[key] = value_list
开发者ID:raymondbutcher,项目名称:breeze,代码行数:7,代码来源:hacks.py


示例9: __init__

    def __init__(self, method=None, uri=None, version="HTTP/1.0", headers=None,
                 body=None, host=None, files=None, connection=None,
                 start_line=None):
        """Initialise the request from its start line, headers and body.

        When ``start_line`` is given it is unpacked into ``method``, ``uri``
        and ``version``, overriding those arguments.  ``remote_ip`` and
        ``protocol`` are read from ``connection.context`` when available.
        The URI is split into ``path`` and ``query`` and the query string is
        parsed into ``arguments`` / ``query_arguments``.
        """
        if start_line is not None:
            method, uri, version = start_line
        self.method = method
        self.uri = uri
        self.version = version
        self.headers = headers or HTTPHeaders()
        self.body = body or ""

        # set remote IP and protocol
        context = getattr(connection, 'context', None)
        # Default to None so that a missing connection/context (context is
        # None) does not raise AttributeError.
        self.remote_ip = getattr(context, 'remote_ip', None)
        self.protocol = getattr(context, 'protocol', "http")

        self.host = host or self.headers.get("Host") or "127.0.0.1"
        self.files = files or {}
        self.connection = connection
        self._start_time = time.time()
        self._finish_time = None

        self.path, sep, self.query = uri.partition('?')
        self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
        # Independent copy so later changes to ``arguments`` do not alter
        # the record of what came from the query string.
        self.query_arguments = copy.deepcopy(self.arguments)
        self.body_arguments = {}
开发者ID:1487quantum,项目名称:fyp-autonomous-bot,代码行数:26,代码来源:httputil.py


示例10: create_request

    def create_request(self, uri, method="GET", headers=None, body=None, remote_ip=None):
        """Build an ``HTTPRequest`` and populate its arguments from ``body``.

        ``headers`` defaults to a fresh empty dict on every call (a mutable
        ``{}`` default would be shared between calls).  When ``body`` is
        non-empty it is parsed as a query string and every non-empty value
        is appended to ``request.arguments``.

        Returns the constructed request.
        """
        if headers is None:
            headers = {}
        request = HTTPRequest(uri=uri, method=method, headers=headers, body=body, remote_ip=remote_ip)

        if body:
            arguments = parse_qs_bytes(native_str(body))
            for name, values in arguments.iteritems():
                # Drop empty values; skip names left without any value.
                values = [v for v in values if v]
                if values:
                    request.arguments.setdefault(name, []).extend(values)

        return request
开发者ID:kplaube,项目名称:torneira,代码行数:11,代码来源:client.py


示例11: _on_access_token

    def _on_access_token(self, redirect_uri, client_id, client_secret,
                         callback, fields, response):
        """Turn the token endpoint response into a session dict.

        The response body is parsed as a query string and the last value of
        ``access_token``, ``refresh_token`` and ``expires_in`` is passed to
        ``callback`` as a dict keyed by those names.  A missing field raises
        ``KeyError``.
        """
        # NOTE(review): ``response.error`` is not checked before the body is
        # parsed -- confirm whether failed responses can reach this point.
        token_params = escape.parse_qs_bytes(escape.native_str(response.body))

        session_keys = ("access_token", "refresh_token", "expires_in")
        session = dict((key, token_params[key][-1]) for key in session_keys)

        callback(session)
开发者ID:kernel1983,项目名称:tornado_QQMixin,代码行数:12,代码来源:auth.py


示例12: get

 def get(self):
     """Exchange the ``code`` request argument for an access token.

     When a ``code`` argument is present, the token URL is built from the
     application settings, fetched synchronously with ``urllib2``, and the
     response body is parsed as a query string to extract the last
     ``access_token`` value.  Without a ``code`` argument nothing happens.
     """
     code = self.get_argument('code', False)
     if code:
         token_url = self._oauth_request_token_url(
             redirect_uri=self.settings['redirect_url'],
             client_id=self.settings['facebook_api_key'],
             client_secret=self.settings['facebook_secret'],
             code=code,
         )
         req = urllib2.Request(token_url)
         response = urllib2.urlopen(req)
         # Close the HTTP response even if reading it fails, so the
         # underlying connection is not leaked.
         try:
             payload = response.read()
         finally:
             response.close()
         args = escape.parse_qs_bytes(escape.native_str(payload))
         # NOTE(review): the token is not used further in this snippet;
         # presumably the original handler continues -- confirm.
         access_token = args['access_token'][-1]
开发者ID:iandyh,项目名称:facebook-app-base,代码行数:13,代码来源:facebook.py


示例13: __init__

    def __init__(
        self,
        method,
        uri,
        version="HTTP/1.0",
        headers=None,
        body=None,
        remote_ip=None,
        protocol=None,
        host=None,
        files=None,
        connection=None,
    ):
        """Initialise the request from its start line, headers and body.

        ``remote_ip`` and ``protocol`` start from the given values (the
        protocol falls back to "https" for an SSL stream, else "http") and
        may be overridden from proxy headers when ``connection.xheaders`` is
        set.  The URI is split into ``path`` and ``query`` and the query
        string is parsed into ``arguments`` / ``query_arguments``.
        """
        self.method = method
        self.uri = uri
        self.version = version
        self.headers = headers or httputil.HTTPHeaders()
        self.body = body or ""

        # Defaults for the remote address and protocol.
        self.remote_ip = remote_ip
        if protocol:
            self.protocol = protocol
        else:
            over_ssl = connection and isinstance(connection.stream, iostream.SSLIOStream)
            self.protocol = "https" if over_ssl else "http"

        # Proxy-supplied headers may override the defaults.
        if connection and connection.xheaders:
            # Squid uses X-Forwarded-For, others use X-Real-Ip; of a
            # comma-separated X-Forwarded-For list only the last entry is used.
            forwarded_for = self.headers.get("X-Forwarded-For", self.remote_ip)
            last_hop = forwarded_for.split(",")[-1].strip()
            candidate_ip = self.headers.get("X-Real-Ip", last_hop)
            if netutil.is_valid_ip(candidate_ip):
                self.remote_ip = candidate_ip
            # AWS uses X-Forwarded-Proto; X-Scheme takes precedence over it.
            forwarded_proto = self.headers.get("X-Forwarded-Proto", self.protocol)
            candidate_proto = self.headers.get("X-Scheme", forwarded_proto)
            if candidate_proto in ("http", "https"):
                self.protocol = candidate_proto

        self.host = host or self.headers.get("Host") or "127.0.0.1"
        self.files = files or {}
        self.connection = connection
        self._start_time = time.time()
        self._finish_time = None

        uri_parts = uri.partition("?")
        self.path = uri_parts[0]
        self.query = uri_parts[2]
        self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
        # Independent copy of the arguments that came from the query string.
        self.query_arguments = copy.deepcopy(self.arguments)
        self.body_arguments = {}
开发者ID:naeemattari7,项目名称:tornado,代码行数:51,代码来源:httpserver.py


示例14: __init__

    def __init__(
        self,
        method,
        uri,
        version="HTTP/1.0",
        headers=None,
        body=None,
        remote_ip=None,
        protocol=None,
        host=None,
        files=None,
        connection=None,
    ):
        """Initialise the request from its start line, headers and body.

        With ``connection.xheaders`` set, the remote address and protocol
        are taken from proxy headers (falling back to ``remote_ip`` and
        "http" when the header values are not acceptable); otherwise the
        given values are used, with the protocol derived from the stream
        type when not supplied.  The URI is split with ``urlsplit`` and the
        non-empty query values are stored in ``arguments``.
        """
        self.method = method
        self.uri = uri
        self.version = version
        self.headers = headers or httputil.HTTPHeaders()
        self.body = body or ""

        trust_proxy_headers = connection and connection.xheaders
        if not trust_proxy_headers:
            self.remote_ip = remote_ip
            if protocol:
                self.protocol = protocol
            else:
                over_ssl = connection and isinstance(connection.stream, iostream.SSLIOStream)
                self.protocol = "https" if over_ssl else "http"
        else:
            # Squid uses X-Forwarded-For, others use X-Real-Ip; X-Real-Ip
            # takes precedence when both are present.
            forwarded_for = self.headers.get("X-Forwarded-For", remote_ip)
            self.remote_ip = self.headers.get("X-Real-Ip", forwarded_for)
            if not self._valid_ip(self.remote_ip):
                self.remote_ip = remote_ip
            # AWS uses X-Forwarded-Proto; X-Scheme takes precedence over it.
            forwarded_proto = self.headers.get("X-Forwarded-Proto", protocol)
            scheme_header = self.headers.get("X-Scheme", forwarded_proto)
            if scheme_header in ("http", "https"):
                self.protocol = scheme_header
            else:
                self.protocol = "http"

        self.host = host or self.headers.get("Host") or "127.0.0.1"
        self.files = files or {}
        self.connection = connection
        self._start_time = time.time()
        self._finish_time = None

        # urlsplit yields (scheme, netloc, path, query, fragment).
        split_uri = urlparse.urlsplit(native_str(uri))
        self.path = split_uri[2]
        self.query = split_uri[3]
        parsed = parse_qs_bytes(self.query)
        self.arguments = {}
        for key, raw_values in parsed.iteritems():
            # Keep only non-empty values; skip names left without any.
            non_empty = [value for value in raw_values if value]
            if non_empty:
                self.arguments[key] = non_empty
开发者ID:huangxinms,项目名称:tornado,代码行数:50,代码来源:httpserver.py


示例15: _on_access_token

    def _on_access_token(self, redirect_uri, client_id, client_secret, callback, fields, response):
        """Handle the token response and request the user's profile.

        On an error response a warning is logged and ``callback(None)`` is
        invoked.  Otherwise the body is parsed as a query string, a session
        dict holding the last ``access_token`` value and the raw ``expires``
        entry is built, and ``/me`` is requested with the given ``fields``;
        the result is routed to ``self._on_get_user_info``.
        """
        if response.error:
            gen_log.warning("Facebook auth error: %s" % str(response))
            callback(None)
            return

        token_params = escape.parse_qs_bytes(escape.native_str(response.body))
        access_token = token_params["access_token"][-1]
        session = {
            "access_token": access_token,
            "expires": token_params.get("expires"),
        }
        user_info_callback = self.async_callback(
            self._on_get_user_info, callback, session, fields)
        self.facebook_request(
            path="/me",
            callback=user_info_callback,
            access_token=access_token,
            fields=",".join(fields),
        )
开发者ID:guilhermebruzzi,项目名称:listas-publicas,代码行数:14,代码来源:auth.py


示例16: _on_access_token

 def _on_access_token(self, redirect_uri, client_id, client_secret,
                     callback, response):
     """Handle the token response and fetch the user's OpenID.

     On an error response a warning is logged and ``callback(None)`` is
     invoked.  Otherwise the body is parsed as a query string into a
     session dict (``access_token`` and ``expires``), the OpenID endpoint
     is fetched with that token, and the result is passed on to
     ``self._on_open_id``.

     This is a generator (it yields a ``gen.Task``); presumably it is
     wrapped by a tornado ``gen`` decorator outside this snippet.
     """
     if response.error:
         # logging.warning replaces the deprecated logging.warn alias.
         logging.warning('QQ auth error: %s' % str(response))
         callback(None)
         return
     args = escape.parse_qs_bytes(escape.native_str(response.body))
     # "expires_in" is looked up with .get(), so tolerate its absence
     # instead of failing with TypeError on None[0].
     expires_in = args.get("expires_in")
     session = {
         "access_token": args["access_token"][-1],
         "expires": expires_in[0] if expires_in else None
     }
     http = self.get_auth_http_client()
     response = yield gen.Task(http.fetch, url_concat(self._OAUTH_OPEND_ID_URL, {"access_token":session["access_token"]}))
     self._on_open_id(redirect_uri, client_id,
                      client_secret, callback, session, response)
开发者ID:afeide,项目名称:LuoYunCloud,代码行数:15,代码来源:auth.py


示例17: _on_access_token

    def _on_access_token(self, callback, response):
        """Handle the token response and request the user's profile.

        On an error response a warning is logged and ``callback(None)`` is
        invoked.  Otherwise the body is parsed as a query string, the first
        ``access_token`` and ``token_type`` values are extracted, and
        ``/user`` is requested; the result is routed to
        ``self._on_get_user_info``.
        """
        if response.error:
            logging.warning('Github auth error: %s' % str(response))
            callback(None)
            return

        token_params = escape.parse_qs_bytes(escape.native_str(response.body))
        # NOTE(review): this writes the parsed token response, including the
        # access token itself, to the log -- confirm that is acceptable.
        logging.info('Got access token: %s', token_params)
        token = token_params['access_token'][0]
        kind = token_params['token_type'][0]

        user_info_callback = self.async_callback(
            self._on_get_user_info, callback, token, kind)
        self.github_request(
            '/user',
            access_token=token,
            callback=user_info_callback)
开发者ID:reorx,项目名称:stargazer,代码行数:16,代码来源:handlers.py


示例18: _on_access_token

	def _on_access_token(self, redirect_uri, client_id, client_secret,
						 future, fields, response):
		"""Handle the token response and fetch the user's OpenID.

		On an error response an ``AuthError`` is set on ``future``.
		Otherwise the body is parsed as a query string into a session dict
		(``access_token`` and ``expires``) and the OpenID endpoint is
		fetched with that token; the result is routed to
		``self._on_access_openid``.
		"""
		if response.error:
			future.set_exception(AuthError('QQ auth error: %s' % str(response)))
			return

		args = escape.parse_qs_bytes(escape.native_str(response.body))
		# "expires_in" is looked up with .get(), so tolerate its absence
		# instead of failing with TypeError on None[0].
		expires_in = args.get("expires_in")
		session = {
			"access_token": args["access_token"][0],
			"expires": expires_in[0] if expires_in else None
		}

		http = self.get_auth_http_client()
		http.fetch("https://graph.qq.com/oauth2.0/me?access_token="+session["access_token"],
				   self.async_callback(self._on_access_openid, redirect_uri, client_id,
									   client_secret, session, future, fields))
开发者ID:lmdu,项目名称:sci,代码行数:16,代码来源:utils.py


示例19: _on_access_token

    def _on_access_token(self, redirect_uri, client_id, client_secret,
                         callback, fields, response):
        """Handle the token response and request the user's profile.

        On an error response a warning is logged and ``callback(None)`` is
        invoked.  Otherwise the body is parsed as a query string, the first
        ``access_token`` value is stored in a session dict, and ``/user`` is
        requested; the result is routed to ``self._get_user_info``.
        """
        if response.error:
            logging.warning('Github auth error {0}'.format(str(response)))
            callback(None)
            return

        token_params = escape.parse_qs_bytes(escape.native_str(response.body))
        session = {'access_token': token_params.get('access_token')[0]}

        user_info_callback = self.async_callback(
            self._get_user_info, callback, session, fields)
        self.github_request(
            path='/user',
            callback=user_info_callback,
            access_token=session['access_token'])
开发者ID:coderclash,项目名称:Coder-Clash,代码行数:16,代码来源:github.py


示例20: parse_body_arguments

def parse_body_arguments(content_type, body, arguments, files):
    """Parse a form request body into ``arguments`` and ``files``.

    Handles ``application/x-www-form-urlencoded`` (empty values are
    dropped) and ``multipart/form-data``; other content types are ignored.
    ``arguments`` and ``files`` are dictionaries updated in place.
    """
    if content_type.startswith("application/x-www-form-urlencoded"):
        parsed = parse_qs_bytes(native_str(body))
        for key, raw_values in parsed.iteritems():
            # Keep only non-empty values; skip names left without any.
            non_empty = [value for value in raw_values if value]
            if not non_empty:
                continue
            arguments.setdefault(key, []).extend(non_empty)
        return

    if not content_type.startswith("multipart/form-data"):
        return

    # Use the first non-empty "boundary" parameter of the content type.
    boundary = None
    for part in content_type.split(";"):
        param, _, value = part.strip().partition("=")
        if param == "boundary" and value:
            boundary = value
            break
    if boundary is None:
        logging.warning("Invalid multipart/form-data")
        return
    parse_multipart_form_data(utf8(boundary), body, arguments, files)
开发者ID:Chaosbit,项目名称:CouchPotatoServer,代码行数:16,代码来源:httputil.py



注:本文中的tornado.escape.parse_qs_bytes函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python escape.recursive_unicode函数代码示例发布时间:2022-05-27
下一篇:
Python escape.native_str函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap