• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python simple_httpclient.SimpleAsyncHTTPClient类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tornado.simple_httpclient.SimpleAsyncHTTPClient的典型用法代码示例。如果您正苦于以下问题:Python SimpleAsyncHTTPClient类的具体用法?Python SimpleAsyncHTTPClient怎么用?Python SimpleAsyncHTTPClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SimpleAsyncHTTPClient类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: HostnameMappingTestCase

class HostnameMappingTestCase(AsyncHTTPTestCase):
    def setUp(self):
        super(HostnameMappingTestCase, self).setUp()
        self.http_client = SimpleAsyncHTTPClient(
            self.io_loop,
            hostname_mapping={
                'www.example.com': '127.0.0.1',
                ('foo.example.com', 8000): ('127.0.0.1', self.get_http_port()),
            })

    def get_app(self):
        return Application([url("/hello", HelloWorldHandler), ])

    def test_hostname_mapping(self):
        self.http_client.fetch(
            'http://www.example.com:%d/hello' % self.get_http_port(), self.stop)
        response = self.wait()
        response.rethrow()
        self.assertEqual(response.body, b'Hello world!')

    def test_port_mapping(self):
        self.http_client.fetch('http://foo.example.com:8000/hello', self.stop)
        response = self.wait()
        response.rethrow()
        self.assertEqual(response.body, b'Hello world!')
开发者ID:ArthurGarnier,项目名称:SickRage,代码行数:25,代码来源:simple_httpclient_test.py


示例2: BaseSSLTest

class BaseSSLTest(AsyncHTTPTestCase, LogTrapTestCase):
    def get_ssl_version(self):
        raise NotImplementedError()

    def setUp(self):
        super(BaseSSLTest, self).setUp()
        # Replace the client defined in the parent class.
        # Some versions of libcurl have deadlock bugs with ssl,
        # so always run these tests with SimpleAsyncHTTPClient.
        self.http_client = SimpleAsyncHTTPClient(io_loop=self.io_loop,
                                                 force_instance=True)

    def get_app(self):
        return Application([('/', HelloWorldRequestHandler,
                             dict(protocol="https"))])

    def get_httpserver_options(self):
        # Testing keys were generated with:
        # openssl req -new -keyout tornado/test/test.key -out tornado/test/test.crt -nodes -days 3650 -x509
        test_dir = os.path.dirname(__file__)
        return dict(ssl_options=dict(
                certfile=os.path.join(test_dir, 'test.crt'),
                keyfile=os.path.join(test_dir, 'test.key'),
                ssl_version=self.get_ssl_version()))

    def fetch(self, path, **kwargs):
        self.http_client.fetch(self.get_url(path).replace('http', 'https'),
                               self.stop,
                               validate_cert=False,
                               **kwargs)
        return self.wait()
开发者ID:AM7000000,项目名称:TornadoOSCWSStream,代码行数:31,代码来源:httpserver_test.py


示例3: test_connection_limit

    def test_connection_limit(self):
        client = SimpleAsyncHTTPClient(self.io_loop, max_clients=2,
                                       force_instance=True)
        self.assertEqual(client.max_clients, 2)
        seen = []
        # Send 4 requests.  Two can be sent immediately, while the others
        # will be queued
        for i in range(4):
            client.fetch(self.get_url("/trigger"),
                         lambda response, i=i: (seen.append(i), self.stop()))
        self.wait(condition=lambda: len(self.triggers) == 2)
        self.assertEqual(len(client.queue), 2)

        # Finish the first two requests and let the next two through
        self.triggers.popleft()()
        self.triggers.popleft()()
        self.wait(condition=lambda: (len(self.triggers) == 2 and
                                     len(seen) == 2))
        self.assertEqual(set(seen), set([0, 1]))
        self.assertEqual(len(client.queue), 0)

        # Finish all the pending requests
        self.triggers.popleft()()
        self.triggers.popleft()()
        self.wait(condition=lambda: len(seen) == 4)
        self.assertEqual(set(seen), set([0, 1, 2, 3]))
        self.assertEqual(len(self.triggers), 0)
