本文整理汇总了Python中tornado.httputil.HTTPHeaders类的典型用法代码示例。如果您正苦于以下问题:Python HTTPHeaders类的具体用法?Python HTTPHeaders怎么用?Python HTTPHeaders使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HTTPHeaders类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _delete
def _delete(self, url, headers=None, callback=None):
    """Send an asynchronous ``DELETE`` request to ``url``.

    The client's default headers form the base set and any per-call
    ``headers`` are layered on top of them.

    :param url: target URL of the request.
    :param headers: optional mapping of extra headers; entries here
        replace default headers of the same name.
    :param callback: optional callable handed to the client's ``fetch``.
    """
    merged = HTTPHeaders()
    merged.update(self._default_headers)
    if headers:
        merged.update(headers)
    # Send the merged set: passing the raw ``headers`` argument instead
    # would silently drop the client's default headers.
    req = HTTPRequest(url, headers=merged, method="DELETE")
    self._client.fetch(req, callback)
开发者ID:sojoner,项目名称:dopplr,代码行数:7,代码来源:baseclient.py
示例2: _prepare_request
def _prepare_request(self, messages):
    """Build the ``POST`` request that carries ``messages``.

    :param messages: sequence of messages to serialise into the body.
    :return: an ``HTTPRequest`` whose connect and request timeouts are
        both ``self.get_timeout(messages)`` divided by 1000.
    """
    # Work out the target URL. A lone meta-channel message may have its
    # type (the channel parts after the first) appended as a path suffix.
    target = self.url
    lone_meta_message = (
        self._append_message_type
        and len(messages) == 1
        and messages[0].channel.is_meta()
    )
    if lone_meta_message:
        suffix = '/'.join(messages[0].channel.parts()[1:])
        if not target.endswith('/'):
            target += '/'
        target += suffix

    # Copy every configured header value into a fresh header object,
    # then log what will actually be sent.
    request_headers = HTTPHeaders()
    for name, values in self.get_headers().iteritems():
        for value in values:
            request_headers.add(name, value)
    for name, value in request_headers.get_all():
        self.log.debug('Request header %s: %s' % (name, value))

    # Serialise the messages into the request body.
    payload = Message.to_json(messages, encoding='utf8')
    self.log.debug('Request body (length: %d): %s' % (len(payload), payload))

    # get_timeout() is divided by 1000 to obtain seconds.
    seconds = self.get_timeout(messages) / 1000.0
    self.log.debug('Request timeout: %ss' % seconds)

    return HTTPRequest(
        target,
        method='POST',
        headers=request_headers,
        body=payload,
        connect_timeout=seconds,
        request_timeout=seconds
    )
