本文整理汇总了Python中tornado.escape.native_str函数的典型用法代码示例。如果您正苦于以下问题：Python native_str函数的具体用法？Python native_str怎么用？Python native_str使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了native_str函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: native_str2
def native_str2(input):
    """Coerce ints, floats and strings to the native ``str`` type.

    Numbers are rendered with ``str()`` and then passed through
    ``native_str``; strings are handed to ``native_str`` directly.  Any
    other value is returned unchanged.
    """
    if isinstance(input, (int, float)):
        return native_str(str(input))
    if isinstance(input, str):
        return native_str(input)
    # Anything that is neither a number nor a string passes through as-is.
    return input
开发者ID:thm-tech,项目名称:forward,代码行数:9,代码来源:tools.py
示例2: set_cookie
def set_cookie(self, name, value, expires_days=30, version=None,
               domain=None, expires=None, path="/", **kwargs):
    """Set a signed cookie in the client by executing ``document.cookie``.

    The value is signed with ``config.cookie_secret`` (documented upstream
    as `flexx.config.cookie_secret`) before it is stored.  Passing
    ``value=None`` clears the cookie: an empty value is written with an
    expiry date one year in the past.  Extra keyword arguments are set on
    the cookie morsel directly; ``max_age`` is translated to ``max-age``.

    Raises:
        ValueError: if the name or value contains a character in the
            range ``\\x00``-``\\x20``.
    """
    # This code is taken (in modified form) from the Tornado project
    # Copyright 2009 Facebook
    # Licensed under the Apache License, Version 2.0
    # Tornado is assumed to be available; it is imported lazily here.
    from tornado.escape import native_str
    from tornado.httputil import format_timestamp
    from tornado.web import create_signed_value

    if value is not None:
        value = create_signed_value(config.cookie_secret, name, value,
                                    version=version, key_version=None)
    else:
        # Clearing the cookie: blank value, expiry date in the past.
        value = ""
        expires = datetime.datetime.utcnow() - datetime.timedelta(days=365)

    # The cookie library only accepts type str, in both python 2 and 3
    name, value = native_str(name), native_str(value)
    if re.search(r"[\x00-\x20]", name + value):
        # Refuse control characters and spaces so nothing gets injected.
        raise ValueError("Invalid cookie %r: %r" % (name, value))

    if name in self._cookies:
        del self._cookies[name]
    self._cookies[name] = value
    morsel = self._cookies[name]

    if domain:
        morsel["domain"] = domain
    if expires_days is not None and not expires:
        lifetime = datetime.timedelta(days=expires_days)
        expires = datetime.datetime.utcnow() + lifetime
    if expires:
        morsel["expires"] = format_timestamp(expires)
    if path:
        morsel["path"] = path

    for key, val in kwargs.items():
        attr = 'max-age' if key == 'max_age' else key
        # skip falsy values for httponly and secure flags because
        # SimpleCookie sets them regardless
        if attr in ('httponly', 'secure') and not val:
            continue
        morsel[attr] = val

    # Escape double quotes so the cookie text fits inside the JS literal.
    cookie_text = morsel.OutputString().replace('"', '\\"')
    self._exec('document.cookie = "%s";' % cookie_text)
开发者ID:Konubinix,项目名称:flexx,代码行数:56,代码来源:_session.py
示例3: add
def add(self, name, value):
    """Append ``value`` to the header ``name`` without replacing old values.

    The name is normalised through the ``_normalized_headers`` lookup and
    remembered in ``self._last_key``.  For a header that already exists the
    stored string becomes ``"<old>,<new>"`` and the raw value is appended to
    the per-header list; otherwise plain item assignment is used.
    """
    key = _normalized_headers[name]
    self._last_key = key
    if key not in self:
        self[key] = value
        return
    combined = native_str(self[key]) + ',' + native_str(value)
    self._dict[key] = combined
    self._as_list[key].append(value)
