本文整理汇总了Python中werkzeug.datastructures.Headers类的典型用法代码示例。如果您正苦于以下问题：Python Headers类的具体用法？Python Headers怎么用？Python Headers使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Headers类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: iter_request_header_errors
def iter_request_header_errors(self, request_headers):
    """
    Yield a :class:`~spectastic.errors.FieldError` for every problem found
    in the request headers.

    Headers that have a schema are validated against it, then every schema
    flagged ``required`` is checked for presence in the request. Nothing is
    yielded when the operation declares no header schemas.
    """
    known_schemas = self.header_schemas()
    if not known_schemas:
        return

    headers = Headers(request_headers)

    # Pass 1: validate the value of each supplied header that has a schema.
    for header, value in headers.iteritems():
        if header not in known_schemas:
            continue
        header_schema = self.header_schema(header)
        for error in self.validator.iter_errors(value, header_schema):
            yield FieldError(error.message, 'header', header)

    # Pass 2: report required headers that were not supplied at all.
    for name, schema in known_schemas.iteritems():
        if not schema.get('required'):
            continue
        if name.lower() in headers:
            continue
        yield FieldError(
            'Required header is missing',
            'header',
            schema.get('name')
        )
开发者ID:planetlabs,项目名称:spectastic,代码行数:25,代码来源:operation.py
示例2: handle_error
def handle_error(self, e):
    '''
    Convert a raised exception into a Flask response carrying the
    appropriate HTTP status code, body and headers.

    The payload, status code and headers come from, in order of preference:
    a handler registered for the exact exception class, the exception
    itself when it is an ``HTTPException``, the default error handler, or a
    generic 500 payload.

    :param Exception e: the raised Exception object
    :return: the response produced by ``make_response`` (passed through
        ``unauthorized`` when the status code is 401)
    '''
    got_request_exception.send(current_app._get_current_object(), exception=e)

    headers = Headers()
    exc_class = e.__class__
    if exc_class in self.error_handlers:
        default_data, code, headers = unpack(self.error_handlers[exc_class](e), 500)
    elif isinstance(e, HTTPException):
        code = e.code
        fallback_message = HTTP_STATUS_CODES.get(code, '')
        default_data = {'message': getattr(e, 'description', fallback_message)}
        headers = e.get_response().headers
    elif self._default_error_handler:
        default_data, code, headers = unpack(self._default_error_handler(e), 500)
    else:
        code = 500
        default_data = {'message': HTTP_STATUS_CODES.get(code, str(e))}

    # Make sure the payload always carries a message.
    default_data['message'] = default_data.get('message', str(e))
    # An exception may provide its own payload through a ``data`` attribute.
    data = getattr(e, 'data', default_data)
    fallback_mediatype = None

    if code >= 500:
        exc_info = sys.exc_info()
        # Pass None rather than an empty exc_info triple when no exception
        # is currently being handled.
        current_app.log_exception(None if exc_info[1] is None else exc_info)
    elif code == 404 and current_app.config.get("ERROR_404_HELP", True):
        data['message'] = self._help_on_404(data.get('message', None))
    elif code == 406 and self.default_mediatype is None:
        # When handling NotAcceptable (406), give make_response a
        # representation we support as the default mediatype so that it
        # does not raise another NotAcceptable error.
        supported = list(self.representations.keys())
        if supported:
            fallback_mediatype = supported[0]
        else:
            fallback_mediatype = "text/plain"

    # Remove blacklisted headers
    for blacklisted in HEADERS_BLACKLIST:
        headers.pop(blacklisted, None)

    resp = self.make_response(data, code, headers, fallback_mediatype=fallback_mediatype)
    return self.unauthorized(resp) if code == 401 else resp