开发者ID:silentsound,项目名称:baiocas,代码行数:35,代码来源:long_polling.py
示例3: post
def post(self,param):
# Download the document named by the `url` request argument and re-upload it
# to this server's /import/ endpoint as a multipart form, then write that
# endpoint's response body back to the caller.
# `param` is not used anywhere in the visible body.
# The body uses `yield`, so this is a generator; presumably it is wrapped by a
# coroutine decorator outside this snippet -- TODO confirm.
# NOTE(review): indentation was lost in this listing, so it is unclear whether
# the finish()/flush() calls at the end run only on failure or on every
# path -- confirm against the original file.
targetURL = self.get_argument('url')
if DEBUG: print "target URL: " + targetURL
try:
# Base URL of this server, rebuilt from the incoming request.
serverURL= self.request.protocol + '://' + self.request.host
http_client = AsyncHTTPClient()
# Fetch the remote document; certificate validation is switched off.
sub = yield http_client.fetch(targetURL, validate_cert=False)
sub_filename = targetURL[targetURL.rfind('/'):]
sub_filename = "fornow" # TODO: the URL doesn't have to end with a filename; the value derived on the previous line is overwritten here -- is it worth keeping?
# One file part (used as both field name and file name) plus the XSRF token.
files = []
files.append((sub_filename, sub_filename, sub.body))
fields = []
fields.append(("_xsrf" , self.xsrf_token))
content_type, body = encode_multipart_formdata(fields, files)
headers = HTTPHeaders({"Content-Type": content_type, 'content-length': str(len(body))})
# The same XSRF token is also sent as a cookie alongside the form field.
headers.add("Cookie", "_xsrf=" + self.xsrf_token)
request = HTTPRequest(serverURL + "/import/", "POST", headers=headers, body=body, validate_cert=False)
response = yield http_client.fetch(request)
self.write(response.body)
except Exception, e:
# Any failure is reported on stdout and to the client instead of propagating.
print 'Failed to upload from URL (DocumentWrapperHandler)', e
self.write("Failed to upload from '" + targetURL + "'")
self.finish()
self.flush()
开发者ID:harlo,项目名称:InformaFrontend,代码行数:29,代码来源:v2j3m.py
示例4: request_to_curl_string
def request_to_curl_string(request):
    """Render ``request`` as an equivalent ``curl`` command line.

    A body that cannot be decoded as ASCII is piped into curl through
    ``echo -e`` and ``--data-binary @-``; any other non-empty body is
    passed inline with ``--data``.

    :param request: object exposing ``method``, ``url``, ``headers`` and
        ``body``.
    :return: the command line as a string, stripped of outer whitespace.
    """
    def _escape_apos(s):
        # Close the single-quoted shell string, emit a quoted apostrophe,
        # then reopen the single-quoted string.
        return s.replace("'", "'\"'\"'")

    body = request.body

    # Only a non-empty body that fails ASCII decoding counts as binary.
    is_binary_data = False
    if body:
        try:
            body.decode('ascii')
        except UnicodeError:
            is_binary_data = True

    curl_headers = HTTPHeaders(request.headers)
    if body and 'Content-Length' not in curl_headers:
        curl_headers['Content-Length'] = len(body)

    if is_binary_data:
        echo_part = "echo -e {} |".format(repr(body))
        data_part = '--data-binary @-'
    elif body:
        echo_part = ''
        data_part = "--data '{}'".format(_escape_apos(str(body)))
    else:
        echo_part = ''
        data_part = ''

    header_part = ' '.join(
        "-H '{}: {}'".format(name, _escape_apos(str(value)))
        for name, value in curl_headers.items()
    )

    command = "{echo} curl -X {method} '{url}' {headers} {data}".format(
        echo=echo_part,
        method=request.method,
        url=request.url,
        headers=header_part,
        data=data_part
    )
    return command.strip()
示例5: compose_response
def compose_response(self):
    """Serialise the response as raw HTTP/1.1 bytes.

    The status line comes from ``self.response.code``, the header lines
    from whatever ``self.process_headers`` returns for a fresh
    ``HTTPHeaders`` object, and the body from ``self.process_body``.

    :return: the ASCII-encoded head, followed by the processed body
        unless ``process_body`` returned ``None``.
    """
    headers = self.process_headers(HTTPHeaders())
    status = self.response.code
    lines = ["HTTP/1.1 %d %s" % (status, responses[status])]
    lines.extend(k + ": " + v for k, v in headers.get_all())
    # A blank line terminates the head.
    head = ("\r\n".join(lines) + "\r\n\r\n").encode("ascii")
    body = self.process_body(self.response.body)
    if body is None:
        return head
    # Append the *processed* body so the work done by process_body is
    # not thrown away in favour of the raw response body.
    return head + body
开发者ID:ei-grad,项目名称:charon,代码行数:25,代码来源:charon.py
示例6: execute
def execute(self):
    """Issue the push request for the image ``self.name``.

    Written as a generator: it yields the pending fetch and hands the
    result back through ``gen.Return`` (presumably it is decorated as a
    coroutine outside this snippet -- TODO confirm).

    :return: (via ``gen.Return``) the result of the fetch.
    :raises Exception: whatever the fetch raised, or ``self._lasterr``
        when that attribute is not ``None`` after the fetch completes.
    """
    url = self._make_url('/images/{0}/push'.format(self.name))
    registry, name = resolve_repository_name(self.name)
    headers = HTTPHeaders()
    headers.add(REGISTRY_AUTH_HEADER, self._prepare_auth_header_value())
    log.info('Pushing "%s" into "%s"... ', name, registry)
    log.debug('Pushing url: %s', url)
    # The POST carries an empty body; response chunks are delivered to
    # self._on_body as they arrive.
    request = HTTPRequest(url, method='POST',
                          headers=headers,
                          body='',
                          allow_ipv6=True,
                          request_timeout=self.timeout,
                          streaming_callback=self._on_body)
    try:
        result = yield self._http_client.fetch(request)
        if self._lasterr is not None:
            raise self._lasterr
        log.info('OK')
    except Exception as err:
        log.error('FAIL - %s', err)
        # Bare ``raise`` re-raises the active exception with its
        # original traceback intact.
        raise
    # Kept outside the try block: gen.Return is itself raised as an
    # exception and must not be caught by the handler above.
    raise gen.Return(result)