开发者ID:Jishm,项目名称:tornado,代码行数:10,代码来源:httputil.py
示例4: add
def add(self, name, value):
    """Add another value for header ``name``, keeping existing ones.

    The name is normalised with ``HTTPHeaders._normalize_name`` and stored
    in ``self._last_key``.  An existing header gets the new value joined on
    with a comma, and the raw value is appended to ``self._as_list``; a new
    header goes through normal item assignment.
    """
    key = HTTPHeaders._normalize_name(name)
    self._last_key = key
    if key not in self:
        self[key] = value
        return
    merged = native_str(self[key]) + "," + native_str(value)
    # bypass our override of __setitem__ since it modifies _as_list
    dict.__setitem__(self, key, merged)
    self._as_list[key].append(value)
开发者ID:yo-han,项目名称:HandleBar,代码行数:10,代码来源:httputil.py
示例5: __call__
def __call__(self, environ, start_response):
    """WSGI entry point: run the application and relay its response.

    Args:
        environ: the WSGI environment mapping for the request.
        start_response: the WSGI ``start_response`` callable.

    Returns:
        The handler's write buffer, used as the WSGI response body.
    """
    handler = web.Application.__call__(self, HTTPRequest(environ))
    assert handler._finished
    code = handler._status_code
    status = str(code) + " " + httplib.responses[code]
    # ``items()`` is a view on Python 3 and cannot be concatenated with a
    # list, so materialise it first (harmless on Python 2).
    headers = list(handler._headers.items()) + handler._list_headers
    if hasattr(handler, "_new_cookie"):
        for cookie in handler._new_cookie.values():
            headers.append(("Set-Cookie", cookie.OutputString(None)))
    # WSGI requires native strings for header names and values.
    start_response(status, [(native_str(k), native_str(v)) for (k, v) in headers])
    return handler._write_buffer
开发者ID:wmroar,项目名称:pythonic.info,代码行数:10,代码来源:wsgi.py
示例6: _curl_debug
def _curl_debug(self, debug_type, debug_msg):
debug_types = ('I', '<', '>', '<', '>')
if debug_type == 0:
debug_msg = native_str(debug_msg)
curl_log.debug('%s', debug_msg.strip())
elif debug_type in (1, 2):
debug_msg = native_str(debug_msg)
for line in debug_msg.splitlines():
curl_log.debug('%s %s', debug_types[debug_type], line)
elif debug_type == 4:
curl_log.debug('%s %r', debug_types[debug_type], debug_msg)
开发者ID:Agnewee,项目名称:tornado,代码行数:11,代码来源:curl_httpclient.py
示例7: _curl_debug
def _curl_debug(self, debug_type: int, debug_msg: str) -> None:
debug_types = ("I", "<", ">", "<", ">")
if debug_type == 0:
debug_msg = native_str(debug_msg)
curl_log.debug("%s", debug_msg.strip())
elif debug_type in (1, 2):
debug_msg = native_str(debug_msg)
for line in debug_msg.splitlines():
curl_log.debug("%s %s", debug_types[debug_type], line)
elif debug_type == 4:
curl_log.debug("%s %r", debug_types[debug_type], debug_msg)
开发者ID:bdarnell,项目名称:tornado,代码行数:11,代码来源:curl_httpclient.py
示例8: __init__
def __init__(self, environ):
    """Build the request object from a WSGI ``environ`` mapping.

    Sets method, path, URI, query arguments, headers, body, protocol,
    remote address and host, then parses form-encoded or multipart bodies
    into ``self.arguments`` and ``self.files``.
    """
    self.method = environ["REQUEST_METHOD"]
    script_part = urllib.quote(from_wsgi_str(environ.get("SCRIPT_NAME", "")))
    path_part = urllib.quote(from_wsgi_str(environ.get("PATH_INFO", "")))
    self.path = script_part + path_part
    self.uri = self.path
    self.arguments = {}
    self.query = environ.get("QUERY_STRING", "")
    if self.query:
        self.uri += "?" + self.query
        query_args = parse_qs_bytes(native_str(self.query))
        for name, values in query_args.iteritems():
            # Drop empty values; a key with only empty values is omitted.
            non_empty = [v for v in values if v]
            if non_empty:
                self.arguments[name] = non_empty
    self.version = "HTTP/1.1"

    self.headers = httputil.HTTPHeaders()
    # These two headers have their own environ keys, without "HTTP_".
    for header, env_key in (("Content-Type", "CONTENT_TYPE"),
                            ("Content-Length", "CONTENT_LENGTH")):
        if environ.get(env_key):
            self.headers[header] = environ[env_key]
    for key in environ:
        if key.startswith("HTTP_"):
            self.headers[key[5:].replace("_", "-")] = environ[key]

    if self.headers.get("Content-Length"):
        length = int(self.headers["Content-Length"])
        self.body = environ["wsgi.input"].read(length)
    else:
        self.body = ""
    self.protocol = environ["wsgi.url_scheme"]
    self.remote_ip = environ.get("REMOTE_ADDR", "")
    # Fall back to the server name when there is no usable Host header.
    self.host = environ.get("HTTP_HOST") or environ["SERVER_NAME"]

    # Parse request body
    self.files = {}
    content_type = self.headers.get("Content-Type", "")
    if content_type.startswith("application/x-www-form-urlencoded"):
        form_args = parse_qs_bytes(native_str(self.body))
        for name, values in form_args.iteritems():
            self.arguments.setdefault(name, []).extend(values)
    elif content_type.startswith("multipart/form-data"):
        if 'boundary=' not in content_type:
            logging.warning("Invalid multipart/form-data")
        else:
            boundary = content_type.split('boundary=', 1)[1]
            if boundary:
                httputil.parse_multipart_form_data(
                    utf8(boundary), self.body, self.arguments, self.files)

    self._start_time = time.time()
    self._finish_time = None