开发者ID:Dlotan,项目名称:flask-restplus,代码行数:60,代码来源:api.py
示例3: credentials
def credentials(scope="module"):
    """
    Build request headers carrying HTTP Basic credentials.

    Note that these credentials match those mentioned in test.htpasswd.

    :param scope: not used by the body; kept for interface compatibility.
    :return: a ``Headers`` object with a single ``Authorization`` entry.
    """
    # b64encode() only accepts bytes on Python 3, so encode a bytes literal
    # and decode the ASCII result to get a text header value on both
    # Python 2 and Python 3.
    token = base64.b64encode(b"username:password").decode("ascii")
    h = Headers()
    h.add("Authorization", "Basic " + token)
    return h
开发者ID:hanshoi,项目名称:kapsi-git-manager,代码行数:7,代码来源:test_kgm.py
示例4: test_401_with_invalid_credentials
def test_401_with_invalid_credentials(client):
    """Requesting ``/user`` with unknown Basic credentials must yield 401."""
    # b64encode() only accepts bytes on Python 3; decode the ASCII result so
    # it can be concatenated with the text scheme prefix on Python 2 and 3.
    token = base64.b64encode(b'invalid-username:invalid-password').decode('ascii')
    headers = Headers()
    headers.set('Authorization', 'Basic ' + token)

    response = client.get('/user', headers=headers)

    assert response.status_code == 401
    assert response.json.get('error') == 'invalid_token'
开发者ID:chrisopal,项目名称:skylines,代码行数:7,代码来源:access_to_user_test.py
示例5: handler_b2s_image
def handler_b2s_image(self, id=None):
    """ Handler for `/b2s_image` url for json data.

    It accepts only Communication Kit Notifications.

    Looks up the correspondence whose ``uuid`` equals ``id`` and returns
    its image data as an attachment: a zip archive when the letter format
    is ``'zip'``, otherwise a PDF.

    :raises BadRequest: when no ``id`` is supplied.
    :raises NotFound: when no correspondence matches ``id``.
    """
    if id is None:
        raise BadRequest()

    self._validate_headers(request.httprequest.headers)

    correspondence = request.env['correspondence'].sudo().search(
        [('uuid', '=', id)])
    if not correspondence:
        raise NotFound()
    data = correspondence.get_image()

    # Only the download name and the content type differ between formats.
    if correspondence.letter_format == 'zip':
        download_name = fields.Date.today() + ' letters.zip'
        content_type = 'application/zip'
    else:
        download_name = correspondence.file_name
        content_type = 'application/pdf'

    headers = Headers()
    headers.add('Content-Disposition', 'attachment', filename=download_name)
    return Response(data, content_type=content_type, headers=headers)
开发者ID:maxime-beck,项目名称:compassion-modules,代码行数:30,代码来源:b2s_image.py
示例6: test_401_with_invalid_token
def test_401_with_invalid_token(client):
    """Requesting ``/user`` with the bearer token ``invalid-token`` must yield 401."""
    auth_headers = Headers()
    auth_headers.set('Authorization', 'Bearer invalid-token')

    resp = client.get('/user', headers=auth_headers)

    assert resp.status_code == 401
    error_code = resp.json.get('error')
    assert error_code == 'invalid_token'
开发者ID:chrisopal,项目名称:skylines,代码行数:7,代码来源:access_to_user_test.py
示例7: fetch
def fetch():
    """
    Fetch the URL given in the ``url`` form field and relay its body.

    The outbound GET carries only a ``referer`` header and a 5 second
    timeout. Failures are mapped to gateway-style statuses:

    * timeout -> 504
    * connection error, HTTP error or too many redirects -> 502
    * anything else -> 500 (re-raised instead when ``app.debug`` is set)

    :return: a ``Response`` with the fetched content served as
        ``text/html``, or an empty response with one of the statuses above.
    """
    url = request.form['url']
    # Call form with a single argument prints the same on Python 2 and 3.
    print(url)

    # NOTE(review): the URL is caller-supplied and fetched server-side, which
    # allows server-side request forgery; consider an allow-list.

    # Start from an empty header set rather than copying the inbound request
    # headers and immediately clearing them.
    outbound = Headers()
    outbound.add('referer', 'https://www.facebook.com/')

    try:
        r = requests.request(
            method='GET',
            url=url,
            headers=outbound,
            timeout=5
        )
    except requests.exceptions.Timeout:
        # ConnectTimeout and ReadTimeout are subclasses of Timeout. This
        # clause must stay first because ConnectTimeout is also a
        # ConnectionError.
        return Response(status=504)
    except (
            requests.exceptions.ConnectionError,
            requests.exceptions.HTTPError,
            requests.exceptions.TooManyRedirects):
        return Response(status=502)
    except Exception:
        # RequestException is itself an Exception subclass.
        if app.debug:
            # A bare raise keeps the original traceback intact.
            raise
        return Response(status=500)

    return Response(r.content, mimetype="text/html")