开发者ID:CoolCold,项目名称:tornado,代码行数:27,代码来源:simple_httpclient_test.py


示例4: fetch

 def fetch(self, request, callback, **kwargs):
     self.request_queue.append((request, callback, kwargs))
     while self.available_requests > 0 and len(self.request_queue) > 0:
         request, callback, kwargs = self.request_queue.popleft()
         SimpleAsyncHTTPClient.fetch(self, request, callback, **kwargs)
         self.available_requests -= 1
         yield gen.Task(self.io_loop.add_timeout, self.period)
         self.available_requests += 1
开发者ID:alekstorm,项目名称:etsy-tornado,代码行数:8,代码来源:throttled_httpclient.py


示例5: test_redirect_connection_limit

 def test_redirect_connection_limit(self):
     # following redirects should not consume additional connections
     client = SimpleAsyncHTTPClient(self.io_loop, max_clients=1,
                                    force_instance=True)
     client.fetch(self.get_url('/countdown/3'), self.stop,
                  max_redirects=3)
     response = self.wait()
     response.rethrow()
开发者ID:CoolCold,项目名称:tornado,代码行数:8,代码来源:simple_httpclient_test.py


示例6: raw_fetch

 def raw_fetch(self, headers, body):
     client = SimpleAsyncHTTPClient(self.io_loop)
     conn = RawRequestHTTPConnection(
         self.io_loop, client, httpclient.HTTPRequest(self.get_url("/")), None, self.stop, 1024 * 1024
     )
     conn.set_request(b("\r\n").join(headers + [utf8("Content-Length: %d\r\n" % len(body))]) + b("\r\n") + body)
     response = self.wait()
     client.close()
     response.rethrow()
     return response
开发者ID:trawor,项目名称:tornado,代码行数:10,代码来源:httpserver_test.py


示例7: setUp

 def setUp(self):
     super(SSLTest, self).setUp()
     # Replace the client defined in the parent class.
     # Some versions of libcurl have deadlock bugs with ssl,
     # so always run these tests with SimpleAsyncHTTPClient.
     self.http_client = SimpleAsyncHTTPClient(io_loop=self.io_loop,
                                              force_instance=True)
开发者ID:nixon,项目名称:tornado,代码行数:7,代码来源:httpserver_test.py


示例8: setUp

 def setUp(self):
     super(HostnameMappingTestCase, self).setUp()
     self.http_client = SimpleAsyncHTTPClient(
         hostname_mapping={
             'www.example.com': '127.0.0.1',
             ('foo.example.com', 8000): ('127.0.0.1', self.get_http_port()),
         })
开发者ID:alexdxy,项目名称:tornado,代码行数:7,代码来源:simple_httpclient_test.py


示例9: SSLTest

class SSLTest(AsyncHTTPTestCase, LogTrapTestCase):
    def setUp(self):
        super(SSLTest, self).setUp()
        # Replace the client defined in the parent class.
        # Some versions of libcurl have deadlock bugs with ssl,
        # so always run these tests with SimpleAsyncHTTPClient.
        self.http_client = SimpleAsyncHTTPClient(io_loop=self.io_loop,
                                                 force_instance=True)

    def get_app(self):
        return Application([('/', HelloWorldRequestHandler, 
                             dict(protocol="https"))])

    def get_httpserver_options(self):
        # Testing keys were generated with:
        # openssl req -new -keyout tornado/test/test.key -out tornado/test/test.crt -nodes -days 3650 -x509
        test_dir = os.path.dirname(__file__)
        return dict(ssl_options=dict(
                certfile=os.path.join(test_dir, 'test.crt'),
                keyfile=os.path.join(test_dir, 'test.key')))

    def fetch(self, path, **kwargs):
        self.http_client.fetch(self.get_url(path).replace('http', 'https'),
                               self.stop,
                               validate_cert=False,
                               **kwargs)
        return self.wait()

    def test_ssl(self):
        response = self.fetch('/')
        self.assertEqual(response.body, b("Hello world"))

    def test_large_post(self):
        response = self.fetch('/',
                              method='POST',
                              body='A'*5000)
        self.assertEqual(response.body, b("Got 5000 bytes in POST"))

    def test_non_ssl_request(self):
        # Make sure the server closes the connection when it gets a non-ssl
        # connection, rather than waiting for a timeout or otherwise
        # misbehaving.
        self.http_client.fetch(self.get_url("/"), self.stop,
                               request_timeout=3600,
                               connect_timeout=3600)
        response = self.wait()
        self.assertEqual(response.code, 599)