开发者ID:jamescasbon,项目名称:tornado,代码行数:53,代码来源:wsgi.py
示例9: write_headers
def write_headers(self, start_line, headers, chunk=None, callback=None):
    """Start the WSGI response and optionally write a first chunk.

    Records the expected body length: 0 for HEAD requests, the
    ``Content-Length`` value when present, otherwise ``None``.  Then calls
    ``self.start_response`` with the status line and native-string header
    pairs.  If ``chunk`` is given it is written with ``callback``;
    otherwise ``callback`` is invoked directly when provided.
    """
    if self.method == 'HEAD':
        remaining = 0
    elif 'Content-Length' in headers:
        remaining = int(headers['Content-Length'])
    else:
        remaining = None
    self._expected_content_remaining = remaining

    status = '%s %s' % (start_line.code, start_line.reason)
    header_pairs = [(native_str(k), native_str(v))
                    for (k, v) in headers.get_all()]
    self.start_response(status, header_pairs)

    if chunk is not None:
        self.write(chunk, callback)
    elif callback is not None:
        callback()
开发者ID:yantetree,项目名称:tornado,代码行数:14,代码来源:wsgi.py
示例10: set_cookie
def set_cookie(self, name, value, domain=None, expires=None, path='/',
               expires_days=None, **kwargs):
    """Queue a cookie on ``self._new_cookie``.

    When ``domain`` is omitted it defaults to ``'.' + tld_name(host)``.
    Additional keyword arguments are set on the Cookie.Morsel directly
    (``max_age`` becomes ``max-age``).

    See http://docs.python.org/library/cookie.html#morsel-objects
    for available attributes.

    Raises:
        ValueError: if the name or value contains a character in the
            range ``\\x00``-``\\x20``.
    """
    if domain is None:
        domain = '.%s'%tld_name(self.request.host)
    name, value = escape.native_str(name), escape.native_str(value)
    if re.search(r"[\x00-\x20]", name + value):
        # Don't let us accidentally inject bad stuff
        raise ValueError("Invalid cookie %r: %r" % (name, value))

    if not hasattr(self, "_new_cookie"):
        self._new_cookie = Cookie.SimpleCookie()
    jar = self._new_cookie
    if name in jar:
        del jar[name]
    jar[name] = value
    morsel = jar[name]

    if domain:
        morsel["domain"] = domain
    if expires_days is not None and not expires:
        expires = datetime.datetime.utcnow() + datetime.timedelta(
            days=expires_days)
    # NOTE(review): indentation was lost in this listing; the ``else`` is
    # assumed to pair with ``if expires:`` (fixed far-future default when
    # no expiry is given) -- confirm against the upstream project.
    if expires:
        if type(expires) is not str:
            # Convert a datetime-like object into an HTTP date string.
            timestamp = calendar.timegm(expires.utctimetuple())
            expires = email.utils.formatdate(
                timestamp, localtime=False, usegmt=True
            )
    else:
        expires = 'Tue, 01 Jan 2030 00:00:00 GMT'
    morsel['expires'] = expires

    if path:
        morsel["path"] = path
    for key, val in kwargs.iteritems():
        morsel['max-age' if key == 'max_age' else key] = val