开发者ID:Alukardd,项目名称:cocaine-tools,代码行数:25,代码来源:docker.py
示例7: _clean_headers
def _clean_headers(self):
    """
    Drop the headers the backend does not need and replace values.

    The ``Host`` entry of the incoming request headers is overwritten
    in place with the backend endpoint's netloc before copying.

    :return: a new ``HTTPHeaders`` holding the headers to forward
    """
    incoming = self.request.headers
    # Point Host at the backend site that will actually be contacted.
    incoming['Host'] = self.client.request.endpoint['netloc']

    # The only x-api-* headers that must still reach the backend.
    forwarded_api_headers = ('x-api-user-json', 'x-api-access-key')

    cleaned = HTTPHeaders()
    for name, value in incoming.get_all():
        lowered = name.lower()
        # Other x-api-* headers are addressed to the api-gateway only.
        if lowered.startswith('x-api-') and lowered not in forwarded_api_headers:
            continue
        # Content-Length is not forwarded; it is computed automatically.
        # A wrong value breaks the backend request: too large causes
        # timeouts, too small truncates the content.
        if lowered == 'content-length':
            continue
        # Mixing str and unicode headers results in a 422 error, so
        # both name and value are coerced to text.
        cleaned.add(text_type(name), text_type(value))
    return cleaned
开发者ID:baboq,项目名称:api-gateway,代码行数:27,代码来源:proxy.py
示例8: weibo_request
def weibo_request(self, path, callback, access_token=None, expires_in=None,
                  post_args=None, **args):
    """Call the Weibo API endpoint ``path`` asynchronously.

    A ``GET`` is issued when ``post_args`` is ``None``; otherwise a
    ``POST`` is sent, multipart-encoded if any posted value has a
    ``read`` attribute and url-encoded if not.

    :param path: endpoint path, inserted between the API base URL and
        the ``.json`` suffix.
    :param callback: passed, together with ``self._on_weibo_request``,
        to ``self.async_callback``.
    :param access_token: optional token; when truthy it is sent both as
        an ``access_token`` argument and in the ``Authorization`` header.
    :param expires_in: accepted but not used by this method.
    :param post_args: optional mapping of POST fields.
    :param args: extra arguments merged into the request arguments.
    """
    url = "https://api.weibo.com/2/" + path + ".json"
    all_args = {}
    header = HTTPHeaders()
    if access_token:
        all_args['access_token'] = access_token
        # Only advertise a token that exists; formatting ``None`` would
        # send the bogus header value "OAuth2 None".
        header.add('Authorization', 'OAuth2 %s' % access_token)
    all_args.update(args)
    all_args.update(post_args or {})
    callback = self.async_callback(self._on_weibo_request, callback)
    http = httpclient.AsyncHTTPClient()

    if post_args is None:
        if all_args:
            url += "?" + urllib.urlencode(all_args)
        http.fetch(url, callback=callback, headers=header)
        return

    # File-like values force a multipart upload.
    has_file = any(hasattr(value, "read") for value in post_args.values())
    if has_file:
        post_args, boundary = encode_multipart(post_args)
        header.add('Content-Type', 'multipart/form-data; boundary=%s' % boundary)
        # Header values are text, so the length is converted explicitly.
        header.add('Content-Length', str(len(post_args)))
        http.fetch(url, method="POST", body=post_args,
                   callback=callback, headers=header)
    else:
        http.fetch(url, method="POST", body=urllib.urlencode(all_args),
                   callback=callback, headers=header)
