• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python testing.AsyncHTTPTestCase类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tornado.testing.AsyncHTTPTestCase的典型用法代码示例。如果您正苦于以下问题：Python AsyncHTTPTestCase类的具体用法？Python AsyncHTTPTestCase怎么用？Python AsyncHTTPTestCase使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了AsyncHTTPTestCase类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: stop

 def stop(self, *args, **kwargs):
     """Forward to ``AsyncHTTPTestCase.stop`` with relaxed positional arity.

     Exactly one positional argument is passed through unchanged; any other
     number of positional arguments (including none) is passed on as a single
     tuple. Keyword arguments are forwarded as given.
     """
     forwarded = args[0] if len(args) == 1 else args
     AsyncHTTPTestCase.stop(self, forwarded, **kwargs)
开发者ID:Taejun,项目名称:motor,代码行数:7,代码来源:test_motor_web.py


示例2: setUp

    def setUp(self):
        """Run the base setUp and remember the session id from a first request.

        Fetches ``/`` and stores in ``self.sid`` the text that follows the first
        ``=`` in the first ``;``-separated part of the ``Set-Cookie`` header,
        with double quotes removed.
        """
        AsyncHTTPTestCase.setUp(self)

        # get sid
        first_response = self.fetch('/')
        cookie_pair = first_response.headers['Set-Cookie'].split(';')[0]
        raw_value = cookie_pair.split('=')[1]
        self.sid = str(raw_value.replace('"', ''))
开发者ID:ruahman,项目名称:swipe-tech-apps,代码行数:7,代码来源:test_data.py


示例3: setUp

 def setUp(self):
     """Run the base setUp, build a signed ``token`` cookie header and truncate ``user``."""
     AsyncHTTPTestCase.setUp(self)

     # Payload that is encoded and signed into the ``token`` cookie.
     self.tt_token = {'uid': '1', 'mobile': '18682212241'}
     secret = configs['cookie_secret']
     encoded_payload = token_encode(self.tt_token)
     signed_token = create_signed_value(secret, 'token', encoded_payload)
     self.tt_headers = {'Cookie': 'token=' + signed_token}

     # Executes TRUNCATE on the 'fitter' connection every time setUp runs.
     hqlh.db.get_conn('fitter').execute('TRUNCATE user')
开发者ID:fitterQ,项目名称:fitterService,代码行数:10,代码来源:user_test.py


示例4: test_registering

    def test_registering(self):
        # Registers a webhook, posts a signed body and expects the handler's
        # ``content`` to be called; after unregistering, a second post must
        # not reach the handler.
        content_type = "application/vnd.gosa.test+plain"
        sender = "test-webhook"
        body = b"Test"

        registry = PluginRegistry.getInstance("WebhookRegistry")
        hook = TestWebhook()
        registry.register_handler(content_type, hook)
        url, token = registry.registerWebhook("admin", sender, content_type)

        # Sign the body with the token handed out at registration.
        # NOTE(review): the digest is sha512 although the header prefix says
        # 'sha1=' — presumably this mirrors the receiving side; confirm.
        token = bytes(token, 'ascii')
        digest = hmac.new(token, msg=body, digestmod="sha512").hexdigest()
        headers = {
            'Content-Type': content_type,
            'HTTP_X_HUB_SENDER': sender,
            'HTTP_X_HUB_SIGNATURE': 'sha1=' + digest,
        }
        with mock.patch.object(hook, "content") as m_content:
            AsyncHTTPTestCase.fetch(self, "/hooks/", method="POST", headers=headers, body=body)
            m_content.assert_called_with(body)
            m_content.reset_mock()

            # Second post goes to the URL returned by registerWebhook.
            registry.unregisterWebhook("admin", sender, content_type)
            AsyncHTTPTestCase.fetch(self, url, method="POST", headers=headers, body=body)
            assert not m_content.called