开发者ID:baojie,项目名称:42qu-notepad,代码行数:48,代码来源:_tornado.py
示例11: _on_headers
def _on_headers(self, data):
    """Parse the response head and schedule the matching body read.

    Decodes the raw bytes as latin1, extracts the status code from the
    first line, parses the headers, reports each header line to the
    request's ``header_callback`` if set, prepares gzip decompression when
    requested, and then reads the body as chunked, fixed length, or until
    the connection closes.

    Raises:
        ValueError: if several unequal Content-Length values are present.
    """
    text = native_str(data.decode("latin1"))
    status_line, _, raw_headers = text.partition("\n")
    match = re.match("HTTP/1.[01] ([0-9]+)", status_line)
    assert match
    self.code = int(match.group(1))
    self.headers = HTTPHeaders.parse(raw_headers)

    report = self.request.header_callback
    if report is not None:
        for k, v in self.headers.get_all():
            report("%s: %s\r\n" % (k, v))

    wants_gzip = self.request.use_gzip
    if wants_gzip and self.headers.get("Content-Encoding") == "gzip":
        # Magic parameter makes zlib module understand gzip header
        # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
        self._decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)

    if self.headers.get("Transfer-Encoding") == "chunked":
        self.chunks = []
        self.stream.read_until(b("\r\n"), self._on_chunk_length)
        return
    if "Content-Length" not in self.headers:
        self.stream.read_until_close(self._on_body)
        return

    if "," in self.headers["Content-Length"]:
        # Proxies sometimes cause Content-Length headers to get
        # duplicated.  If all the values are identical then we can
        # use them but if they differ it's an error.
        pieces = re.split(r',\s*', self.headers["Content-Length"])
        if len(set(pieces)) > 1:
            raise ValueError("Multiple unequal Content-Lengths: %r" %
                             self.headers["Content-Length"])
        self.headers["Content-Length"] = pieces[0]
    self.stream.read_bytes(int(self.headers["Content-Length"]),
                           self._on_body)
开发者ID:GALI472,项目名称:zhidaobot,代码行数:32,代码来源:simple_httpclient.py
示例12: _on_headers
def _on_headers(self, data):
    """Parse the response head, enforcing a size cap on fixed-length bodies.

    Works like the stock header handler, but a ``Content-Length`` above
    0x100000 bytes is not read: the pending callback (if any) is invoked
    once with a synthetic 592 ``HTTPResponse`` instead.
    """
    text = native_str(data.decode("latin1"))
    status_line, _, raw_headers = text.partition("\r\n")
    match = re.match("HTTP/1.[01] ([0-9]+)", status_line)
    assert match
    self.code = int(match.group(1))
    self.headers = HTTPHeaders.parse(raw_headers)

    notify = self.request.header_callback
    if notify is not None:
        for k, v in self.headers.get_all():
            notify("%s: %s\r\n" % (k, v))

    if (self.request.use_gzip and
            self.headers.get("Content-Encoding") == "gzip"):
        # Magic parameter makes zlib module understand gzip header
        # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
        self._decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)

    if self.headers.get("Transfer-Encoding") == "chunked":
        self.chunks = []
        self.stream.read_until(b("\r\n"), self._on_chunk_length)
    elif "Content-Length" in self.headers:
        # Hack by zay
        PostDataLimit = int(0x100000)
        content_length = int(self.headers["Content-Length"])
        # NOTE(review): indentation was lost in this listing; the ``else``
        # is assumed to pair with the size check -- confirm upstream.
        if content_length > PostDataLimit:
            if self.callback is not None:
                # Clear the attribute first so the callback runs only once.
                callback, self.callback = self.callback, None
                too_large = HTTPError(592, "Enable range support")
                callback(HTTPResponse(self.request, 592,
                                      headers=self.headers,
                                      error=too_large))
        else:
            self.stream.read_bytes(content_length, self._on_body)
    else:
        self.stream.read_until_close(self._on_body)