开发者ID:qicfan,项目名称:petmerry,代码行数:28,代码来源:WeiboAuth.py
示例9: request_to_curl_string
def request_to_curl_string(request):
    """Render ``request`` as an equivalent ``curl`` command line.

    A body that is not ASCII-decodable is piped in via ``echo -e`` and
    ``--data-binary @-``; any other non-empty body is passed inline
    with ``--data``. Headers are emitted in sorted key order.

    :param request: object exposing ``method``, ``url``, ``headers`` and
        ``body``.
    :return: the command line as a string, stripped of outer whitespace.
    """
    def _escape_apos(string):
        # Close the single-quoted shell string, emit a quoted apostrophe,
        # then reopen the single-quoted string.
        return string.replace("'", "'\"'\"'")

    raw_body = request.body
    request_body = None
    is_binary_body = False
    if raw_body:
        try:
            request_body = _escape_apos(raw_body.decode('ascii'))
        except UnicodeError:
            # Fall back to the bytes repr with the leading ``b`` removed.
            request_body = repr(raw_body).strip('b')
            is_binary_body = True

    curl_headers = HTTPHeaders(request.headers)
    if raw_body and 'Content-Length' not in curl_headers:
        curl_headers['Content-Length'] = len(raw_body)

    if is_binary_body:
        echo_part = f'echo -e {request_body} |'
        data_part = '--data-binary @-'
    elif request_body:
        echo_part = ''
        data_part = f"--data '{request_body}'"
    else:
        echo_part = ''
        data_part = ''

    def _format_header(key):
        header_value = frontik.util.any_to_unicode(curl_headers[key])
        return f"-H '{key}: {_escape_apos(header_value)}'"

    header_part = ' '.join(_format_header(key) for key in sorted(curl_headers.keys()))

    command = "{echo} curl -X {method} '{url}' {headers} {data}".format(
        echo=echo_part,
        method=request.method,
        url=to_unicode(request.url),
        headers=header_part,
        data=data_part
    )
    return command.strip()
开发者ID:hhru,项目名称:frontik,代码行数:33,代码来源:debug.py
示例10: send_object
def send_object(cls, object_url):
    """
    Forward an OpenSlides object to every connected client (waiter).

    For each waiter, an internal HTTP request for ``object_url`` is sent
    to the OpenSlides REST api and the response is handed to that
    waiter's ``forward_rest_response`` method.
    """
    # Prefix the object URL with the network location of the WSGI server.
    # TODO: Use host and port as given in the start script
    base = settings.OPENSLIDES_WSGI_NETWORK_LOCATION or 'http://localhost:8000'
    url = base + object_url

    for waiter in cls.waiters:
        # Carry the waiter's session cookie, if it has one, over to the
        # internal request.
        headers = HTTPHeaders()
        cookie_name = settings.SESSION_COOKIE_NAME
        try:
            session_cookie = waiter.connection_info.cookies[cookie_name]
        except KeyError:
            # There is no session cookie
            cookie_header = None
        else:
            cookie_header = '%s=%s' % (cookie_name, session_cookie.value)
        if cookie_header is not None:
            headers.add('Cookie', cookie_header)

        # The response is requested without decompression.
        request = HTTPRequest(
            url=url,
            headers=headers,
            decompress_response=False)

        # Fire the request; the waiter's forward_rest_response() is passed
        # as the callback for the response.
        AsyncHTTPClient().fetch(request, waiter.forward_rest_response)
开发者ID:munasharma,项目名称:OpenSlides,代码行数:33,代码来源:autoupdate.py
示例11: headers_parse_simple
def headers_parse_simple(headers: str) -> HTTPHeaders:
    """Parse a raw header block line by line into an ``HTTPHeaders``.

    The text is split on ``"\\n"``; one trailing ``"\\r"`` is removed
    from each line and empty lines are skipped.
    """
    parsed = HTTPHeaders()
    for raw_line in headers.split("\n"):
        line = raw_line[:-1] if raw_line.endswith("\r") else raw_line
        if not line:
            continue
        parsed.parse_line(line)
    return parsed
开发者ID:bdarnell,项目名称:tornado,代码行数:8,代码来源:parsing_benchmark.py
示例12: parse_headers
def parse_headers(data):
    """Parse raw header text into an ``HTTPHeaders`` object.

    Parsing is best-effort: it stops at the first line that
    ``parse_line`` rejects and keeps the headers collected so far.

    :param data: header block as text, one header per line.
    :return: the ``HTTPHeaders`` built from the lines parsed.
    """
    headers = HTTPHeaders()
    for line in data.splitlines():
        if not line:
            continue
        try:
            headers.parse_line(line)
        except Exception:
            # Deliberately lenient: a malformed line ends parsing rather
            # than failing the whole call.
            break
    # Hand the result back; without this the parsed headers were lost.
    return headers
开发者ID:alexmerser,项目名称:python-element,代码行数:9,代码来源:proxy.py
示例13: test_setdefault
def test_setdefault(self):
    """``setdefault`` keeps existing values and stores missing ones."""
    h = HTTPHeaders()
    h['foo'] = 'bar'

    # Existing key: the current value comes back and is not overwritten.
    existing = h.setdefault('foo', 'baz')
    self.assertEqual(existing, 'bar')
    self.assertEqual(h['foo'], 'bar')

    # Missing key: the default is returned and stored for later lookups.
    created = h.setdefault('quux', 'xyzzy')
    self.assertEqual(created, 'xyzzy')
    self.assertEqual(h['quux'], 'xyzzy')

    expected_pairs = [('Foo', 'bar'), ('Quux', 'xyzzy')]
    self.assertEqual(sorted(h.get_all()), expected_pairs)