开发者ID:dhruvg,项目名称:tornado,代码行数:47,代码来源:httpserver_test.py


示例10: raw_fetch

 def raw_fetch(self, headers, body):
     client = SimpleAsyncHTTPClient(self.io_loop)
     conn = RawRequestHTTPConnection(
         self.io_loop, client,
         httpclient._RequestProxy(
             httpclient.HTTPRequest(self.get_url("/")),
             dict(httpclient.HTTPRequest._DEFAULTS)),
         None, self.stop,
         1024 * 1024, Resolver(io_loop=self.io_loop))
     conn.set_request(
         b"\r\n".join(headers +
                      [utf8("Content-Length: %d\r\n" % len(body))]) +
         b"\r\n" + body)
     response = self.wait()
     client.close()
     response.rethrow()
     return response
开发者ID:good1111,项目名称:pj-redis,代码行数:17,代码来源:httpserver_test.py


示例11: __init__

 def __init__(self, link: str, loop: IOLoop = None, timeout: int = 50):
     self.link = link
     self.client = SimpleAsyncHTTPClient(loop)
     self.timeout = timeout
     if loop is None:
         self.loop = IOLoop.current(False)
     else:
         self.loop = loop
     self.header_only = False
开发者ID:paulnaoki,项目名称:DomainFinderSrcUniversal,代码行数:9,代码来源:NonBlockingRequest.py


示例12: setUp

 def setUp(self):
     super(HostnameMappingTestCase, self).setUp()
     self.http_client = SimpleAsyncHTTPClient(
         self.io_loop,
         hostname_mapping={
             "www.example.com": "127.0.0.1",
             ("foo.example.com", 8000): ("127.0.0.1", self.get_http_port()),
         },
     )
开发者ID:RyanWarm,项目名称:ZUtils,代码行数:9,代码来源:simple_httpclient_test.py


示例13: estimate

    def estimate(self, source, destination, force_refresh=False):
        self._count += 1

        self.debug('Estimating (Total {0} requested so far)'.format(
            self._count))

        estimate = self._get_cache(source, destination)
        if estimate:
            raise gen.Return(estimate)

        data = {
            'startX': source[0],
            'startY': source[1],
            'endX': destination[0],
            'endY': destination[1],
            'reqCoordType': 'WGS84GEO',
            'resCoordType': 'WGS84GEO',
            'speed': int(self.speed),
        }
        headers = {
            'accept': 'application/json',
            'appkey': settings.TMAP_API_KEY,
        }
        body = urllib.urlencode(data)
        client = SimpleAsyncHTTPClient()
        request = HTTPRequest(self.url, method='POST', headers=headers,
                              body=body)

        r = yield client.fetch(request)

        data = json.loads(r.body)
        error = data.get('error')
        if error:
            raise TmapException(error)

        prop = data['features'][0]['properties']
        estimate = Estimate(int(prop['totalDistance']), int(prop['totalTime']))

        self._set_cache(source, destination, estimate)

        raise gen.Return(estimate)
开发者ID:bossjones,项目名称:cabbie-backend,代码行数:41,代码来源:estimate.py