开发者ID:saibabanadh,项目名称:Gapp-Tweak,代码行数:35,代码来源:simple_httpclient.py
示例13: __init__
def __init__(self, template_string, name="<string>", loader=None, compress_whitespace=None, autoescape=_UNSET):
    """Parse ``template_string`` and compile it to a Python code object.

    ``compress_whitespace`` defaults to True for names ending in ``.html``
    or ``.js``.  ``autoescape`` falls back to the loader's setting, then to
    the module default.  If compilation fails, the generated code is
    logged through ``app_log`` and the exception is re-raised.
    """
    self.name = name
    if compress_whitespace is None:
        compress_whitespace = name.endswith((".html", ".js"))

    if autoescape is not _UNSET:
        self.autoescape = autoescape
    elif loader:
        self.autoescape = loader.autoescape
    else:
        self.autoescape = _DEFAULT_AUTOESCAPE

    if loader:
        self.namespace = loader.namespace
    else:
        self.namespace = {}

    reader = _TemplateReader(name, escape.native_str(template_string))
    self.file = _File(self, _parse(reader, self))
    self.code = self._generate_python(loader, compress_whitespace)
    self.loader = loader
    try:
        # Under python2.5, the fake filename used here must match
        # the module name used in __name__ below.
        # The dont_inherit flag prevents template.py's future imports
        # from being applied to the generated code.
        fake_filename = "%s.generated.py" % self.name.replace(".", "_")
        self.compiled = compile(
            escape.to_unicode(self.code), fake_filename, "exec",
            dont_inherit=True)
    except Exception:
        formatted_code = _format_code(self.code).rstrip()
        app_log.error("%s code:\n%s", self.name, formatted_code)
        raise
开发者ID:justzx2011,项目名称:tornado,代码行数:27,代码来源:template.py
示例14: run_python
def run_python(self, *statements):
    """Run statements in a fresh interpreter and return its stripped output.

    Two setup statements (an IOLoop import and a ``classname`` helper) are
    prepended, everything is joined with ``'; '`` and passed to
    ``sys.executable -c``.
    """
    preamble = [
        'from tornado.ioloop import IOLoop, PollIOLoop',
        'classname = lambda x: x.__class__.__name__',
    ]
    script = '; '.join(preamble + list(statements))
    output = subprocess.check_output([sys.executable, '-c', script])
    return native_str(output).strip()
开发者ID:jacklicn,项目名称:tornado,代码行数:7,代码来源:ioloop_test.py
示例15: parse_body_arguments
def parse_body_arguments(content_type, body, arguments, files, headers=None):
    """Parse a form request body into ``arguments`` and ``files``.

    Handles ``application/x-www-form-urlencoded`` and
    ``multipart/form-data``; other content types are ignored.
    ``content_type`` should be a string and ``body`` a byte string.  The
    two dictionaries are updated in place and nothing is returned.

    A request carrying a ``Content-Encoding`` header is not parsed; a
    warning is logged instead.
    """
    if headers and 'Content-Encoding' in headers:
        gen_log.warning("Unsupported Content-Encoding: %s",
                        headers['Content-Encoding'])
        return

    if content_type.startswith("application/x-www-form-urlencoded"):
        try:
            parsed = parse_qs_bytes(native_str(body), keep_blank_values=True)
        except Exception as e:
            # A malformed body is logged and treated as having no arguments.
            gen_log.warning('Invalid x-www-form-urlencoded body: %s', e)
            parsed = {}
        for key, vals in parsed.items():
            if vals:
                arguments.setdefault(key, []).extend(vals)
        return

    if content_type.startswith("multipart/form-data"):
        for part in content_type.split(";"):
            attr, sep, boundary = part.strip().partition("=")
            if attr == "boundary" and boundary:
                parse_multipart_form_data(utf8(boundary), body, arguments, files)
                break
        else:
            # No usable ``boundary=`` parameter was found.
            gen_log.warning("Invalid multipart/form-data")
开发者ID:1487quantum,项目名称:fyp-autonomous-bot,代码行数:31,代码来源:httputil.py
示例16: test_100_continue
def test_100_continue(self):
    """Drive an ``Expect: 100-continue`` exchange by hand over a raw stream."""
    # With Expect: 100-continue the server should answer the headers with
    # a 100 response and send the real response only after the body.
    stream = IOStream(socket.socket())
    stream.connect(("127.0.0.1", self.get_http_port()), callback=self.stop)
    self.wait()

    request_head = b"\r\n".join([b"POST /hello HTTP/1.1",
                                 b"Content-Length: 1024",
                                 b"Expect: 100-continue",
                                 b"Connection: close",
                                 b"\r\n"])
    stream.write(request_head, callback=self.stop)
    self.wait()

    stream.read_until(b"\r\n\r\n", self.stop)
    interim = self.wait()
    self.assertTrue(interim.startswith(b"HTTP/1.1 100 "), interim)

    # Send the body, then read the final status line and headers.
    stream.write(b"a" * 1024)
    stream.read_until(b"\r\n", self.stop)
    status_line = self.wait()
    self.assertTrue(status_line.startswith(b"HTTP/1.1 200"), status_line)

    stream.read_until(b"\r\n\r\n", self.stop)
    raw_headers = self.wait()
    headers = HTTPHeaders.parse(native_str(raw_headers.decode('latin1')))

    stream.read_bytes(int(headers["Content-Length"]), self.stop)
    payload = self.wait()
    self.assertEqual(payload, b"Got 1024 bytes in POST")
    stream.close()
开发者ID:alexdxy,项目名称:tornado,代码行数:27,代码来源:httpserver_test.py
示例17: _on_request_body
def _on_request_body(self, data):
    """Attach the body to the pending request, parse forms, then dispatch.

    For POST and PUT requests, url-encoded bodies are merged into the
    request arguments (empty values dropped) and multipart bodies are
    handed to ``httputil.parse_multipart_form_data``.  The request
    callback is always invoked at the end.
    """
    self.reset_connection_timeout()
    request = self._request
    request.body = data
    content_type = request.headers.get("Content-Type", "")
    if request.method in ("POST", "PUT"):
        if content_type.startswith("application/x-www-form-urlencoded"):
            parsed = parse_qs_bytes(native_str(request.body))
            for name, values in parsed.iteritems():
                kept = [v for v in values if v]
                if kept:
                    request.arguments.setdefault(name, []).extend(kept)
        elif content_type.startswith("multipart/form-data"):
            for part in content_type.split(";"):
                attr, sep, boundary = part.strip().partition("=")
                if attr == "boundary" and boundary:
                    httputil.parse_multipart_form_data(
                        utf8(boundary), data,
                        request.arguments,
                        request.files)
                    break
            else:
                # No usable ``boundary=`` parameter in the header.
                logging.warning("Invalid multipart/form-data")
    self.request_callback(self._request)
开发者ID:flodiebold,项目名称:tornado,代码行数:25,代码来源:httpserver.py
示例18: parse_config_file
def parse_config_file(self, path, final=True):
    """Execute the Python config file at ``path`` and apply its values.

    Every name defined by the file that matches a known option (after
    name normalisation) is assigned to that option.  When ``final`` is
    ``False`` the parse callbacks are not run, which lets applications
    combine configuration from several sources.

    .. versionchanged:: 4.1
       Config files are now always interpreted as utf-8 instead of
       the system default encoding.

    .. versionchanged:: 4.4
       The special variable ``__file__`` is available inside config
       files, specifying the absolute path to the config file itself.
    """
    namespace = {'__file__': os.path.abspath(path)}
    with open(path, 'rb') as f:
        source = native_str(f.read())
        # The file is executed as Python with one dict as globals and locals.
        exec_in(source, namespace, namespace)
    for name, value in namespace.items():
        normalized = self._normalize_name(name)
        if normalized in self._options:
            self._options[normalized].set(value)
    if final:
        self.run_parse_callbacks()
开发者ID:756613351,项目名称:tornado,代码行数:25,代码来源:options.py
示例19: __init__
def __init__(self, template_string, name="<string>", loader=None,
             compress_whitespace=None, autoescape=_UNSET):
    """Parse ``template_string`` and compile it to a Python code object.

    Args:
        template_string: the template source.
        name: template name, used in the compiled code's file name and in
            error messages.
        loader: optional loader supplying ``autoescape`` and ``namespace``.
        compress_whitespace: defaults to True for names ending in
            ``.html`` or ``.js``.
        autoescape: overrides the loader's or the module's default.

    Raises:
        Exception: whatever ``compile`` raised.  The generated code is
            logged and attached to the exception as ``error_msg``.
    """
    self.name = name
    if compress_whitespace is None:
        compress_whitespace = name.endswith(".html") or \
            name.endswith(".js")
    if autoescape is not _UNSET:
        self.autoescape = autoescape
    elif loader:
        self.autoescape = loader.autoescape
    else:
        self.autoescape = _DEFAULT_AUTOESCAPE
    self.namespace = loader.namespace if loader else {}
    reader = _TemplateReader(name, escape.native_str(template_string))
    self.file = _File(_parse(reader, self))
    self.code = self._generate_python(loader, compress_whitespace)
    try:
        self.compiled = compile(escape.to_unicode(self.code),
                                "<template %s>" % self.name,
                                "exec")
    # ``except Exception, e`` is a syntax error on Python 3; the ``as``
    # form works on Python 2.6+ and 3.
    except Exception as e:
        formatted_code = _format_code(self.code).rstrip()
        logging.error("%s code:\n%s", self.name, formatted_code)
        e.error_msg = "%s code:\n%s" % (self.name, formatted_code)
        # Bare ``raise`` keeps the original traceback on Python 2 as well.
        raise
开发者ID:bittorrent,项目名称:tornado_gen,代码行数:25,代码来源:template.py
示例20: _get_api_token
def _get_api_token(self, world_id, st):
    """Fetch the API token for a world through the DMM make-request proxy.

    Generator-style coroutine: it yields the HTTP fetch and finally
    returns ``(world_ip, api_token, api_starttime)``.

    Raises:
        OoiAuthError: if the request fails, the proxied call does not
            report ``rc == 200``, or ``api_result`` is not 1.
    """
    world_ip = dmm.WORLD_IP[world_id-1]
    url = dmm.GET_FLASH_URL % (world_ip, self.owner, int(time.time()*1000))
    form = {'url': url,
            'httpMethod': 'GET',
            'authz': 'signed',
            'st': st,
            'contentType': 'JSON',
            'numEntries': '3',
            'getSummaries': 'false',
            'signOwner': 'true',
            'signViewer': 'true',
            'gadget': 'http://203.104.209.7/gadget.xml',
            'container': 'dmm'}
    body = urlencode(form)
    try:
        req = yield self.http_client.fetch(dmm.MAKE_REQUEST_URL, method='POST', headers=self.headers, body=body,
                                           connect_timeout=self.connect_timeout,
                                           request_timeout=self.request_timeout,
                                           proxy_host=proxy_host, proxy_port=proxy_port)
    except (CurlError, HTTPError):
        raise OoiAuthError('连接api_token服务器失败')

    # The first 27 characters are skipped before JSON decoding --
    # presumably a fixed non-JSON prefix; verify against the service.
    envelope = json_decode(native_str(req.body)[27:])
    entry = envelope[url]
    if entry['rc'] != 200:
        raise OoiAuthError('获取api_token失败')

    # The inner body likewise has a 7-character prefix before its JSON.
    svdata = json_decode(entry['body'][7:])
    if svdata['api_result'] != 1:
        raise OoiAuthError('获取api_token失败')
    return world_ip, svdata['api_token'], svdata['api_starttime']
开发者ID:ZaoRiTian,项目名称:ooi2,代码行数:28,代码来源:kancolle.py
注：本文中的tornado.escape.native_str函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论