开发者ID:gonicus,项目名称:gosa,代码行数:22,代码来源:test_registry.py


示例5: fetch

 def fetch(self, url, **kw):
     """Fetch ``url`` with the stored cookies and XSRF token attached.

     Args:
         url: Passed through to ``AsyncHTTPTestCase.fetch``.
         **kw: Extra keyword arguments for ``AsyncHTTPTestCase.fetch``. An
             optional ``headers`` mapping is taken out, extended with the
             ``Cookie`` and ``X-XSRFToken`` headers and passed on. A mapping
             supplied by the caller is modified in place.

     Returns:
         The response returned by ``AsyncHTTPTestCase.fetch``; its headers
         are handed to ``self._update_cookies`` before returning.
     """
     headers = kw.pop('headers', None)
     if headers is None:
         # Covers both a missing and an explicit ``headers=None`` argument.
         headers = {}
     if self.__cookies != '':
         headers['Cookie'] = self.__cookies
     if self._xsrf:
         headers['X-XSRFToken'] = self._xsrf
         # There may be no Cookie header at all (no stored cookies and none
         # supplied by the caller); look it up defensively instead of
         # raising KeyError.
         cookie = headers.get('Cookie') or ''
         if cookie and '_xsrf' not in cookie:
             headers['Cookie'] = "%s;%s=%s" % (cookie, '_xsrf', self._xsrf)
     resp = AsyncHTTPTestCase.fetch(self, url, headers=headers, **kw)
     self._update_cookies(resp.headers)
     return resp
开发者ID:gonicus,项目名称:gosa,代码行数:15,代码来源:RemoteTestCase.py


示例6: test_post

    def test_post(self):
        # Posts a signed backend-change event to /hooks/ and expects the
        # patched zope.event.notify to be called; every following variation
        # (unknown sender, wrong/missing signature, unknown content type)
        # must answer 401 without calling notify.

        def post_hook():
            # ``headers`` and ``payload`` are read at call time, so the
            # header mutations made between calls take effect.
            return AsyncHTTPTestCase.fetch(self, "/hooks/", method="POST", headers=headers, body=payload)

        # create webhook post
        maker = EventMaker()
        change = maker.BackendChange(
            maker.DN("cn=Test,ou=people,dc=example,dc=net"),
            maker.ModificationTime(datetime.now().strftime("%Y%m%d%H%M%SZ")),
            maker.ChangeType("update")
        )
        payload = etree.tostring(maker.Event(change))

        token = bytes(Environment.getInstance().config.get("webhooks.ldap_monitor_token"), 'ascii')
        digest = hmac.new(token, msg=payload, digestmod="sha512").hexdigest()
        signature = 'sha1=' + digest

        headers = {
            'Content-Type': 'application/vnd.gosa.event+xml',
            'HTTP_X_HUB_SENDER': 'backend-monitor',
            'HTTP_X_HUB_SIGNATURE': signature,
        }
        with mock.patch("gosa.backend.plugins.webhook.registry.zope.event.notify") as m_notify:
            # valid request
            post_hook()
            assert m_notify.called
            m_notify.reset_mock()

            # unregistered sender
            headers['HTTP_X_HUB_SENDER'] = 'unknown'
            resp = post_hook()
            assert resp.code == 401
            assert not m_notify.called

            # wrong signature
            headers['HTTP_X_HUB_SENDER'] = 'backend-monitor'
            headers['HTTP_X_HUB_SIGNATURE'] = 'sha1=823rjadfkjlasasddfdgasdfgasd'
            resp = post_hook()
            assert resp.code == 401
            assert not m_notify.called

            # no signature
            del headers['HTTP_X_HUB_SIGNATURE']
            resp = post_hook()
            assert resp.code == 401
            assert not m_notify.called

            # no handler for content type
            headers['HTTP_X_HUB_SIGNATURE'] = signature
            headers['Content-Type'] = 'application/vnd.gosa.unknown+xml'
            resp = post_hook()
            assert resp.code == 401
            assert not m_notify.called