示例14: main

def main():
    parse_command_line()
    app = Application([('/', ChunkHandler)])
    app.listen(options.port, address='127.0.0.1')
    def callback(response):
        response.rethrow()
        assert len(response.body) == (options.num_chunks * options.chunk_size)
        logging.warning("fetch completed in %s seconds", response.request_time)
        IOLoop.instance().stop()

    logging.warning("Starting fetch with curl client")
    curl_client = CurlAsyncHTTPClient()
    curl_client.fetch('http://localhost:%d/' % options.port,
                      callback=callback)
    IOLoop.instance().start()

    logging.warning("Starting fetch with simple client")
    simple_client = SimpleAsyncHTTPClient()
    simple_client.fetch('http://localhost:%d/' % options.port,
                        callback=callback)
    IOLoop.instance().start()
开发者ID:08opt,项目名称:tornado,代码行数:21,代码来源:chunk_benchmark.py


示例15: test_ip

def test_ip(ip):
    hostname = APP_ID + '.appspot.com'
    url = 'https://' + hostname + '/2'
    request = HTTPRequest(url, request_timeout=5)

    try:
        client = SimpleAsyncHTTPClient(force_instance=True,
                hostname_mapping={hostname: ip})

        res = yield client.fetch(request, raise_error=False)
        if isinstance(res.error, OSError):
            if res.error.errno == errno.EMFILE:
                warning_msg = ("Too many open files. You should increase the"
                        " maximum allowed number of network connections in you"
                        " system or decrease the CONCURRENCY setting.")
                warnings.warn(warning_msg)
        if res.code == 200:
            return True
        else:
            return False
    finally:
        client.close()
开发者ID:kawing-chiu,项目名称:search_google_ip,代码行数:22,代码来源:search_google_ip.py


示例16: post

 def post(self):
     url = self.get_argument('url')
     http = SimpleAsyncHTTPClient()
     http.fetch(url, callback=self.on_fetch_complete)
开发者ID:parente,项目名称:handlerbag,代码行数:4,代码来源:urlfetch.py


示例17: SimpleHTTPClientTestCase