开发者ID:FlorianLudwig,项目名称:tornado,代码行数:10,代码来源:httputil_test.py
示例14: _parse_headers
def _parse_headers(self):
# Decode the buffered header frames into pseudo-headers and regular headers,
# validating them along the way, then start the request if the stream is in
# the HEADERS phase.
# Raises ConnectionError for a self-dependent stream or an HPACK failure, and
# StreamError (PROTOCOL_ERROR) for invalid header names, invalid pseudo-headers
# or trailers that do not end the stream.
# The first frame supplies the flags and stream id; the header block itself is
# the concatenation of every buffered frame's data. The buffer is then reset.
frame = self._header_frames[0]
data = b''.join(f.data for f in self._header_frames)
self._header_frames = []
if frame.flags & constants.FrameFlag.PRIORITY:
# TODO: support PRIORITY and PADDING.
# This is just enough to cover an error case tested in h2spec.
# The first five bytes are unpacked as a 4-byte stream dependency and a
# 1-byte weight, and removed from the header block.
stream_dep, weight = struct.unpack('>ib', data[:5])
data = data[5:]
# strip off the "exclusive" bit
stream_dep = stream_dep & 0x7fffffff
if stream_dep == frame.stream_id:
raise ConnectionError(constants.ErrorCode.PROTOCOL_ERROR,
"stream cannot depend on itself")
pseudo_headers = {}
headers = HTTPHeaders()
try:
# Pseudo-headers must come before any regular headers,
# and only in the first HEADERS phase.
# Starting this flag as True in the TRAILERS phase makes every pseudo-header
# in trailers fail the check below.
has_regular_header = bool(self._phase == constants.HTTPPhase.TRAILERS)
for k, v, idx in self.conn.hpack_decoder.decode(bytearray(data)):
# Header names that are not already lowercase are rejected.
if k != k.lower():
# RFC section 8.1.2
raise StreamError(self.stream_id,
constants.ErrorCode.PROTOCOL_ERROR)
if k.startswith(b':'):
# The accepted pseudo-headers depend on which side of the connection we are.
if self.conn.is_client:
valid_pseudo_headers = (b':status',)
else:
valid_pseudo_headers = (b':method', b':scheme',
b':authority', b':path')
# Reject a pseudo-header that follows a regular header, is not in the
# accepted set, or has already been seen.
if (has_regular_header or
k not in valid_pseudo_headers or
native_str(k) in pseudo_headers):
raise StreamError(self.stream_id,
constants.ErrorCode.PROTOCOL_ERROR)
pseudo_headers[native_str(k)] = native_str(v)
# :authority is additionally exposed as a regular Host header.
if k == b":authority":
headers.add("Host", native_str(v))
else:
headers.add(native_str(k), native_str(v))
has_regular_header = True
except HpackError:
# A decoder failure is reported as a connection-level compression error.
raise ConnectionError(constants.ErrorCode.COMPRESSION_ERROR)
if self._phase == constants.HTTPPhase.HEADERS:
self._start_request(pseudo_headers, headers)
elif self._phase == constants.HTTPPhase.TRAILERS:
# TODO: support trailers
pass
if (not self._maybe_end_stream(frame.flags) and
self._phase == constants.HTTPPhase.TRAILERS):
# The frame that finishes the trailers must also finish
# the stream.
raise StreamError(self.stream_id, constants.ErrorCode.PROTOCOL_ERROR)
开发者ID:bdarnell,项目名称:tornado_http2,代码行数:54,代码来源:stream.py
示例15: test_urllib2
def test_urllib2(scheme, root_span, install_hooks):
# Check that a urllib2 request made under the tracing hooks starts a client
# span and gets the tracer's injected header added to the request.
# `scheme` selects the URL scheme; a truthy `root_span` runs the request inside
# a mocked parent span. `install_hooks` is not referenced in the body
# (presumably a fixture that installs the hooks -- TODO confirm).
request = urllib2.Request('%s://localhost:9777/proxy' % scheme,
headers={'Remote-LOC': 'New New York',
'Remote-Op': 'antiquing'})
# Minimal stand-in for the object returned by the patched do_open.
class Response(object):
def __init__(self):
self.code = 200
self.msg = ''
def info(self):
return None
if root_span:
# Replace the truthy flag with a mocked parent span.
root_span = mock.MagicMock()
root_span.context = mock.MagicMock()
root_span.finish = mock.MagicMock()
root_span.__exit__ = mock.MagicMock()
else:
root_span = None
# The span that the patched tracer.start_span hands back.
span = mock.MagicMock()
span.set_tag = mock.MagicMock()
span.finish = mock.MagicMock()
# Stand-in for tracer.inject: writes a fixed trace id into the carrier.
def inject(span_context, format, carrier):
carrier['TRACE-ID'] = '123'
# Patch out the network call and the tracer entry points.
p_do_open = mock.patch('urllib2.AbstractHTTPHandler.do_open',
return_value=Response())
p_start_span = mock.patch.object(opentracing.tracer, 'start_span',
return_value=span)
p_inject = mock.patch.object(opentracing.tracer, 'inject',
side_effect=inject)
p_current_span = span_in_context(span=root_span)
with p_do_open, p_start_span as start_call, p_inject, p_current_span:
resp = urllib2.urlopen(request)
# The span must be started once, as a child of the root span's context when
# there is a root span.
expected_references = root_span.context if root_span else None
start_call.assert_called_once_with(
operation_name='GET:antiquing',
child_of=expected_references,
tags=None,
)
assert resp is not None
span.set_tag.assert_any_call('span.kind', 'client')
assert span.__enter__.call_count == 1
assert span.__exit__.call_count == 1, 'ensure finish() was called'
if root_span:
assert root_span.__exit__.call_count == 0, 'do not finish root span'
# verify trace-id was correctly injected into headers
# The lookup uses a lowercase key while inject() wrote 'TRACE-ID'.
norm_headers = HTTPHeaders(request.headers)
assert norm_headers.get('trace-id') == '123'
开发者ID:uber-common,项目名称:opentracing-python-instrumentation,代码行数:54,代码来源:test_sync_client_hooks.py
示例16: _get
def _get(self, url, headers=None, callback=None):
    """
    A `GET` request to the solr.

    The client's default headers form the base set and any per-call
    ``headers`` are layered on top of them.

    :param url: target URL of the request.
    :param headers: optional mapping of extra headers; entries here
        replace default headers of the same name.
    :param callback: optional callable handed to the client's ``fetch``.
    """
    merged = HTTPHeaders()
    merged.update(self._default_headers)
    if headers:
        merged.update(headers)
    # Send the merged set: passing the raw ``headers`` argument instead
    # would silently drop the client's default headers.
    req = HTTPRequest(url, headers=merged)
    self._client.fetch(req, callback)
开发者ID:purplecow,项目名称:doppler,代码行数:11,代码来源:client.py
示例17: headers_received
def headers_received(
    self,
    start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
    headers: httputil.HTTPHeaders,
) -> Optional[Awaitable[None]]:
    """Set up gzip decompression if needed, then forward to the delegate.

    When ``Content-Encoding`` is exactly ``"gzip"`` a decompressor is
    installed on ``self`` and the header is moved to
    ``X-Consumed-Content-Encoding``. ``headers`` is modified in place.

    :return: whatever the wrapped delegate's ``headers_received`` returns.
    """
    encoding = headers.get("Content-Encoding")
    if encoding == "gzip":
        self._decompressor = GzipDecompressor()
        # Downstream delegates will only see uncompressed data, so the
        # content-encoding header is renamed
        # (but note that curl_httpclient doesn't do this).
        headers.add("X-Consumed-Content-Encoding", encoding)
        del headers["Content-Encoding"]
    return self._delegate.headers_received(start_line, headers)
开发者ID:rgbkrk,项目名称:tornado,代码行数:13,代码来源:http1connection.py
示例18: _HTTPRequest
class _HTTPRequest(object):
    """Request object decoded from a msgpack-packed payload.

    The payload unpacks to ``(method, url, version, headers, body)``.
    Headers, metadata, query/body arguments and uploaded files are
    exposed through the read-only properties below.
    """

    def __init__(self, request, data):
        self._underlying_request = request
        method, url, version, raw_headers, self._body = msgpack_unpackb(data)
        if six.PY3:
            # On Python 3 the textual fields are decoded before use.
            method, url, version = method.decode(), url.decode(), version.decode()
            raw_headers = [(name.decode(), value.decode()) for name, value in raw_headers]

        self._headers = HTTPHeaders(raw_headers)

        query = urlparse.urlparse(url).query
        # X-Real-IP is preferred; X-Forwarded-For is the fallback.
        remote_addr = self._headers.get('X-Real-IP') or self._headers.get('X-Forwarded-For', '')
        self._meta = {
            'method': method,
            'version': version,
            'host': self._headers.get('Host', ''),
            'remote_addr': remote_addr,
            'query_string': query,
            'cookies': {},
            'parsed_cookies': http_parse_cookies(self._headers),
        }

        # Query-string arguments come first; parse_body_arguments is then
        # given the same dict together with the files dict.
        args = urlparse.parse_qs(query)
        self._files = {}
        content_type = self._headers.get("Content-Type", "")
        parse_body_arguments(content_type, self._body, args, self._files)
        self._request = dict_list_to_single(args)

    @property
    def headers(self):
        """Return the decoded request headers."""
        return self._headers

    def hpack_headers(self):
        """Return the ``headers`` attribute of the underlying request."""
        return self._underlying_request.headers

    @property
    def body(self):
        """Return request body"""
        return self._body

    @property
    def meta(self):
        """Return the metadata dict built in ``__init__``."""
        return self._meta

    @property
    def request(self):
        """Return the arguments as produced by ``dict_list_to_single``."""
        return self._request

    @property
    def files(self):
        """Return the files dict filled by ``parse_body_arguments``."""
        return self._files
开发者ID:cocaine,项目名称:cocaine-framework-python,代码行数:48,代码来源:http_dec.py
示例19: _on_headers
def _on_headers(self, data):
# Parse the status line and headers of a response from `data` (bytes), then
# either finish immediately with an empty body or schedule the matching read
# of the body from the stream.
# Raises ValueError for conflicting Content-Length values and for a bodiless
# status code that nevertheless announces a body.
data = native_str(data.decode("latin1"))
# Everything up to the first newline is the status line; `_` keeps the
# separator so it can be re-attached for the header callback below.
first_line, _, header_data = data.partition("\n")
match = re.match("HTTP/1.[01] ([0-9]+) ([^\r]*)", first_line)
assert match
code = int(match.group(1))
self.headers = HTTPHeaders.parse(header_data)
# 1xx responses are handed to _handle_1xx and processing stops here.
if 100 <= code < 200:
self._handle_1xx(code)
return
else:
self.code = code
self.reason = match.group(2)
if "Content-Length" in self.headers:
if "," in self.headers["Content-Length"]:
# Proxies sometimes cause Content-Length headers to get
# duplicated. If all the values are identical then we can
# use them but if they differ it's an error.
pieces = re.split(r',\s*', self.headers["Content-Length"])
if any(i != pieces[0] for i in pieces):
raise ValueError("Multiple unequal Content-Lengths: %r" %
self.headers["Content-Length"])
self.headers["Content-Length"] = pieces[0]
content_length = int(self.headers["Content-Length"])
else:
content_length = None
# Replay the status line, each header and a terminating blank line to the
# caller's header callback, if one was supplied.
if self.request.header_callback is not None:
# re-attach the newline we split on earlier
self.request.header_callback(first_line + _)
for k, v in self.headers.get_all():
self.request.header_callback("%s: %s\r\n" % (k, v))
self.request.header_callback('\r\n')
if self.request.method == "HEAD" or self.code == 304:
# HEAD requests and 304 responses never have content, even
# though they may have content-length headers
self._on_body(b"")
return
# NOTE(review): 1xx codes already returned above, so as written only 204 can
# reach this branch -- confirm.
if 100 <= self.code < 200 or self.code == 204:
# These response codes never have bodies
# http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
if ("Transfer-Encoding" in self.headers or
content_length not in (None, 0)):
raise ValueError("Response with code %d should not have body" %
self.code)
self._on_body(b"")
return
# A decompressor is installed only when the request opted in via use_gzip
# and the response is gzip-encoded.
if (self.request.use_gzip and
self.headers.get("Content-Encoding") == "gzip"):
self._decompressor = GzipDecompressor()
# Pick the body-reading strategy: chunked, fixed length, or until close.
if self.headers.get("Transfer-Encoding") == "chunked":
self.chunks = []
self.stream.read_until(b"\r\n", self._on_chunk_length)
elif content_length is not None:
self.stream.read_bytes(content_length, self._on_body)
else:
self.stream.read_until_close(self._on_body)
开发者ID:zhkzyth,项目名称:tornado-reading-notes,代码行数:60,代码来源:simple_httpclient.py
示例20: test_100_continue
def test_100_continue(self):
# Run through a 100-continue interaction by hand:
# When given Expect: 100-continue, we get a 100 response after the
# headers, and then the real response after the body.
# Each asynchronous stream call passes self.stop as its callback and is
# followed by self.wait(), whose return value is used as the call's result.
stream = IOStream(socket.socket())
stream.connect(("127.0.0.1", self.get_http_port()), callback=self.stop)
self.wait()
# Send only the request head; the final b"\r\n" element, once joined, supplies
# the blank line that ends the headers.
stream.write(b"\r\n".join([b"POST /hello HTTP/1.1",
b"Content-Length: 1024",
b"Expect: 100-continue",
b"Connection: close",
b"\r\n"]), callback=self.stop)
self.wait()
# The interim response must arrive before any body bytes are sent.
stream.read_until(b"\r\n\r\n", self.stop)
data = self.wait()
self.assertTrue(data.startswith(b"HTTP/1.1 100 "), data)
# Now send the 1024-byte body announced in Content-Length.
stream.write(b"a" * 1024)
stream.read_until(b"\r\n", self.stop)
first_line = self.wait()
self.assertTrue(first_line.startswith(b"HTTP/1.1 200"), first_line)
# Read the final response's headers and use them to size the body read.
stream.read_until(b"\r\n\r\n", self.stop)
header_data = self.wait()
headers = HTTPHeaders.parse(native_str(header_data.decode('latin1')))
stream.read_bytes(int(headers["Content-Length"]), self.stop)
body = self.wait()
self.assertEqual(body, b"Got 1024 bytes in POST")
stream.close()
开发者ID:alexdxy,项目名称:tornado,代码行数:27,代码来源:httpserver_test.py
注:本文中的tornado.httputil.HTTPHeaders类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论