开发者ID:gonicus,项目名称:gosa,代码行数:52,代码来源:test_registry.py


示例7: setup_async_test_case

 def setup_async_test_case(self):
     """Run ``AsyncHTTPTestCase.setUp`` on this instance."""
     AsyncHTTPTestCase.setUp(self)
开发者ID:bchess,项目名称:pushmanager,代码行数:2,代码来源:testservlet.py


示例8: setUp

 def setUp(self):
     """Create the mocked handlers, then run ``AsyncHTTPTestCase.setUp``."""
     self._registration_handler = mock()
     self._wakeup_handler = mock()
     self._wakeup_handler._tornado_facade = mock()
     # NOTE(review): the mocks are created before the base setUp, presumably
     # because application construction (not visible here) uses them — confirm.
     AsyncHTTPTestCase.setUp(self)
开发者ID:Peter-Raafat,项目名称:c2dm-web-server,代码行数:5,代码来源:wakeup_mds_service_test.py


示例9: setUp

 def setUp(self):
     """Run the base setUp and create a mocked db, a mocked callback and the list data object."""
     AsyncHTTPTestCase.setUp(self)
     fake_db = mock.Mock()
     self.db = fake_db
     self.callback = mock.Mock()
     # The list data object is built with an empty first argument and the mocked db.
     self.listdata = wordlist.WordListData('', fake_db)
开发者ID:gmwils,项目名称:cihui,代码行数:5,代码来源:wordlist_data_test.py


