本文整理汇总了Python中werkzeug.formparser.parse_form_data函数的典型用法代码示例。如果您正苦于以下问题:Python parse_form_data函数的具体用法?Python parse_form_data怎么用?Python parse_form_data使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_form_data函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: generate_formdata
def generate_formdata(req, resp, params):
form = dict()
files = dict()
if req.method == 'GET':
di = parse_query_string(req.query_string)
form = dict(di)
params['form'], params['files'] = dict(form), dict(files)
else:
if 'json' in req.get_header('content-type', None):
#if the method type is post "form" variable below can contain
#only either the post body or the quer parameter at once, modify
#umcomment the below commented line to store query parameters in "form"
form = json.load(req.stream)
#form = dict(parse_query_string(req.query_string))
params['form'], params['files'] = dict(form), dict(files)
else:
mimetype, options = parse_options_header(req.get_header('content-type'))
data = req.stream.read()
environ = {'wsgi.input':StringIO(data),
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': req.get_header('content-type'),
'REQUEST_METHOD': 'POST'}
stream, tempform, tempfiles = parse_form_data(environ)
for item in tempform:
form[item] = tempform[item]
di = parse_query_string(req.query_string)
for item in di:
form[item] = di[item]
for item in tempfiles:
files[item] = tempfiles[item]
params['form'], params['files'] = dict(form), dict(files)
#print form
#print params
return True
开发者ID:ashishRay12,项目名称:productServer,代码行数:34,代码来源:common.py
示例2: decompose_incoming_envelope
def decompose_incoming_envelope(self, ctx):
ctx.method_request_string = '{%s}%s' % (self.app.interface.get_tns(),
ctx.in_document['PATH_INFO'].split('/')[-1])
logger.debug("\033[92mMethod name: %r\033[0m" % ctx.method_request_string)
ctx.in_header_doc = _get_http_headers(ctx.in_document)
ctx.in_body_doc = parse_qs(ctx.in_document['QUERY_STRING'])
if ctx.transport.req_env['REQUEST_METHOD'].lower() in ('post', 'put', 'patch'):
stream, form, files = parse_form_data(ctx.transport.req_env)
for k, v in form.lists():
val = ctx.in_body_doc.get(k, [])
val.extend(v)
ctx.in_body_doc[k] = val
for k, v in files.items():
val = ctx.in_body_doc.get(k, [])
val.append(yield_stream(v.stream))
ctx.in_body_doc[k] = val
# FIXME: some proper variable matching is needed here.
k2 = k + "_name"
val = ctx.in_body_doc.get(k2, [])
val.append(v.filename)
ctx.in_body_doc[k2] = val
k2 = k + "_type"
val = ctx.in_body_doc.get(k2, [])
val.append(v.headers.get('Content-Type','application/octet-stream'))
ctx.in_body_doc[k2] = val
logger.debug('\theader : %r' % (ctx.in_header_doc))
logger.debug('\tbody : %r' % (ctx.in_body_doc))
开发者ID:azurit,项目名称:rpclib,代码行数:35,代码来源:http.py
示例3: decompose_incoming_envelope
def decompose_incoming_envelope(self, prot, ctx, message):
"""This function is only called by the HttpRpc protocol to have the wsgi
environment parsed into ``ctx.in_body_doc`` and ``ctx.in_header_doc``.
"""
if self.has_patterns:
from werkzeug.exceptions import NotFound
if self._map_adapter is None:
self.generate_map_adapter(ctx)
try:
#If PATH_INFO matches a url, Set method_request_string to mrs
mrs, params = self._map_adapter.match(ctx.in_document["PATH_INFO"],
ctx.in_document["REQUEST_METHOD"])
ctx.method_request_string = mrs
except NotFound:
# Else set method_request_string normally
params = {}
ctx.method_request_string = '{%s}%s' % (prot.app.interface.get_tns(),
ctx.in_document['PATH_INFO'].split('/')[-1])
else:
params = {}
ctx.method_request_string = '{%s}%s' % (prot.app.interface.get_tns(),
ctx.in_document['PATH_INFO'].split('/')[-1])
logger.debug("%sMethod name: %r%s" % (LIGHT_GREEN,
ctx.method_request_string, END_COLOR))
ctx.in_header_doc = _get_http_headers(ctx.in_document)
ctx.in_body_doc = parse_qs(ctx.in_document['QUERY_STRING'])
for k,v in params.items():
if k in ctx.in_body_doc:
ctx.in_body_doc[k].append(v)
else:
ctx.in_body_doc[k] = [v]
if ctx.in_document['REQUEST_METHOD'].upper() in ('POST', 'PUT', 'PATCH'):
stream, form, files = parse_form_data(ctx.in_document,
stream_factory=prot.stream_factory)
for k, v in form.lists():
val = ctx.in_body_doc.get(k, [])
val.extend(v)
ctx.in_body_doc[k] = val
for k, v in files.items():
val = ctx.in_body_doc.get(k, [])
mime_type = v.headers.get('Content-Type', 'application/octet-stream')
path = getattr(v.stream, 'name', None)
if path is None:
val.append(File.Value(name=v.filename, type=mime_type,
data=[v.stream.getvalue()]))
else:
v.stream.seek(0)
val.append(File.Value(name=v.filename, type=mime_type,
path=path, handle=v.stream))
ctx.in_body_doc[k] = val
开发者ID:listestalo,项目名称:spyne,代码行数:60,代码来源:wsgi.py
示例4: decompose_incoming_envelope
def decompose_incoming_envelope(self, ctx):
ctx.method_request_string = '{%s}%s' % (self.app.interface.get_tns(),
ctx.in_document['PATH_INFO'].split('/')[-1])
logger.debug("\033[92mMethod name: %r\033[0m" % ctx.method_request_string)
ctx.in_header_doc = _get_http_headers(ctx.in_document)
ctx.in_body_doc = parse_qs(ctx.in_document['QUERY_STRING'])
if ctx.transport.req_env['REQUEST_METHOD'].lower() in ('post', 'put', 'patch'):
stream, form, files = parse_form_data(ctx.transport.req_env,
stream_factory=self.stream_factory)
for k, v in form.lists():
val = ctx.in_body_doc.get(k, [])
val.extend(v)
ctx.in_body_doc[k] = val
for k, v in files.items():
val = ctx.in_body_doc.get(k, [])
mime_type = v.headers.get('Content-Type', 'application/octet-stream')
path = getattr(v.stream, 'name', None)
if path is None:
val.append(File(name=v.filename, type=mime_type, data=[v.stream.getvalue()]))
else:
v.stream.seek(0)
val.append(File(name=v.filename, type=mime_type, path=path, handle=v.stream))
ctx.in_body_doc[k] = val
logger.debug('\theader : %r' % (ctx.in_header_doc))
logger.debug('\tbody : %r' % (ctx.in_body_doc))
开发者ID:mardiros,项目名称:rpclib,代码行数:34,代码来源:http.py
示例5: _load_form_data
def _load_form_data(self):
if "stream" in self.__dict__:
return
if self.shallow:
raise RuntimeError(
"A shallow request tried to consume form data. If you really want to do that, set `shallow` to False."
)
data = None
stream = _empty_stream
if self.environ["REQUEST_METHOD"] in ("POST", "PUT"):
try:
data = parse_form_data(
self.environ,
self._get_file_stream,
self.charset,
self.encoding_errors,
self.max_form_memory_size,
self.max_content_length,
cls=self.parameter_storage_class,
silent=False,
)
except ValueError as e:
self._form_parsing_failed(e)
else:
content_length = self.headers.get("content-length", type=int)
if content_length is not None:
stream = LimitedStream(self.environ["wsgi.input"], content_length)
if data is None:
data = (stream, self.parameter_storage_class(), self.parameter_storage_class())
d = self.__dict__
d["stream"], d["form"], d["files"] = data
开发者ID:Reve,项目名称:eve,代码行数:32,代码来源:wrappers.py
示例6: test_parse_form_data_get_without_content
def test_parse_form_data_get_without_content(self):
env = create_environ('/foo', 'http://example.org/', method='GET')
stream, form, files = formparser.parse_form_data(env)
strict_eq(stream.read(), b'')
strict_eq(len(form), 0)
strict_eq(len(files), 0)
开发者ID:brunoais,项目名称:werkzeug,代码行数:7,代码来源:test_formparser.py
示例7: _is_single_sign_out
def _is_single_sign_out(self, environ):
logging.debug("Testing for SLO")
if environ['REQUEST_METHOD'] == 'POST':
current_url = environ.get('PATH_INFO','')
origin = self._entry_page
logging.debug("Testing for SLO:" + current_url + " vs " + origin)
if current_url == origin:
try:
form = parse_form_data(environ)[1]
request_body = form['logoutRequest']
request_body = unquote_plus(request_body).decode('utf8')
logging.debug("POST:" + str(request_body))
logging.debug("POST:" + str(environ))
dom = xml.dom.minidom.parseString(request_body)
nodes = dom.getElementsByTagNameNS(self.samlpNamespaceUri, 'SessionIndex')
if nodes:
sessionNode = nodes[0]
if sessionNode.firstChild is not None:
sessionId = sessionNode.firstChild.nodeValue
logging.info("Received SLO request for:" + sessionId)
self._remove_session_by_ticket(sessionId)
return True
except (Exception):
logging.warning("Exception parsing post")
logging.exception("Exception parsing post:" + request_body)
return False
开发者ID:cggh,项目名称:DQXServer,代码行数:26,代码来源:casmiddleware.py
示例8: __call__
def __call__(self, environ, start_response):
request = Request(environ)
response = Response('')
self._get_session(request)
request_method = environ['REQUEST_METHOD']
form = parse_form_data(environ)[1]
path = environ.get('PATH_INFO','')
request_url = request.url
params = request.args
resp = self._process_request(request_method, request_url, path, params, form)
logging.debug(str(resp))
if resp:
if 'set_values' in resp:
self._set_values(environ)
return self._application(environ, start_response)
if 'ignore_callback' in resp and resp['ignore_callback'] == True:
return self._ignored_callback(environ, start_response)
if 'status' in resp:
response.status = resp['status']
for name in ['Location', 'Content-Type', 'WWW-Authenticate']:
if name in resp['headers']:
response.headers[name] = resp['headers'][name]
if 'data' in resp:
response.data = resp['data']
response.set_cookie(self.CAS_COOKIE_NAME, value = self._session.sid, max_age = None, expires = None)
return response(environ, start_response)
else:
return self._application(environ, start_response)
开发者ID:wrighting,项目名称:cas_wsgi_middleware,代码行数:29,代码来源:werkzeugcas.py
示例9: generate_formdata
def generate_formdata(req, resp, params):
"""sets prarams['form'] to pass to every endpoint.
"""
#print "here"
form = dict()
files = dict()
if req.method == 'GET':
di = parse_query_string(req.query_string)
form = dict(di)
params['form'], params['files'] = dict(form), dict(files)
else:
if 'json' in req.get_header('content-type', None):
form = json.load(req.stream)
params['form'], params['files'] = dict(form), dict(files)
else:
mimetype, options = parse_options_header(req.get_header('content-type'))
data = req.stream.read()
environ = {'wsgi.input': StringIO(data),
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': req.get_header('content-type'),
'REQUEST_METHOD': 'POST'}
stream, tempform, tempfiles = parse_form_data(environ)
for item in tempform:
form[item] = tempform[item]
di = parse_query_string(req.query_string)
for item in di:
form[item] = di[item]
for item in tempfiles:
files[item] = tempfiles[item]
params['form'], params['files'] = dict(form), dict(files)
return True
开发者ID:pyvkd,项目名称:turbulent-octo,代码行数:31,代码来源:helpers.py
示例10: test_parse_form_data_get_without_content
def test_parse_form_data_get_without_content(self):
env = create_environ("/foo", "http://example.org/", method="GET")
del env["CONTENT_TYPE"]
del env["CONTENT_LENGTH"]
stream, form, files = formparser.parse_form_data(env)
self.assert_equal(stream.read(), "")
self.assert_equal(len(form), 0)
self.assert_equal(len(files), 0)
开发者ID:justushamalainen,项目名称:werkzeug,代码行数:9,代码来源:formparser.py
示例11: test_parse_form_data_get_without_content
def test_parse_form_data_get_without_content(self):
env = create_environ('/foo', 'http://example.org/', method='GET')
del env['CONTENT_TYPE']
del env['CONTENT_LENGTH']
stream, form, files = formparser.parse_form_data(env)
self.assert_equal(stream.read(), '')
self.assert_equal(len(form), 0)
self.assert_equal(len(files), 0)
开发者ID:y2bishop2y,项目名称:microengine,代码行数:9,代码来源:formparser.py
示例12: test_parse_form_data_get_without_content
def test_parse_form_data_get_without_content():
"""GET requests without data, content type and length returns no data"""
env = create_environ('/foo', 'http://example.org/', method='GET')
del env['CONTENT_TYPE']
del env['CONTENT_LENGTH']
stream, form, files = parse_form_data(env)
assert stream.read() == ""
assert len(form) == 0
assert len(files) == 0
开发者ID:EnTeQuAk,项目名称:werkzeug,代码行数:10,代码来源:test_formparser.py
示例13: parse
def parse(self, context):
"""Parse the request content
"""
try:
stream, form, files = parse_form_data(context.request.environ)
return FormData(form, stream, files)
except:
# Failed to decode the content
self.logger.error('Failed to decode request content')
# Raise
raise BadRequestError
开发者ID:penfree,项目名称:pyunified-rpc,代码行数:11,代码来源:form.py
示例14: test_environ_builder_stream_switch
def test_environ_builder_stream_switch(self):
d = MultiDict(dict(foo=u'bar', blub=u'blah', hu=u'hum'))
for use_tempfile in False, True:
stream, length, boundary = stream_encode_multipart(
d, use_tempfile, threshold=150)
assert isinstance(stream, OutputType) != use_tempfile
form = parse_form_data({'wsgi.input': stream, 'CONTENT_LENGTH': str(length),
'CONTENT_TYPE': 'multipart/form-data; boundary="%s"' %
boundary})[1]
assert form == d
开发者ID:0xJCG,项目名称:dubtrack-technical-test,代码行数:11,代码来源:test.py
示例15: test_empty_multipart
def test_empty_multipart(self):
environ = {}
data = b'--boundary--'
environ['REQUEST_METHOD'] = 'POST'
environ['CONTENT_TYPE'] = 'multipart/form-data; boundary=boundary'
environ['CONTENT_LENGTH'] = str(len(data))
environ['wsgi.input'] = BytesIO(data)
stream, form, files = parse_form_data(environ, silent=False)
rv = stream.read()
assert rv == b''
assert form == MultiDict()
assert files == MultiDict()
开发者ID:2009bpy,项目名称:werkzeug,代码行数:12,代码来源:test_formparser.py
示例16: __call__
def __call__(self, environ, start_response):
if environ['REQUEST_METHOD'].upper() == 'POST':
environ['wsgi.input'] = stream = \
BytesIO(get_input_stream(environ).read())
formdata = parse_form_data(environ)[1]
stream.seek(0)
method = formdata.get('_method', '').upper()
if method in ('GET', 'POST', 'PUT', 'DELETE'):
environ['REQUEST_METHOD'] = method
return self.app(environ, start_response)
开发者ID:jazzido,项目名称:dondevoto,代码行数:12,代码来源:dondevoto.py
示例17: test_environ_builder_stream_switch
def test_environ_builder_stream_switch(self):
d = MultiDict(dict(foo=u'bar', blub=u'blah', hu=u'hum'))
for use_tempfile in False, True:
stream, length, boundary = stream_encode_multipart(
d, use_tempfile, threshold=150)
self.assert_true(isinstance(stream, BytesIO) != use_tempfile)
form = parse_form_data({'wsgi.input': stream, 'CONTENT_LENGTH': str(length),
'CONTENT_TYPE': 'multipart/form-data; boundary="%s"' %
boundary})[1]
self.assert_strict_equal(form, d)
stream.close()
开发者ID:poffdeluxe,项目名称:werkzeug,代码行数:12,代码来源:test.py
示例18: test_parse_form_data_put_without_content
def test_parse_form_data_put_without_content(self):
# A PUT without a Content-Type header returns empty data
# Both rfc1945 and rfc2616 (1.0 and 1.1) say "Any HTTP/[1.0/1.1] message
# containing an entity-body SHOULD include a Content-Type header field
# defining the media type of that body." In the case where either
# headers are omitted, parse_form_data should still work.
env = create_environ('/foo', 'http://example.org/', method='PUT')
stream, form, files = formparser.parse_form_data(env)
strict_eq(stream.read(), b'')
strict_eq(len(form), 0)
strict_eq(len(files), 0)
开发者ID:brunoais,项目名称:werkzeug,代码行数:13,代码来源:test_formparser.py
示例19: test_parse_form_data_put_without_content
def test_parse_form_data_put_without_content(self):
# A PUT without a Content-Type header returns empty data
# Both rfc1945 and rfc2616 (1.0 and 1.1) say "Any HTTP/[1.0/1.1] message
# containing an entity-body SHOULD include a Content-Type header field
# defining the media type of that body." In the case where either
# headers are omitted, parse_form_data should still work.
env = create_environ("/foo", "http://example.org/", method="PUT")
del env["CONTENT_TYPE"]
del env["CONTENT_LENGTH"]
stream, form, files = formparser.parse_form_data(env)
self.assert_equal(stream.read(), "")
self.assert_equal(len(form), 0)
self.assert_equal(len(files), 0)
开发者ID:justushamalainen,项目名称:werkzeug,代码行数:15,代码来源:formparser.py
示例20: test_environ_builder_stream_switch
def test_environ_builder_stream_switch():
d = MultiDict(dict(foo=u"bar", blub=u"blah", hu=u"hum"))
for use_tempfile in False, True:
stream, length, boundary = stream_encode_multipart(d, use_tempfile, threshold=150)
assert isinstance(stream, BytesIO) != use_tempfile
form = parse_form_data(
{
"wsgi.input": stream,
"CONTENT_LENGTH": str(length),
"CONTENT_TYPE": 'multipart/form-data; boundary="%s"' % boundary,
}
)[1]
strict_eq(form, d)
stream.close()
开发者ID:ajones620,项目名称:werkzeug,代码行数:15,代码来源:test_test.py
注:本文中的werkzeug.formparser.parse_form_data函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论