开发者ID:elaineo,项目名称:dumbproxy,代码行数:33,代码来源:server.py
示例8: test_200_with_credentials
def test_200_with_credentials(client):
    """Requesting ``/secrets`` with these Basic credentials must return a list of secrets."""
    # NOTE(review): the address literal looks like an e-mail-obfuscation
    # placeholder left by the page this snippet was copied from - confirm
    # against the upstream test.
    auth_value = basic_auth_encode("[email protected]", "secret123")
    auth_headers = Headers()
    auth_headers.set("Authorization", auth_value)

    resp = client.get("/secrets", headers=auth_headers)

    assert resp.status_code == 200
    secrets = resp.json.get("secrets")
    assert isinstance(secrets, list)
开发者ID:skylines-project,项目名称:skylines,代码行数:7,代码来源:access_to_secrets_test.py
示例9: test07_access_token_handler
def test07_access_token_handler(self):
    """
    Exercise ``IIIFAuthBasic.access_token_handler`` in three situations:
    no credentials, Basic credentials, and a JSONP ``callback`` without
    credentials.
    """
    # No Authorization header: expect a JSON error document.
    with dummy_app.test_request_context('/a_request'):
        auth = IIIFAuthBasic()
        response = auth.access_token_handler()
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.headers['Content-type'], 'application/json')
        j = json.loads(response.get_data())
        self.assertEqual(j['error_description'], "No login details received")
        self.assertEqual(j['error'], "client_unauthorized")

    # add Authorization header, check we get token
    # b64encode() only accepts bytes on Python 3; decode the ASCII result so
    # it concatenates with the text scheme prefix on Python 2 and 3.
    token = base64.b64encode(b'userpass:userpass').decode('ascii')
    h = Headers()
    h.add('Authorization', 'Basic ' + token)
    with dummy_app.test_request_context('/a_request', headers=h):
        auth = IIIFAuthBasic()
        response = auth.access_token_handler()
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.headers['Content-type'], 'application/json')
        j = json.loads(response.get_data())
        self.assertEqual(j['access_token'], "secret_token_here")  # FIXME
        self.assertEqual(j['token_type'], "Bearer")
        self.assertEqual(j['expires_in'], 3600)

    # add callback but no Authorization header
    with dummy_app.test_request_context('/a_request?callback=CB'):
        auth = IIIFAuthBasic()
        response = auth.access_token_handler()
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.headers['Content-type'], 'application/javascript')
        # Read the body as text so the text regex below applies on Python 3.
        js = response.get_data(as_text=True)
        # Capture the JSON inside the JavaScript wrapper with a raw-string
        # pattern. lstrip/rstrip strip character *sets*, not a prefix or
        # suffix, so they are not a reliable way to unwrap the payload.
        wrapped = re.match(r'CB\((.*)\);', js)
        self.assertTrue(wrapped)
        j = json.loads(wrapped.group(1))
        self.assertEqual(j['error_description'], "No login details received")
        self.assertEqual(j['error'], "client_unauthorized")
开发者ID:pbinkley,项目名称:iiif,代码行数:33,代码来源:test_auth_basic.py
示例10: test_200_with_authorization_header
def test_200_with_authorization_header(client, access_token):
    """Requesting ``/secrets`` with the bearer ``access_token`` must return a list of secrets."""
    bearer_value = "Bearer " + access_token
    auth_headers = Headers()
    auth_headers.set("Authorization", bearer_value)

    resp = client.get("/secrets", headers=auth_headers)

    assert resp.status_code == 200
    secrets = resp.json.get("secrets")
    assert isinstance(secrets, list)
开发者ID:skylines-project,项目名称:skylines,代码行数:7,代码来源:access_to_secrets_test.py
示例11: test_401_with_unsupported_authorization_header
def test_401_with_unsupported_authorization_header(client):
    """Requesting ``/secrets`` with a ``MAC`` Authorization header must yield 401."""
    auth_headers = Headers()
    auth_headers.set("Authorization", "MAC 123456789")

    resp = client.get("/secrets", headers=auth_headers)

    assert resp.status_code == 401
    error_code = resp.json.get("error")
    assert error_code == "invalid_token"
开发者ID:skylines-project,项目名称:skylines,代码行数:7,代码来源:access_to_secrets_test.py
示例12: excel_response
def excel_response(spreadsheet, filename=u'export.xls'):
    """
    Prepares an excel spreadsheet for response in Flask

    The workbook is saved into an in-memory buffer and returned with
    headers that force a file download, plus a ``fileDownload`` cookie.

    :param spreadsheet: the spreadsheet
    :type spreadsheet:class:`xlwt.Workbook`
    :param filename: the name of the file when downloaded
    :type filename: unicode
    :return: the flask response
    :rtype:class:`flask.Response`
    """
    output = StringIO.StringIO()
    spreadsheet.save(output)

    response = Response()
    response.status_code = 200
    response.data = output.getvalue()

    content_type, content_encoding = mimetypes.guess_type(filename)
    if content_type is None:
        # guess_type() returns None for unrecognised extensions; fall back
        # to a generic binary type instead of sending a None header value.
        content_type = u'application/octet-stream'

    # HTTP headers for forcing file download
    response_headers = Headers({
        u'Pragma': u"public",  # required,
        u'Expires': u'0',
        u'Cache-Control': [u'must-revalidate, post-check=0, pre-check=0', u'private'],
        u'Content-Type': content_type,
        u'Content-Disposition': u'attachment; filename="%s";' % filename,
        u'Content-Transfer-Encoding': u'binary',
        u'Content-Length': len(response.data)
    })
    if content_encoding is not None:
        response_headers.update({u'Content-Encoding': content_encoding})

    response.headers = response_headers
    response.set_cookie(u'fileDownload', u'true', path=u'/')
    return response
开发者ID:tristaneuan,项目名称:WikiaAuthority,代码行数:34,代码来源:app.py
示例13: test_user_rss_url_etag
def test_user_rss_url_etag(session, client):
    """
    The user feed must answer 200 with an ETag on the first request, and
    304 with the same ETag and an empty body when that ETag is sent back
    in ``If-None-Match``.
    """
    author1 = AuthorFactory()
    author1.entries.extend(EntryFactory.create_batch(5))

    user = UserFactory(active=True)
    user.userfeed.private = False
    Subscription(user=user, author=author1).save()

    h = Headers()
    h.add("If-None-Match", None)

    # First request: full feed expected.
    first = client.get(url_for("users.user_feed", user_id=user.id), headers=h)
    assert first.status_code == 200
    etag = first.headers.get("ETag")
    assert first.data
    content_type = first.headers.get("Content-Type")
    assert content_type == "application/atom+xml; charset=utf-8"

    # Second request: send the ETag back and expect "not modified".
    h.add("If-None-Match", etag)
    second = client.get(url_for("users.user_feed", user_id=user.id), headers=h)
    assert second.status_code == 304
    assert second.headers.get("ETag") == etag
    assert not second.data
开发者ID:DBeath,项目名称:flask-feedrsub,代码行数:27,代码来源:user_test.py
示例14: test_admin_page_rejects_bad_password
def test_admin_page_rejects_bad_password(self):
    """ Check that incorrect password won't allow access """
    # Valid username paired with a wrong password.
    bad_credentials = '{0}:foo'.format(Config.USERNAME).encode('ascii')
    auth_value = b'Basic ' + base64.b64encode(bad_credentials)

    h = Headers()
    h.add('Authorization', auth_value)

    rv = Client.open(self.client, path='/', headers=h)
    self.assert_401(rv)
开发者ID:jmcarp,项目名称:cg-quotas-db,代码行数:7,代码来源:tests.py
示例15: test_admin_page_rejects_bad_username
def test_admin_page_rejects_bad_username(self):
    """ Check that incorrect username won't allow access """
    # Wrong username paired with the valid password.
    bad_credentials = 'foo:{0}'.format(Config.PASSWORD).encode('ascii')
    auth_value = b'Basic ' + base64.b64encode(bad_credentials)

    h = Headers()
    h.add('Authorization', auth_value)

    rv = Client.open(self.client, path='/', headers=h)
    self.assert_401(rv)
开发者ID:jmcarp,项目名称:cg-quotas-db,代码行数:7,代码来源:tests.py
示例16: test_200_with_authorization_header
def test_200_with_authorization_header(client, access_token, test_user):
    """Requesting ``/user`` with the bearer ``access_token`` must return the test user's id."""
    bearer_value = 'Bearer ' + access_token
    auth_headers = Headers()
    auth_headers.set('Authorization', bearer_value)

    resp = client.get('/user', headers=auth_headers)

    assert resp.status_code == 200
    returned_user = resp.json.get('user')
    assert returned_user == test_user.id
开发者ID:chrisopal,项目名称:skylines,代码行数:7,代码来源:access_to_user_test.py
示例17: item_ogg
def item_ogg(item_id, ogg_q):
    """
    Stream a library item as an Ogg download.

    The item is fed to ``oggenc`` at quality ``ogg_q``. Items whose guessed
    mimetype is ``audio/mpeg`` are first decoded with ``mpg123`` and piped
    into the encoder. The encoder's stdout is handed to the response
    unbuffered.

    :param item_id: identifier passed to ``g.lib.get_item``.
    :param ogg_q: quality value passed to ``oggenc -q``.
    :return: a ``Response`` with ``audio/ogg`` attachment headers.
    """
    from subprocess import Popen, PIPE
    import mimetypes

    item = g.lib.get_item(item_id)

    # Download name: the source file's base name with an .ogg extension.
    stem = os.path.splitext(os.path.split(item.path)[1])[0]
    filename = stem + '.ogg'

    headers = Headers()
    headers.add('Content-Type', 'audio/ogg')
    headers.add('Content-Disposition', 'attachment', filename=filename)

    source_is_mp3 = mimetypes.guess_type(item.path)[0] == 'audio/mpeg'
    if not source_is_mp3:
        # Encode the file directly.
        ogg_fp = Popen(
            ["oggenc", "-q", str(ogg_q), "-Q", "-o", "/dev/stdout", item.path],
            stdout=PIPE)
    else:
        # Decode to WAV on stdout, then encode from stdin.
        decoded_fp = Popen(
            ["mpg123", "-q", "-w", "/dev/stdout", item.path],
            stdout=PIPE)
        ogg_fp = Popen(
            ["oggenc", "-q", str(ogg_q), "-Q", "-"],
            stdin=decoded_fp.stdout,
            stdout=PIPE)
        # Close our copy of the decoder's pipe now that the encoder holds it.
        decoded_fp.stdout.close()

    res = Response(
        #wrap_file(request.environ, ogg_fp.stdout),
        ogg_fp.stdout,
        headers=headers,
        direct_passthrough=True)
    res.implicit_sequence_conversion = False
    return res
开发者ID:djrtl,项目名称:beets-dj,代码行数:33,代码来源:__init__.py
示例18: test_200_with_credentials
def test_200_with_credentials(client, test_user):
    """Requesting ``/user`` with these Basic credentials must return the test user's id."""
    # NOTE(review): the address literal looks like an e-mail-obfuscation
    # placeholder left by the page this snippet was copied from - confirm
    # against the upstream test.
    # b64encode() only accepts bytes on Python 3; decode the ASCII result so
    # it can be concatenated with the text scheme prefix on Python 2 and 3.
    token = base64.b64encode(b'[email protected]:secret123').decode('ascii')
    headers = Headers()
    headers.set('Authorization', 'Basic ' + token)

    response = client.get('/user', headers=headers)

    assert response.status_code == 200
    assert response.json.get('user') == test_user.id
开发者ID:chrisopal,项目名称:skylines,代码行数:7,代码来源:access_to_user_test.py
示例19: test_crossdomain
def test_crossdomain(self):
    """
    A resource decorated with ``cors.crossdomain(allow_origin='*')`` must
    answer a GET carrying an ``Origin`` header with the expected
    ``Access-Control-*`` response headers.
    """
    class Foo(flask_restful.Resource):
        @cors.crossdomain(allow_origin='*')
        def get(self):
            return "data"

    app = Flask(__name__)
    api = flask_restful.Api(app)
    api.add_resource(Foo, '/')

    origin = "http://foo.bar"
    with app.test_client() as client:
        h = Headers()
        h.add('Origin', origin)
        res = client.get('/', headers=h)
        headers = res.headers

        assert_equals(res.status_code, 200)
        allowed_origin = headers['Access-Control-Allow-Origin']
        assert_true(allowed_origin in ('*', origin))
        assert_equals(headers['Access-Control-Max-Age'], '21600')

        allow_methods = headers['Access-Control-Allow-Methods']
        for method in ('HEAD', 'OPTIONS', 'GET'):
            assert_true(method in allow_methods)
开发者ID:versae,项目名称:flask-restful,代码行数:25,代码来源:test_cors.py
示例20: __init__
def __init__(self, response=None, status=None, headers=None, mimetype=None, content_type=None, direct_passthrough=False):
    """
    Initialise the response object.

    :param response: ``None`` (empty body), a string (stored through
        ``self.data``) or any other object (stored as ``self.response``).
    :param status: an integer status code or a status value assigned to
        ``self.status``; ``self.default_status`` is used when ``None``.
    :param headers: a ``Headers`` instance (used as is) or anything
        accepted by the ``Headers`` constructor.
    :param mimetype: mimetype used to build the content type when
        ``content_type`` is not given.
    :param content_type: full content type; takes precedence over
        ``mimetype``.
    :param direct_passthrough: stored on the instance unchanged.
    """
    # A Headers instance is adopted as is; anything else is wrapped.
    if not isinstance(headers, Headers):
        headers = Headers(headers) if headers else Headers()
    self.headers = headers

    if content_type is None:
        # Use the class default only when neither the caller nor the
        # supplied headers specify a content type.
        if mimetype is None and 'content-type' not in self.headers:
            mimetype = self.default_mimetype
        if mimetype is not None:
            mimetype = get_content_type(mimetype, self.charset)
        content_type = mimetype
    if content_type is not None:
        self.headers['Content-Type'] = content_type

    if status is None:
        status = self.default_status
    if not isinstance(status, (int, long)):
        self.status = status
    else:
        self.status_code = status

    self.direct_passthrough = direct_passthrough
    self._on_close = []

    if response is None:
        self.response = []
    elif not isinstance(response, basestring):
        self.response = response
    else:
        self.data = response
开发者ID:connoryang,项目名称:dec-eve-serenity,代码行数:29,代码来源:wrappers.py
注：本文中的werkzeug.datastructures.Headers类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论