示例10: test_provision_host

    async def test_provision_host(self, m_get, m_del, m_put, m_post):
        """Convert a discovered host to a 'real' host.

        The four ``m_*`` arguments get their ``side_effect`` set to the
        matching method of a ``MockForeman`` instance.
        NOTE(review): they are presumably injected by patch decorators that
        are outside this view — confirm.
        """
        self._test_dn = GosaTestCase.create_test_data()
        container = ObjectProxy(self._test_dn, "IncomingDeviceContainer")
        container.commit()

        # Route the mocked get/delete/put/post calls to the fake Foreman.
        mocked_foreman = MockForeman()
        m_get.side_effect = mocked_foreman.get
        m_del.side_effect = mocked_foreman.delete
        m_put.side_effect = mocked_foreman.put
        m_post.side_effect = mocked_foreman.post

        # create the discovered host + foremanHostgroup
        d_host = ObjectProxy(container.dn, "Device")
        d_host.cn = "mac00262df16a2c"
        d_host.extend("ForemanHost")
        d_host.status = "discovered"
        d_host.extend("ieee802Device")
        d_host.macAddress = "00:26:2d:f1:6a:2c"
        d_host.extend("IpHost")
        d_host.ipHostNumber = "192.168.0.1"
        d_host.commit()

        hostgroup = ObjectProxy("%s" % self._test_dn, "GroupOfNames")
        hostgroup.extend("ForemanHostGroup")
        hostgroup.cn = "Test"
        hostgroup.foremanGroupId = "4"
        hostgroup.commit()

        # add host to group
        logging.getLogger("test.foreman-integration").info("########### START: Add Host to group ############# %s" % AsyncHTTPTestCase.get_url(self, "/hooks/"))
        # Re-open the host by its DN inside the incoming container.
        d_host = ObjectProxy("cn=mac00262df16a2c,%s" % container.dn)

        # Condition for the discovered-host response: host still has its
        # original cn and the "discovered" status.
        def check():
            logging.getLogger("test.foreman-integration").info("check condition: %s, %s" % (d_host.cn, d_host.status))
            return d_host.cn == "mac00262df16a2c" and d_host.status == "discovered"

        # Condition for the Testhost response: host has been renamed.
        def check2():
            logging.getLogger("test.foreman-integration").info("check2 condition: %s" % d_host.cn)
            return d_host.cn == "Testhost"

        # Register the JSON fixtures (read from the "data" directory next to
        # this file) as conditional GET responses of the fake Foreman.
        base_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data")
        with open(os.path.join(base_dir, "discovered_hosts", "mac00262df16a2c.json")) as f:
            mocked_foreman.register_conditional_response("http://localhost:8000/api/v2/discovered_hosts/mac00262df16a2c",
                                                         "get",
                                                         check,
                                                         f.read())
        with open(os.path.join(base_dir, "conditional", "Testhost.json")) as f:
            mocked_foreman.register_conditional_response("http://localhost:8000/api/v2/hosts/Testhost",
                                                         "get",
                                                         check2,
                                                         f.read())

        # Trigger condition that always holds.
        def activate(**kwargs):
            return True

        # NOTE(review): presumably runs self.execute when the discovered host
        # is PUT — semantics of register_trigger are not visible here.
        mocked_foreman.register_trigger("http://localhost:8000/api/v2/discovered_hosts/mac00262df16a2c",
                                        "put",
                                        activate,
                                        self.execute)

        # Before the rename no index entry with cn == "Testhost" may exist.
        with make_session() as session:
            assert session.query(ObjectInfoIndex.dn)\
                       .join(ObjectInfoIndex.properties)\
                       .filter(and_(KeyValueIndex.key == "cn", KeyValueIndex.value == "Testhost"))\
                       .count() == 0

        d_host.cn = "Testhost"
        d_host.groupMembership = hostgroup.dn
        d_host.commit()
        # NOTE(review): the pause presumably gives asynchronous processing
        # started by the commit time to finish — confirm.
        logging.getLogger("test.foreman-integration").info("waiting for 2 seconds")
        await asyncio.sleep(2)

        logging.getLogger("test.foreman-integration").info("########### END: Add Host to group #############")

        # now move the host to the final destination
        d_host = ObjectProxy("cn=Testhost,ou=incoming,%s" % self._test_dn)
        assert d_host.status != "discovered"
        assert d_host.name == "Testhost"
        assert d_host.hostgroup_id == "4"
        assert d_host.is_extended_by("RegisteredDevice") is True
        assert len(d_host.userPassword[0]) > 0
        assert d_host.deviceUUID is not None

        # After the rename exactly one index entry with cn == "Testhost" exists.
        with make_session() as session:
            assert session.query(ObjectInfoIndex.dn) \
                       .join(ObjectInfoIndex.properties) \
                       .filter(and_(KeyValueIndex.key == "cn", KeyValueIndex.value == "Testhost")) \
                       .count() == 1

        logging.getLogger("test.foreman-integration").info("########### START: moving host #############")
        d_host.move("%s" % self._test_dn)
        logging.getLogger("test.foreman-integration").info("########### END: moving host #############")

        # lets check if everything is fine in the database
        d_host = ObjectProxy("cn=Testhost,ou=devices,%s" % self._test_dn, read_only=True)
        assert d_host is not None
        assert d_host.status == "unknown"
        assert d_host.groupMembership == hostgroup.dn
开发者ID:gonicus,项目名称:gosa,代码行数:99,代码来源:test_integration.py


示例11: setUp

 def setUp(self):
     """Run the base setUp, then reload and re-initialize ``controller``."""
     AsyncHTTPTestCase.setUp(self)
     # NOTE(review): a bare ``reload`` is a builtin on Python 2 only; on
     # Python 3 it must be imported elsewhere in the file — confirm.
     reload(controller)
     controller.initialize(recreate=True)
开发者ID:dgquintas,项目名称:epics-caa,代码行数:4,代码来源:controller.py