class SimpleHTTPClientTestCase(AsyncHTTPTestCase, LogTrapTestCase):
    def setUp(self):
        super(SimpleHTTPClientTestCase, self).setUp()
        self.http_client = SimpleAsyncHTTPClient(self.io_loop)

    def get_app(self):
        # callable objects to finish pending /trigger requests
        self.triggers = collections.deque()
        return Application([
            url("/trigger", TriggerHandler, dict(queue=self.triggers,
                                                 wake_callback=self.stop)),
            url("/chunk", ChunkHandler),
            url("/countdown/([0-9]+)", CountdownHandler, name="countdown"),
            url("/hang", HangHandler),
            url("/hello", HelloWorldHandler),
            url("/content_length", ContentLengthHandler),
            url("/head", HeadHandler),
            url("/no_content", NoContentHandler),
            url("/303_post", SeeOther303PostHandler),
            url("/303_get", SeeOther303GetHandler),
            ], gzip=True)

    def test_singleton(self):
        # Class "constructor" reuses objects on the same IOLoop
        self.assertTrue(SimpleAsyncHTTPClient(self.io_loop) is
                        SimpleAsyncHTTPClient(self.io_loop))
        # unless force_instance is used
        self.assertTrue(SimpleAsyncHTTPClient(self.io_loop) is not
                        SimpleAsyncHTTPClient(self.io_loop,
                                              force_instance=True))
        # different IOLoops use different objects
        io_loop2 = IOLoop()
        self.assertTrue(SimpleAsyncHTTPClient(self.io_loop) is not
                        SimpleAsyncHTTPClient(io_loop2))

    def test_connection_limit(self):
        client = SimpleAsyncHTTPClient(self.io_loop, max_clients=2,
                                       force_instance=True)
        self.assertEqual(client.max_clients, 2)
        seen = []
        # Send 4 requests.  Two can be sent immediately, while the others
        # will be queued
        for i in range(4):
            client.fetch(self.get_url("/trigger"),
                         lambda response, i=i: (seen.append(i), self.stop()))
        self.wait(condition=lambda: len(self.triggers) == 2)
        self.assertEqual(len(client.queue), 2)

        # Finish the first two requests and let the next two through
        self.triggers.popleft()()
        self.triggers.popleft()()
        self.wait(condition=lambda: (len(self.triggers) == 2 and
                                     len(seen) == 2))
        self.assertEqual(set(seen), set([0, 1]))
        self.assertEqual(len(client.queue), 0)

        # Finish all the pending requests
        self.triggers.popleft()()
        self.triggers.popleft()()
        self.wait(condition=lambda: len(seen) == 4)
        self.assertEqual(set(seen), set([0, 1, 2, 3]))
        self.assertEqual(len(self.triggers), 0)

    def test_redirect_connection_limit(self):
        # following redirects should not consume additional connections
        client = SimpleAsyncHTTPClient(self.io_loop, max_clients=1,
                                       force_instance=True)
        client.fetch(self.get_url('/countdown/3'), self.stop,
                     max_redirects=3)
        response = self.wait()
        response.rethrow()

    def test_default_certificates_exist(self):
        open(_DEFAULT_CA_CERTS).close()

    def test_gzip(self):
        # All the tests in this file should be using gzip, but this test
        # ensures that it is in fact getting compressed.
        # Setting Accept-Encoding manually bypasses the client's
        # decompression so we can see the raw data.
        response = self.fetch("/chunk", use_gzip=False,
                              headers={"Accept-Encoding": "gzip"})
        self.assertEqual(response.headers["Content-Encoding"], "gzip")
        self.assertNotEqual(response.body, b("asdfqwer"))
        # Our test data gets bigger when gzipped.  Oops.  :)
        self.assertEqual(len(response.body), 34)
        f = gzip.GzipFile(mode="r", fileobj=response.buffer)
        self.assertEqual(f.read(), b("asdfqwer"))

    def test_max_redirects(self):
        response = self.fetch("/countdown/5", max_redirects=3)
        self.assertEqual(302, response.code)
        # We requested 5, followed three redirects for 4, 3, 2, then the last
        # unfollowed redirect is to 1.
        self.assertTrue(response.request.url.endswith("/countdown/5"))
        self.assertTrue(response.effective_url.endswith("/countdown/2"))
        self.assertTrue(response.headers["Location"].endswith("/countdown/1"))

    def test_303_redirect(self):
       response = self.fetch("/303_post", method="POST", body="blah")
#.........这里部分代码省略.........
开发者ID:e1ven,项目名称:Waymoot,代码行数:101,代码来源:simple_httpclient_test.py


示例18: setUp

 def setUp(self):
     super(SimpleHTTPClientTestCase, self).setUp()
     # replace the client defined in the parent class
     self.http_client = SimpleAsyncHTTPClient(io_loop=self.io_loop,
                                              force_instance=True)
开发者ID:AbdAllah-Ahmed,项目名称:tornado,代码行数:5,代码来源:simple_httpclient_test.py


示例19: SimpleHTTPClientTestCase

class SimpleHTTPClientTestCase(AsyncHTTPTestCase, LogTrapTestCase):
    def get_app(self):
        # callable objects to finish pending /trigger requests
        self.triggers = collections.deque()
        return Application([
            url("/hello", HelloWorldHandler),
            url("/post", PostHandler),
            url("/chunk", ChunkHandler),
            url("/auth", AuthHandler),
            url("/hang", HangHandler),
            url("/trigger", TriggerHandler, dict(queue=self.triggers,
                                                 wake_callback=self.stop)),
            url("/countdown/([0-9]+)", CountdownHandler, name="countdown"),
            ], gzip=True)

    def setUp(self):
        super(SimpleHTTPClientTestCase, self).setUp()
        # replace the client defined in the parent class
        self.http_client = SimpleAsyncHTTPClient(io_loop=self.io_loop,
                                                 force_instance=True)

    def test_hello_world(self):
        response = self.fetch("/hello")
        self.assertEqual(response.code, 200)
        self.assertEqual(response.headers["Content-Type"], "text/plain")
        self.assertEqual(response.body, "Hello world!")

        response = self.fetch("/hello?name=Ben")
        self.assertEqual(response.body, "Hello Ben!")

    def test_streaming_callback(self):
        # streaming_callback is also tested in test_chunked
        chunks = []
        response = self.fetch("/hello",
                              streaming_callback=chunks.append)
        # with streaming_callback, data goes to the callback and not response.body
        self.assertEqual(chunks, ["Hello world!"])
        self.assertFalse(response.body)

    def test_post(self):
        response = self.fetch("/post", method="POST",
                              body="arg1=foo&arg2=bar")
        self.assertEqual(response.code, 200)
        self.assertEqual(response.body, "Post arg1: foo, arg2: bar")

    def test_chunked(self):
        response = self.fetch("/chunk")
        self.assertEqual(response.body, "asdfqwer")

        chunks = []
        response = self.fetch("/chunk",
                              streaming_callback=chunks.append)
        self.assertEqual(chunks, ["asdf", "qwer"])
        self.assertFalse(response.body)

    def test_basic_auth(self):
        self.assertEqual(self.fetch("/auth", auth_username="Aladdin",
                                    auth_password="open sesame").body,
                         "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==")

    def test_gzip(self):
        # All the tests in this file should be using gzip, but this test
        # ensures that it is in fact getting compressed.
        # Setting Accept-Encoding manually bypasses the client's
        # decompression so we can see the raw data.
        response = self.fetch("/chunk", use_gzip=False,
                              headers={"Accept-Encoding": "gzip"})
        self.assertEqual(response.headers["Content-Encoding"], "gzip")
        self.assertNotEqual(response.body, "asdfqwer")
        # Our test data gets bigger when gzipped.  Oops.  :)
        self.assertEqual(len(response.body), 34)
        f = gzip.GzipFile(mode="r", fileobj=response.buffer)
        self.assertEqual(f.read(), "asdfqwer")

    def test_connect_timeout(self):
        # create a socket and bind it to a port, but don't
        # call accept so the connection will timeout.
        #get_unused_port()
        port = get_unused_port()

        with closing(socket.socket()) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(('', port))
            sock.listen(1)
            self.http_client.fetch("http://localhost:%d/" % port,
                                   self.stop,
                                   connect_timeout=0.1)
            response = self.wait()
            self.assertEqual(response.code, 599)
            self.assertEqual(str(response.error), "HTTP 599: Timeout")

    def test_request_timeout(self):
        response = self.fetch('/hang', request_timeout=0.1)
        self.assertEqual(response.code, 599)
        self.assertEqual(str(response.error), "HTTP 599: Timeout")

    def test_singleton(self):
        # Class "constructor" reuses objects on the same IOLoop
        self.assertTrue(SimpleAsyncHTTPClient(self.io_loop) is
                        SimpleAsyncHTTPClient(self.io_loop))
#.........这里部分代码省略.........
开发者ID:AbdAllah-Ahmed,项目名称:tornado,代码行数:101,代码来源:simple_httpclient_test.py


示例20: setUp

 def setUp(self):
     super(SimpleHTTPClientTestCase, self).setUp()
     self.http_client = SimpleAsyncHTTPClient(self.io_loop)
开发者ID:e1ven,项目名称:Waymoot,代码行数:3,代码来源:simple_httpclient_test.py



注:本文中的tornado.simple_httpclient.SimpleAsyncHTTPClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python stack_context.wrap函数代码示例发布时间:2022-05-27
下一篇:
Python queues.Queue类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap