本文整理汇总了Python中werkzeug.wsgi.wrap_file函数的典型用法代码示例。如果您正苦于以下问题:Python wrap_file函数的具体用法?Python wrap_file怎么用?Python wrap_file使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wrap_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_range_wrapper
def test_range_wrapper():
response = BaseResponse(b"Hello World")
range_wrapper = _RangeWrapper(response.response, 6, 4)
assert next(range_wrapper) == b"Worl"
response = BaseResponse(b"Hello World")
range_wrapper = _RangeWrapper(response.response, 1, 0)
with pytest.raises(StopIteration):
next(range_wrapper)
response = BaseResponse(b"Hello World")
range_wrapper = _RangeWrapper(response.response, 6, 100)
assert next(range_wrapper) == b"World"
response = BaseResponse((x for x in (b"He", b"ll", b"o ", b"Wo", b"rl", b"d")))
range_wrapper = _RangeWrapper(response.response, 6, 4)
assert not range_wrapper.seekable
assert next(range_wrapper) == b"Wo"
assert next(range_wrapper) == b"rl"
response = BaseResponse((x for x in (b"He", b"ll", b"o W", b"o", b"rld")))
range_wrapper = _RangeWrapper(response.response, 6, 4)
assert next(range_wrapper) == b"W"
assert next(range_wrapper) == b"o"
assert next(range_wrapper) == b"rl"
with pytest.raises(StopIteration):
next(range_wrapper)
response = BaseResponse((x for x in (b"Hello", b" World")))
range_wrapper = _RangeWrapper(response.response, 1, 1)
assert next(range_wrapper) == b"e"
with pytest.raises(StopIteration):
next(range_wrapper)
resources = os.path.join(os.path.dirname(__file__), "res")
env = create_environ()
with open(os.path.join(resources, "test.txt"), "rb") as f:
response = BaseResponse(wrap_file(env, f))
range_wrapper = _RangeWrapper(response.response, 1, 2)
assert range_wrapper.seekable
assert next(range_wrapper) == b"OU"
with pytest.raises(StopIteration):
next(range_wrapper)
with open(os.path.join(resources, "test.txt"), "rb") as f:
response = BaseResponse(wrap_file(env, f))
range_wrapper = _RangeWrapper(response.response, 2)
assert next(range_wrapper) == b"UND\n"
with pytest.raises(StopIteration):
next(range_wrapper)
开发者ID:pallets,项目名称:werkzeug,代码行数:50,代码来源:test_wsgi.py
示例2: __call__
def __call__(self, environ, start_response):
if self.context is None:
self.context = self.site.build()
for input_file in self.context.input_files:
self.extra_files.append(
os.path.join(self.site.source_path, input_file))
cleaned_path = get_path_info(environ)
cleaned_path = cleaned_path.lstrip('/')
cleaned_path = '/' + cleaned_path
static_file_path = None
if cleaned_path.lstrip('/') in self.context.output_files:
static_file_path = cleaned_path
elif cleaned_path.endswith('/'):
try_cleaned_path = cleaned_path + 'index.html'
if try_cleaned_path.lstrip('/') in self.context.output_files:
static_file_path = try_cleaned_path
if static_file_path is None:
return self.application(environ, start_response)
real_path = os.path.join(self.context.site.build_path,
static_file_path.lstrip('/'))
guessed_type = mimetypes.guess_type(real_path)
mime_type = guessed_type[0] or 'text/plain'
file_size = int(os.path.getsize(real_path))
headers = [
('Content-Type', mime_type),
('Content-Length', str(file_size)),
]
start_response('200 OK', headers)
return wrap_file(environ, open(real_path, 'rb'))
开发者ID:RichardOfWard,项目名称:discharge,代码行数:35,代码来源:server.py
示例3: build_response
def build_response(request, pdf_file):
response = Response(wrap_file(request.environ, pdf_file))
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('content-type', 'application/pdf')
return response
开发者ID:twocngdagz,项目名称:printer-webservice,代码行数:7,代码来源:app.py
示例4: view_file
def view_file(req):
if "uploaded_file" not in req.files:
return BaseResponse("no file uploaded")
f = req.files["uploaded_file"]
return BaseResponse(
wrap_file(req.environ, f), mimetype=f.content_type, direct_passthrough=True
)
开发者ID:pallets,项目名称:werkzeug,代码行数:7,代码来源:upload.py
示例5: serve_static_files
def serve_static_files(request):
"""Serve a static file."""
# First, match the path against the regex.
matcher = re.match(url_re, request.path)
if not matcher: # Just for safety - the dispatcher should have matched this
logging.error('Static file handler found no match for %s',
request.path)
return wrappers.Response(status=httplib.NOT_FOUND)
# Use the match and the files regex backref to choose a filename.
filename = matcher.expand(files_mapping)
# Check to see if the normalized path matched is in the upload regex.
# This provides path traversal protection, although apps running on Google
# servers are protected by the Google frontend (GFE)'s own path traversal
# protection as well.
if not re.match(upload_re, os.path.normpath(filename)):
logging.warn('Requested filename %s not in `upload`', filename)
return wrappers.Response(status=httplib.NOT_FOUND)
try:
fp = open(filename, 'rb')
# fp is not closed in this function as it is handed to the WSGI server
# directly.
except IOError:
logging.warn('Requested non-existent filename %s', filename)
return wrappers.Response(status=httplib.NOT_FOUND)
wrapped_file = wsgi.wrap_file(request.environ, fp)
return wrappers.Response(
wrapped_file, direct_passthrough=True,
mimetype=mime_type or mimetypes.guess_type(filename)[0])
开发者ID:vlad-lifliand,项目名称:appengine-python-vm-runtime,代码行数:32,代码来源:static_files.py
示例6: get
def get(self, digest):
"""Fill the local Response to serve the requested file.
Set the fields of the Response object saved in the greenlet-
local storage to make it then serve the file identified by the
given digest when called as a WSGI application.
digest (bytes): the digest of the file we want to retrieve.
raise: NotFound if the cacher cannot provide the file.
"""
try:
fobj = self._file_cacher.get_file(digest)
except KeyError:
raise NotFound()
local.response.status_code = 200
# XXX We could use get_size to determine Content-Length.
if "filename" in local.request.args:
local.response.headers.add_header(
b'Content-Disposition', b'attachment',
filename=local.request.args["filename"])
# FIXME Determine from filename (if given) or file contents.
local.response.mimetype = 'application/octet-stream'
local.response.response = wrap_file(local.request.environ, fobj)
local.response.direct_passthrough = True
开发者ID:acube-unipi,项目名称:oii-web,代码行数:27,代码来源:file_api.py
示例7: send_private_file
def send_private_file(path):
path = os.path.join(frappe.local.conf.get(
'private_path', 'private'), path.strip("/"))
filename = os.path.basename(path)
if frappe.local.request.headers.get('X-Use-X-Accel-Redirect'):
path = '/protected/' + path
response = Response()
response.headers[b'X-Accel-Redirect'] = frappe.utils.encode(path)
else:
filepath = frappe.utils.get_site_path(path)
try:
f = open(filepath, 'rb')
except IOError:
raise NotFound
response = Response(
wrap_file(frappe.local.request.environ, f), direct_passthrough=True)
# no need for content disposition and force download. let browser handle its opening.
# response.headers.add(b'Content-Disposition', b'attachment', filename=filename.encode("utf-8"))
response.mimetype = mimetypes.guess_type(
filename)[0] or b'application/octet-stream'
return response
开发者ID:vhrspvl,项目名称:vhrs-frappe,代码行数:27,代码来源:response.py
示例8: dbfile_handler
def dbfile_handler(self, environ, args):
try:
fobj = self.file_cacher.get_file(args['digest'])
except KeyError:
raise NotFound()
response = Response()
response.status_code = 200
response.mimetype = 'application/octet-stream'
if 'name' in args:
if args["name"].endswith(".pdf"):
# Add header to allow the official pdf.js to work
response.headers.add_header(b'Access-Control-Allow-Origin',
b'https://mozilla.github.io')
else:
# Don't do this on pdf files because it breaks the native pdf reader
response.headers.add_header(
b'Content-Disposition', b'attachment',
filename=args['name'])
mimetype = mimetypes.guess_type(args['name'])[0]
if mimetype is not None:
response.mimetype = mimetype
response.response = wrap_file(environ, fobj)
response.direct_passthrough = True
response.cache_control.max_age = 31536000
response.cache_control.public = True
return response
开发者ID:algorithm-ninja,项目名称:cmsocial,代码行数:29,代码来源:pws.py
示例9: send_file
def send_file(fileobj, version=-1, mimetype=None, filename=None, cache_for=31536000):
"""
#pour fs.get_version(filename=filename, version=version)
version=-1,
#number of seconds
cache_for=31536000
#date de création ?
fileobj.upload_date
"""
# print("filename: ", fileobj.filename)
# print("metadata : ", fileobj.metadata)
filename = filename or fileobj.filename
data = wrap_file(request.environ, fileobj, buffer_size=1024 * 256)
response = current_app.response_class(data, mimetype=mimetype or fileobj.content_type, direct_passthrough=True)
response.headers["Content-disposition"] = "attachment; filename=%s" % filename
response.content_length = fileobj.length
response.last_modified = fileobj.upload_date
response.set_etag(fileobj.md5)
"""
response.cache_control.max_age = cache_for
response.cache_control.s_max_age = cache_for
response.cache_control.public = True
"""
response.make_conditional(request)
return response
开发者ID:mmalter,项目名称:widukind-web,代码行数:30,代码来源:download.py
示例10: send_from_memory
def send_from_memory(filename):
"""
:param filename: Name of the file to be loaded.
"""
if not os.path.isfile(filename):
raise NotFound()
#if filename is not None:
#if not os.path.isabs(filename):
#filename = os.path.join(current_app.root_path, filename)
mimetype = mimetypes.guess_type(filename)[0]
if mimetype is None:
mimetype = 'application/octet-stream'
if current_app.config['cache_enabled']:
data = jsOptimizer().get_file(os.path.abspath(filename), current_app.storekv)
else:
data = None
if data:
headers = Headers()
headers['Content-Encoding'] = 'gzip'
headers['Content-Length'] = len(data)
headers['Cache-Control'] = "max-age=172800, public, must-revalidate"
rv = current_app.response_class(data, mimetype=mimetype, headers=headers,
direct_passthrough=True)
else:
file = open(filename, 'rb')
data = wrap_file(request.environ, file)
headers = Headers()
rv = current_app.response_class(data, mimetype=mimetype, headers=headers,
direct_passthrough=False)
return rv
开发者ID:CaliopeProject,项目名称:CaliopeServer,代码行数:33,代码来源:fileUtils.py
示例11: server
def server(filename):
print filename
storage_setup = current_app.config['storage']
if 'local' in storage_setup and 'absolut_path' in storage_setup['local']:
STORAGE = storage_setup['local']['absolut_path']
else:
STORAGE = '.'
filename = filename.split('.')[0]
node = CaliopeDocument.pull(filename)
file = open(os.path.join(STORAGE, filename), 'rb')
data = wrap_file(request.environ, file)
headers = Headers()
try:
mimetype = node.mimetype
except:
mimetype = 'application/octet-stream'
rv = current_app.response_class(data, mimetype=mimetype, headers=headers,
direct_passthrough=False)
return rv
开发者ID:CaliopeProject,项目名称:CaliopeServer,代码行数:26,代码来源:server.py
示例12: serve_static_files
def serve_static_files(request):
"""Serve a static file."""
# First, match the path against the regex...
matcher = re.match(regex, request.path)
# ... and use the files regex backref to choose a filename.
filename = matcher.expand(files)
# Rudimentary protection against path traversal outside of the app. This
# code should never be hit in production, as Google's frontend servers
# (GFE) rewrite traversal attempts and respond with a 302 immediately.
filename = os.path.abspath(filename)
if not filename.startswith(os.path.join(os.getcwd(), '')):
logging.warn('Path traversal protection triggered for %s, '
'returning 404', filename)
return Response(status=404)
try:
fp = open(filename, 'rb')
# fp is not closed in this function as it is handed to the WSGI server
# directly.
except IOError:
logging.warn('Requested non-existent filename %s', filename)
return Response(status=404)
wrapped_file = wrap_file(request.environ, fp)
return Response(wrapped_file, direct_passthrough=True,
mimetype=mime_type or mimetypes.guess_type(filename)[0])
开发者ID:andrewsg,项目名称:appengine-python-vm-runtime,代码行数:27,代码来源:static_files.py
示例13: dispatch_request
def dispatch_request(self, environ, request):
filename = request.path if request.path != "/" else "index.html"
try:
fp = open('app/%s' % filename, 'rb')
except IOError:
raise NotFound()
return Response(wrap_file(environ, fp), mimetype='text/html')
开发者ID:reinbach,项目名称:finance,代码行数:7,代码来源:runserver.py
示例14: get
def get(self, dbid, sessid, restype, resid=None):
if resid == None:
return "You want alist of %s/%s/%s" % (dbid, sessid, restype)
else:
datadb = self.get_data_db(dbid)
if datadb == None:
return Response("{ \"error \" : \"Invalid database id %s\"}" % (dbid), status=405)
conn.register([Session])
sessobj = datadb["sessions"].Session.find_one({"_id" : ObjectId(sessid)})
if sessobj == None:
return Response("{ \"error \" : \"Session %s does not exist in db %s\"}" % (sessid, dbid), status=405)
# TODO: Make sure that resid exists in this session before being obtained from gridfs
if restype == "attachments" or restype == "rawfiles":
gf = gridfs.GridFS(datadb , restype)
fileobj = gf.get(ObjectId(resid))
data = wrap_file(request.environ, fileobj)
response = current_app.response_class(
data,
mimetype=fileobj.content_type,
direct_passthrough=True)
response.content_length = fileobj.length
response.last_modified = fileobj.upload_date
response.set_etag(fileobj.md5)
response.cache_control.max_age = 0
response.cache_control.s_max_age = 0
response.cache_control.public = True
response.headers['Content-Disposition'] = 'attachment; filename=' + fileobj.filename
response.make_conditional(request)
return response
else:
return "You want %s from views in %s/%s" % (resid, dbid, sessid)
开发者ID:dhandeo,项目名称:SlideAtlas-Server,代码行数:35,代码来源:api.py
示例15: logo_login
def logo_login(self, **post):
p = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
number_rnd = random.randint(1, 15)
p = os.path.join(p, 'static/src/img/fundo_{0}.jpg'.format(number_rnd))
image = open(p, 'rb')
return BaseResponse(wrap_file(request.httprequest.environ, image),
mimetype='image/png')
开发者ID:hgsoft,项目名称:hgsoft-addons,代码行数:7,代码来源:main.py
示例16: get_file
def get_file(self, environ, request):
try:
file = open('images/'+request.query_string.decode(), 'rb')
except Exception as e:
debug_logger.debug("could not read file" + str(e))
raise NotFound()
return Response(wrap_file(environ, file), direct_passthrough=True)
开发者ID:brittanynall,项目名称:SpeechDevice,代码行数:7,代码来源:RpcService.py
示例17: craete_object
def craete_object(request):
driver = get_driver_instance(providers, request)
data = {"container_name": request.args["container"]}
container = entries.ContainerEntry._get_object(data, driver)
extra = {"content_type": request.content_type}
result = driver.upload_object_via_stream(
wrap_file(request.environ, request.stream, 8096), container, request.args["object_name"], extra
)
return Response(entries.ObjectEntry.to_json(result), status=httplib.OK)
开发者ID:reidwooten99,项目名称:libcloud.rest,代码行数:9,代码来源:storage.py
示例18: attachments
def attachments():
"""
Interface for managing attachments
first step is to download attachments
Just knowing attachid is not enough. It should belong to the session one has access to
- /tile/4e695114587718175c000006/t.jpg searches and returns the image
"""
# Get variables
db = request.args.get('sessdb', None)
attachid = request.args.get('attachid', None)
sessid = request.args.get('sessid', None)
cmd = request.args.get('cmd', "get")
if cmd == "get":
if(db == None or attachid == None or sessid == None):
flash('sessdb, attachid and sessid must all be set', "error")
return redirect('/home')
pass
# TODO: Can we store this information in the session information (or a database information)
conn.register([model.Database])
admindb = conn[current_app.config["CONFIGDB"]]
try:
dbid = ObjectId(db)
except:
flash('dbid is not a valid id', "error")
return redirect('/home')
dbobj = admindb["databases"].find_one({"_id" : dbid})
db = conn[dbobj['dbname']]
if not model.VerifySessionAccess(model.SEE_SESSION, db, sessid):
flash('Forbidden Access ', "error")
return redirect('/home')
# try:
gf = gridfs.GridFS(db , "attachments")
fileobj = gf.get(ObjectId(attachid))
# except:
# flash('Error locating file', "error")
# return redirect('/home')
# mostly copied from flask/helpers.py, with
# modifications for GridFS
data = wrap_file(request.environ, fileobj)
response = current_app.response_class(
data,
mimetype=fileobj.content_type,
direct_passthrough=True)
response.content_length = fileobj.length
response.last_modified = fileobj.upload_date
response.set_etag(fileobj.md5)
response.cache_control.max_age = 0
response.cache_control.s_max_age = 0
response.cache_control.public = True
response.headers['Content-Disposition'] = 'attachment; filename=' + fileobj.filename
response.make_conditional(request)
return response
开发者ID:charles-marion,项目名称:SlideAtlas-Server,代码行数:57,代码来源:attachments.py
示例19: get_static_file_response
def get_static_file_response():
try:
f = open(frappe.flags.file_path, 'rb')
except IOError:
raise NotFound
response = Response(wrap_file(frappe.local.request.environ, f), direct_passthrough=True)
response.mimetype = mimetypes.guess_type(frappe.flags.file_path)[0] or b'application/octet-stream'
return response
开发者ID:kickapoo,项目名称:frappe,代码行数:9,代码来源:render.py
示例20: send_file_csv
def send_file_csv(fileobj, mimetype=None, content_length=0):
data = wrap_file(request.environ, fileobj, buffer_size=1024*256)
response = current_app.response_class(
data,
mimetype=mimetype,
)
response.status_code = 200
response.make_conditional(request)
return response
开发者ID:srault95,项目名称:widukind-web,代码行数:10,代码来源:views.py
注:本文中的werkzeug.wsgi.wrap_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论