示例12: fetch

 def fetch(self, path, **kwargs):
     """Fetch ``path`` and attach the UTF-8 decoded body as ``response.value``.

     All arguments are forwarded to ``AsyncHTTPTestCase.fetch``. When the
     response body is ``None`` no ``value`` attribute is set.
     """
     response = AsyncHTTPTestCase.fetch(self, path, **kwargs)
     raw_body = response.body
     if raw_body is None:
         return response
     response.value = raw_body.decode('utf-8')
     return response
开发者ID:censhin,项目名称:tornado-bootstrap,代码行数:5,代码来源:base.py


示例13: setUp

 def setUp(self):
     """Run the base setUp, then truncate the ``user`` table."""
     AsyncHTTPTestCase.setUp(self)
     # Executes TRUNCATE on the 'fitter' connection every time setUp runs.
     hqlh.db.get_conn('fitter').execute("TRUNCATE user")
开发者ID:fitterQ,项目名称:fitterService,代码行数:4,代码来源:auth_test.py


示例14: setUp

 def setUp(self):
     """Run ``AsyncHTTPTestCase.setUp`` on this instance."""
     AsyncHTTPTestCase.setUp(self)
开发者ID:dgquintas,项目名称:epics-caa,代码行数:2,代码来源:misc.py


示例15: setUp

 def setUp(self):
   """Create a ``PikaPublisher`` from placeholder arguments, then run the base setUp."""
   publisher_args = (
     "fake_params",
     "fake_exchange",
     "fake_exchange_type",
     "fake_queue",
     "fake_routing_key",
   )
   self._publisher = PikaPublisher(*publisher_args)
   AsyncHTTPTestCase.setUp(self)
开发者ID:maheshgattani,项目名称:Tornado-RabbitMQ,代码行数:3,代码来源:publish_handler_test.py


示例16: __init__

 def __init__(self, *args, **kwargs):
     """Initialize through both ``TestCase.__init__`` and ``AsyncHTTPTestCase.__init__``."""
     # Both initializers are called explicitly with the same arguments.
     TestCase.__init__(self, *args, **kwargs)
     AsyncHTTPTestCase.__init__(self, *args, **kwargs)
开发者ID:MechanisM,项目名称:django-signalqueue,代码行数:3,代码来源:tests.py


示例17: setUp

 def setUp(self):
     """Create a temporary db file and two temporary directories, then run the base setUp."""
     # delete=False keeps the file on disk after the handle is closed.
     # NOTE(review): no cleanup of the file or the directories is visible
     # in this block — confirm it happens in tearDown.
     self.db = tempfile.NamedTemporaryFile(delete=False)
     self.home = tempfile.mkdtemp()
     self.scanningdir = tempfile.mkdtemp()
     # NOTE(review): the base setUp runs last, presumably because
     # application construction uses the paths created above — confirm.
     AsyncHTTPTestCase.setUp(self)
开发者ID:lpenz,项目名称:slickbird,代码行数:5,代码来源:base.py


示例18: setUp

 def setUp(self):
     """Run the base setUp, then block for half a second."""
     AsyncHTTPTestCase.setUp(self)
     # Wait for the app to start up properly (for good luck).
     time.sleep(0.5)
开发者ID:imshashank,项目名称:data-mining,代码行数:4,代码来源:goodapp.py


示例19: tearDown

 def tearDown(self):
     """Run the base tearDown, then shut ``controller`` down."""
     AsyncHTTPTestCase.tearDown(self)
     controller.shutdown()
开发者ID:dgquintas,项目名称:epics-caa,代码行数:3,代码来源:controller.py


示例20: teardown_async_test_case

 def teardown_async_test_case(self):
     """Run ``AsyncHTTPTestCase.tearDown`` on this instance."""
     AsyncHTTPTestCase.tearDown(self)
开发者ID:bchess,项目名称:pushmanager,代码行数:2,代码来源:testservlet.py



注:本文中的tornado.testing.AsyncHTTPTestCase类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python util.b函数代码示例发布时间:2022-05-27
下一篇:
Python testing.AsyncHTTPSTestCase类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap