• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.MockHttpServer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中vumi.tests.utils.MockHttpServer的典型用法代码示例。如果您正苦于以下问题:Python MockHttpServer类的具体用法?Python MockHttpServer怎么用?Python MockHttpServer使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了MockHttpServer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: TestYoPaymentHandler

class TestYoPaymentHandler(VumiTestCase):
    """Tests for YoPaymentHandler registered as the 'afropinions' handler."""

    @inlineCallbacks
    def setUp(self):
        """Set up an event dispatcher with a YoPaymentHandler.

        The handler is registered under the name 'afropinions' and
        'survey_completed' events are tracked against it.
        """
        self.eh_helper = self.add_helper(EventHandlerHelper())
        yield self.eh_helper.setup_event_dispatcher(
            'afropinions', YoPaymentHandler, {
                'poll_manager_prefix': 'vumigo.',
                'username': 'username',
                'password': 'password',
                # No URL at setup time; test_hitting_url points the handler
                # at a mock server once that server is running.
                'url': None,
                'method': 'POST',
                'amount': 1000,
                'reason': 'foo',
            })
        self.eh_helper.track_event('survey_completed', 'afropinions')

    @inlineCallbacks
    def test_hitting_url(self):
        """Dispatch a 'survey_completed' event and check the HTTP request
        that arrives at the mock server (form arguments and headers)."""
        msisdn = u'+2345'
        message_id = u'message-id'
        event = self.eh_helper.make_event('survey_completed', {
            'from_addr': msisdn,
            'message_id': message_id,
            'transport_type': 'ussd',
            'participant': {'interactions': 2},
        })

        self.mock_server = MockHttpServer()
        yield self.mock_server.start()
        try:
            self.eh_helper.get_handler('afropinions').url = (
                self.mock_server.url)

            yield self.eh_helper.dispatch_event(event)
            received_request = yield self.mock_server.queue.get()
            self.assertEqual(received_request.args['msisdn'][0], msisdn)
            # Configured amount is 1000 and the participant has 2
            # interactions; presumably the handler multiplies the two --
            # confirm against YoPaymentHandler.
            self.assertEqual(received_request.args['amount'][0], '2000')
            self.assertEqual(received_request.args['reason'][0], 'foo')

            headers = received_request.requestHeaders
            self.assertEqual(
                headers.getRawHeaders('Content-Type'),
                ['application/x-www-form-urlencoded'])
            self.assertEqual(
                headers.getRawHeaders('Authorization'),
                ['Basic %s' % (base64.b64encode('username:password'),)])
        finally:
            # Stop the server even when an assertion above fails, so a
            # failing test does not leave the mock server running.
            yield self.mock_server.stop()

    def test_auth_headers(self):
        """get_auth_headers() returns a Basic Authorization header built
        from the base64-encoded 'username:password' pair."""
        handler = self.eh_helper.get_handler('afropinions')
        auth = handler.get_auth_headers('username', 'password')
        credentials = base64.b64encode('username:password')
        self.assertEqual(auth, {
            'Authorization': 'Basic %s' % (credentials.strip(),)
            })
开发者ID:ChrisNolan1992,项目名称:vumi-go,代码行数:53,代码来源:test_handlers.py


示例2: TestTransport

class TestTransport(TestCase):
    """Tests for OkTransport run through a stubbed worker, with a mock HTTP
    service standing in for the remote end."""

    @inlineCallbacks
    def setUp(self):
        """Start the mock service, then start an OkTransport worker whose
        'url' setting points at it."""
        DelayedCall.debug = True
        self.ok_transport_calls = DeferredQueue()
        service = MockHttpServer(self.handle_request)
        self.mock_service = service
        yield service.start()

        config = {
            'transport_name': 'test_ok_transport',
            'transport_type': 'ok',
            'ussd_string_prefix': '',
            'web_path': "foo",
            'web_port': 0,
            'url': service.url,
            'username': 'testuser',
            'password': 'testpass',
        }
        worker = get_stubbed_worker(OkTransport, config)
        self.worker = worker
        self.broker = worker._amqp_client.broker
        yield worker.startWorker()
        self.worker_url = worker.get_transport_url()

    @inlineCallbacks
    def tearDown(self):
        """Stop the worker first, then the mock service."""
        yield self.worker.stopWorker()
        yield self.mock_service.stop()

    def handle_request(self, request):
        """Record a request received by the mock service and reply with an
        empty body."""
        self.ok_transport_calls.put(request)
        return ''

    @inlineCallbacks
    def test_health(self):
        """The 'health' resource reports zero pending requests."""
        health_url = self.worker_url + "health"
        body = yield http_request(health_url, "", method='GET')
        expected = {'pending_requests': 0}
        self.assertEqual(json.loads(body), expected)

    @inlineCallbacks
    def test_inbound(self):
        """An inbound GET is published to the broker, and the reply sent on
        the outbound queue becomes the HTTP response body."""
        pending_response = http_request(
            self.worker_url + "foo", '', method='GET')
        inbound = yield self.broker.wait_messages(
            "vumi", "test_ok_transport.inbound", 1)
        # Exactly one inbound message is expected.
        [msg] = inbound
        reply = TransportUserMessage(**msg.payload).reply("OK")
        self.broker.publish_message(
            "vumi", "test_ok_transport.outbound", reply)
        response = yield pending_response
        self.assertEqual(response, 'OK')
开发者ID:BantouTelecom,项目名称:vumi,代码行数:52,代码来源:test_httprpc.py


示例3: setUp

    def setUp(self):
        """Start a mock HTTP server and a MediaEdgeGSMTransport whose
        'outbound_url' points at it.

        Also resets the canned response body and response code attributes.
        """
        self.mediaedgegsm_calls = DeferredQueue()
        server = MockHttpServer(self.handle_request)
        self.mock_mediaedgegsm = server
        self.add_cleanup(server.stop)
        yield server.start()

        operator_mappings = {
            '417': {
                '417912': 'VODA',
                '417913': 'TIGO',
                '417914': 'UNKNOWN',
            }
        }
        self.config = {
            'web_path': "foo",
            'web_port': 0,
            'username': 'user',
            'password': 'pass',
            'outbound_url': server.url,
            'outbound_username': 'username',
            'outbound_password': 'password',
            'operator_mappings': operator_mappings,
        }
        helper = TransportHelper(MediaEdgeGSMTransport)
        self.tx_helper = self.add_helper(helper)
        transport = yield self.tx_helper.get_transport(self.config)
        self.transport = transport
        self.transport_url = transport.get_transport_url()
        self.mediaedgegsm_response = ''
        self.mediaedgegsm_response_code = http.OK
开发者ID:TouK,项目名称:vumi,代码行数:28,代码来源:test_mediaedgegsm.py


示例4: start_app_worker

    def start_app_worker(self, config_overrides=None):
        """Start a mock push server and the application worker under test.

        Generator that yields the Deferreds it waits on (the decorator is
        not visible from here; presumably run via inlineCallbacks).

        :param config_overrides:
            Optional mapping merged over the default worker config. Defaults
            to None (no overrides); a None sentinel is used instead of a
            shared mutable ``{}`` default.

        Sets self.mock_push_server, self.push_calls, self.config, self.app,
        self.conversation, self.addr, self.url and self.auth_headers.
        """
        # Mock server to test HTTP posting of inbound messages & events
        self.mock_push_server = MockHttpServer(self.handle_request)
        yield self.mock_push_server.start()
        self.add_cleanup(self.mock_push_server.stop)
        self.push_calls = DeferredQueue()

        self.config = {
            'conversation_key': 'key_conversation',
            'push_message_url': self.get_message_url(),
            'push_event_url': self.get_event_url(),
            'health_path': '/health/',
            'web_path': '/foo',
            'web_port': 0,
            'api_tokens': [{
                'account': 'account_key',
                'conversation': 'key_conversation',
                'tokens': ['token-1', 'token-2', 'token-3'],
            }, ]
        }
        if config_overrides is not None:
            self.config.update(config_overrides)
        self.app = yield self.app_helper.get_application(self.config)
        self.conversation = self.config['conversation_key']
        self.addr = self.app.webserver.getHost()
        self.url = 'http://%s:%s%s' % (
            self.addr.host, self.addr.port, self.config['web_path'])
        # Basic auth credentials for 'account_key' using the first token.
        self.auth_headers = {
            'Authorization': ['Basic ' + base64.b64encode('%s:%s' % (
                'account_key', 'token-1'))],
        }
开发者ID:praekelt,项目名称:vumi-http-api,代码行数:30,代码来源:test_vumi_app.py


示例5: setUp

    def setUp(self):
        """Start a mock HTTP server and a CellulantSmsTransport whose
        'outbound_url' points at it, configured with two sets of
        credentials."""
        self.cellulant_sms_calls = DeferredQueue()
        server = MockHttpServer(self.handle_request)
        self.mock_cellulant_sms = server
        yield server.start()
        self.add_cleanup(server.stop)

        credentials = {
            '2371234567': {
                'username': 'user',
                'password': 'pass',
            },
            '9292': {
                'username': 'other-user',
                'password': 'other-pass',
            },
        }
        self.config = {
            'web_path': "foo",
            'web_port': 0,
            'credentials': credentials,
            'outbound_url': server.url,
            'validation_mode': 'permissive',
        }
        helper = TransportHelper(CellulantSmsTransport)
        self.tx_helper = self.add_helper(helper)
        transport = yield self.tx_helper.get_transport(self.config)
        self.transport = transport
        self.transport_url = transport.get_transport_url()
开发者ID:Nagato23,项目名称:vumi,代码行数:26,代码来源:test_cellulant_sms.py


示例6: setUp

    def setUp(self):
        """Start a mock HTTP server and an AppositTransport whose
        'outbound_url' points at it.

        The canned response body/code attributes are set before the server
        starts.
        """
        server = MockHttpServer(self.handle_inbound_request)
        self.mock_server = server
        self.outbound_requests = DeferredQueue()
        self.mock_server_response = ''
        self.mock_server_response_code = http.OK
        yield server.start()
        self.add_cleanup(server.stop)

        credentials = {
            '8123': {
                'username': 'root',
                'password': 'toor',
                'service_id': 'service-id-1',
            },
            '8124': {
                'username': 'admin',
                'password': 'nimda',
                'service_id': 'service-id-2',
            },
        }
        config = {
            'web_path': 'api/v1/apposit/sms',
            'web_port': 0,
            'credentials': credentials,
            'outbound_url': server.url,
        }
        helper = TransportHelper(
            AppositTransport,
            transport_addr='8123',
            mobile_addr='251911223344')
        self.tx_helper = self.add_helper(helper)
        transport = yield self.tx_helper.get_transport(config)
        self.transport = transport
        self.transport_url = transport.get_transport_url()
        self.web_path = config['web_path']
开发者ID:Nagato23,项目名称:vumi,代码行数:32,代码来源:test_apposit.py


示例7: setUp

    def setUp(self):
        """Start the NoStreamingHTTPWorker app, a mock push server and a
        conversation, then prepare Basic auth headers using 'token-1'."""
        helper = AppWorkerHelper(NoStreamingHTTPWorker)
        self.app_helper = self.add_helper(helper)

        self.config = {
            'health_path': '/health/',
            'web_path': '/foo',
            'web_port': 0,
            'metrics_prefix': 'metrics_prefix.',
        }
        self.app = yield self.app_helper.get_app_worker(self.config)
        listening_on = self.app.webserver.getHost()
        self.addr = listening_on
        self.url = 'http://%s:%s%s' % (
            listening_on.host, listening_on.port, self.config['web_path'])

        # Mock server to test HTTP posting of inbound messages & events
        push_server = MockHttpServer(self.handle_request)
        self.mock_push_server = push_server
        yield push_server.start()
        self.add_cleanup(push_server.stop)
        self.push_calls = DeferredQueue()

        tokens = ['token-1', 'token-2', 'token-3']
        self.conversation = yield self.create_conversation(
            self.get_message_url(), self.get_event_url(), tokens)

        credentials = '%s:%s' % (
            self.conversation.user_account.key, 'token-1')
        self.auth_headers = {
            'Authorization': ['Basic ' + base64.b64encode(credentials)],
        }

        self._setup_wait_for_request()
        self.add_cleanup(self._wait_for_requests)
开发者ID:ChrisNolan1992,项目名称:vumi-go,代码行数:32,代码来源:test_vumi_app.py


示例8: WeChatTestCase

class WeChatTestCase(VumiTestCase):
    """Base test case providing a mock API server and helpers for building
    WeChatTransport instances."""

    def setUp(self):
        """Create the transport helper and start the mock API server.

        Returns whatever the mock server's start() returns.
        """
        self.tx_helper = self.add_helper(TransportHelper(WeChatTransport))
        self.request_queue = DeferredQueue()
        server = MockHttpServer(self.handle_api_request)
        self.mock_server = server
        self.add_cleanup(server.stop)
        return server.start()

    def handle_api_request(self, request):
        """Queue the incoming API request and leave it unfinished."""
        self.request_queue.put(request)
        return NOT_DONE_YET

    def get_transport(self, **config):
        """Get a transport from the helper using the default settings,
        with any keyword arguments overriding them."""
        base_config = {
            'api_url': self.mock_server.url,
            'auth_token': 'token',
            'twisted_endpoint': 'tcp:0',
            'wechat_appid': 'appid',
            'wechat_secret': 'secret',
            'embed_user_profile': False,
        }
        transport_config = dict(base_config, **config)
        return self.tx_helper.get_transport(transport_config)

    @inlineCallbacks
    def get_transport_with_access_token(self, access_token, **config):
        """Get a transport and store ``access_token`` in its redis under
        WeChatTransport.ACCESS_TOKEN_KEY before returning it."""
        transport = yield self.get_transport(**config)
        token_key = WeChatTransport.ACCESS_TOKEN_KEY
        yield transport.redis.set(token_key, access_token)
        returnValue(transport)
开发者ID:Nagato23,项目名称:vumi,代码行数:31,代码来源:test_wechat.py


示例9: test_hitting_url

    def test_hitting_url(self):
        """Dispatch a 'survey_completed' event and check the HTTP request
        that arrives at the mock server (form arguments and headers).

        Generator that yields the Deferreds it waits on (the decorator is
        not visible from here; presumably run via inlineCallbacks).
        """
        msisdn = u'+2345'
        message_id = u'message-id'
        event = self.eh_helper.make_event('survey_completed', {
            'from_addr': msisdn,
            'message_id': message_id,
            'transport_type': 'ussd',
            'participant': {'interactions': 2},
        })

        self.mock_server = MockHttpServer()
        yield self.mock_server.start()
        try:
            self.eh_helper.get_handler('afropinions').url = (
                self.mock_server.url)

            yield self.eh_helper.dispatch_event(event)
            received_request = yield self.mock_server.queue.get()
            self.assertEqual(received_request.args['msisdn'][0], msisdn)
            # NOTE(review): '2000' presumably derives from the handler's
            # configured amount and the participant's 2 interactions --
            # confirm against the handler setup.
            self.assertEqual(received_request.args['amount'][0], '2000')
            self.assertEqual(received_request.args['reason'][0], 'foo')

            headers = received_request.requestHeaders
            self.assertEqual(
                headers.getRawHeaders('Content-Type'),
                ['application/x-www-form-urlencoded'])
            self.assertEqual(
                headers.getRawHeaders('Authorization'),
                ['Basic %s' % (base64.b64encode('username:password'),)])
        finally:
            # Stop the server even when an assertion above fails, so a
            # failing test does not leave the mock server running.
            yield self.mock_server.stop()
开发者ID:ChrisNolan1992,项目名称:vumi-go,代码行数:27,代码来源:test_handlers.py


示例10: setup_resource

 def setup_resource(self, callback=None, auth=None, config=None):
     """Start a mock server and set up the app against a resource URL on it.

     When no callback is given, any request reaching the mock server fails
     the test. ``auth`` and ``config`` are passed through to setup_app().
     """
     def unexpected_request(r):
         return self.fail("No RapidSMS requests expected.")

     handler = unexpected_request if callback is None else callback
     server = MockHttpServer(handler)
     self.mock_server = server
     self.add_cleanup(server.stop)
     yield server.start()
     resource_url = '%s%s' % (server.url, '/test/resource/path')
     self.app = yield self.setup_app(resource_url, auth=auth, config=config)
开发者ID:Nagato23,项目名称:vumi,代码行数:8,代码来源:test_rapidsms_relay.py


示例11: setUp

    def setUp(self):
        """Create the transport helper, start the mock server and register
        cleanups for the server and for any pending requests."""
        helper = TransportHelper(self.transport_class)
        self.tx_helper = self.add_helper(helper)
        server = MockHttpServer(self.handle_inbound_request)
        self.mock_server = server
        self.add_cleanup(server.stop)
        self.clock = Clock()

        yield server.start()

        # Requests collected here are finished by the finish_requests cleanup.
        self._pending_reqs = []
        self.add_cleanup(self.finish_requests)
开发者ID:Nagato23,项目名称:vumi,代码行数:10,代码来源:test_vumi_bridge.py


示例12: mk_mock_server

    def mk_mock_server(self, body, headers=None, code=http.OK):
        """Start a mock server that answers every request with a fixed
        response, and point the worker's 'url' config at it.

        :param body: value returned by the request handler.
        :param headers:
            mapping of response headers to set; defaults to
            ``{'X-Nth-Smsid': 'message_id'}`` when None.
        :param code: response code set on each request.
        """
        if headers is None:
            response_headers = {'X-Nth-Smsid': 'message_id'}
        else:
            response_headers = headers

        def handler(request):
            request.setResponseCode(code)
            for name, value in response_headers.items():
                request.setHeader(name, value)
            return body

        server = MockHttpServer(handler)
        self.mock_server = server
        self.add_cleanup(server.stop)
        yield server.start()
        self.worker.config['url'] = server.url
开发者ID:Nagato23,项目名称:vumi,代码行数:14,代码来源:test_failures.py


示例13: setUp

    def setUp(self):
        """Patch the transport's clock, start the mock remote server and
        install a non-persistent global treq connection pool."""
        self.clock = Clock()

        def get_clock(_):
            # Looked up at call time, so it follows self.clock.
            return self.clock

        self.patch(Vas2NetsSmsTransport, 'get_clock', get_clock)

        self.remote_request_handler = lambda _: 'OK.1234'
        server = MockHttpServer(self.remote_handle_request)
        self.remote_server = server
        yield server.start()
        self.addCleanup(server.stop)

        helper = HttpRpcTransportHelper(Vas2NetsSmsTransport)
        self.tx_helper = self.add_helper(helper)

        treq._utils.set_global_pool(
            HTTPConnectionPool(reactor, persistent=False))
开发者ID:praekelt,项目名称:vumi-vas2nets,代码行数:14,代码来源:test_vas2nets_sms.py


示例14: TestStreamingClient

class TestStreamingClient(VumiTestCase):

    @inlineCallbacks
    def setUp(self):
        """Start the mock server and open a StreamingClient stream to it.

        Messages, errors and disconnect reasons are collected in separate
        DeferredQueues.
        """
        server = MockHttpServer(self.handle_request)
        self.mock_server = server
        self.add_cleanup(server.stop)
        yield server.start()
        self.url = server.url
        self.client = StreamingClient()
        self.messages_received = DeferredQueue()
        self.errors_received = DeferredQueue()
        self.disconnects_received = DeferredQueue()

        def reason_trapper(reason):
            # Only ResponseDone disconnects are recorded.
            if not reason.trap(ResponseDone):
                return
            self.disconnects_received.put(reason.getErrorMessage())

        on_message = self.messages_received.put
        on_error = self.errors_received.put
        self.receiver = self.client.stream(
            Message, on_message, on_error, self.url,
            on_disconnect=reason_trapper)

    def handle_request(self, request):
        """Queue the request on the mock server's queue and leave it
        unfinished so the test can write to it later."""
        pending = self.mock_server.queue
        pending.put(request)
        return NOT_DONE_YET

    @inlineCallbacks
    def test_callback_on_disconnect(self):
        """Write one message to the stream, finish the request and check
        both the received message and the disconnect reason."""
        req = yield self.mock_server.queue.get()
        payload = Message(foo='bar').to_json().encode('utf-8')
        req.write('%s\n' % (payload,))
        req.finish()

        received = yield self.messages_received.get()
        self.assertEqual(received['foo'], 'bar')

        # This is the error message we get when a ResponseDone is raised,
        # which happens when the remote server closes the connection.
        disconnect_reason = yield self.disconnects_received.get()
        self.assertEqual(disconnect_reason, 'Response body fully received')

    @inlineCallbacks
    def test_invalid_json(self):
        req = yield self.mock_server.queue.get()
        req.write("Hello\n")
        req.finish()
        try:
            yield self.errors_received.get()
        except VumiBridgeInvalidJsonError, e:
            self.assertEqual(e.args, ("Hello",))
        else:
开发者ID:Nagato23,项目名称:vumi,代码行数:49,代码来源:test_client.py


示例15: setUp

    def setUp(self):
        """Install a test clock on MessengerTransport, start a mock remote
        server that answers 'OK', and install a non-persistent global treq
        connection pool."""
        self.clock = Clock()

        MessengerTransport.clock = self.clock

        server = MockHttpServer(lambda _: 'OK')
        self.remote_server = server
        yield server.start()
        self.addCleanup(server.stop)

        transport_helper = HttpRpcTransportHelper(PatchedMessengerTransport)
        self.tx_helper = self.add_helper(transport_helper)
        self.msg_helper = self.add_helper(MessageHelper())

        treq._utils.set_global_pool(
            HTTPConnectionPool(reactor, persistent=False))
开发者ID:praekeltfoundation,项目名称:vumi-messenger,代码行数:15,代码来源:test_transport.py


示例16: setUp

    def setUp(self):
        """Run the parent setUp, start a mock HTTP server and get a
        transport whose 'outbound_url' points at it."""
        yield super(SimagriTransportTestCase, self).setUp()

        self.simagri_sms_calls = DeferredQueue()
        server = MockHttpServer(self.handle_request)
        self.mock_simagri_sms = server
        yield server.start()

        self.config = {
            'transport_name': self.transport_name,
            'receive_path': 'sendsms',
            'receive_port': 9998,
            'outbound_url': server.url,
        }
        transport = yield self.get_transport(self.config)
        self.transport = transport
        self.transport_url = transport.get_transport_url()
开发者ID:hacklabcbba,项目名称:vumi-simagri,代码行数:15,代码来源:test_simagri_http.py


示例17: TestGoConversationTransportBase

class TestGoConversationTransportBase(VumiTestCase):
    """Shared fixture for transport tests that talk to a mock HTTP server.

    ``transport_class`` is None here; it is passed to TransportHelper in
    setUp, so subclasses are presumably expected to set it.
    """

    transport_class = None

    @inlineCallbacks
    def setUp(self):
        """Create the transport helper, start the mock server and register
        cleanups for the server and for any pending requests."""
        helper = TransportHelper(self.transport_class)
        self.tx_helper = self.add_helper(helper)
        server = MockHttpServer(self.handle_inbound_request)
        self.mock_server = server
        self.add_cleanup(server.stop)
        self.clock = Clock()

        yield server.start()

        self._pending_reqs = []
        self.add_cleanup(self.finish_requests)

    @inlineCallbacks
    def get_transport(self, **config):
        """Get a transport using the default settings (overridden by any
        keyword arguments), run setup_transport() on it and return it."""
        base_config = {
            'base_url': self.mock_server.url,
            'account_key': 'account-key',
            'conversation_key': 'conversation-key',
            'access_token': 'access-token',
        }
        transport_config = dict(base_config, **config)
        transport = yield self.tx_helper.get_transport(transport_config)
        yield self.setup_transport(transport)
        returnValue(transport)

    def setup_transport(self, transport):
        """Hook called by get_transport(); does nothing in this class."""

    @inlineCallbacks
    def finish_requests(self):
        """Finish every request handed out by get_next_request() that has
        not been finished yet."""
        for pending in self._pending_reqs:
            if pending.finished:
                continue
            yield pending.finish()

    def handle_inbound_request(self, request):
        """Queue the request on the mock server's queue and leave it
        unfinished."""
        queue = self.mock_server.queue
        queue.put(request)
        return NOT_DONE_YET

    @inlineCallbacks
    def get_next_request(self):
        """Wait for the next queued request, remember it for cleanup and
        return it."""
        next_request = yield self.mock_server.queue.get()
        self._pending_reqs.append(next_request)
        returnValue(next_request)
开发者ID:Nagato23,项目名称:vumi,代码行数:47,代码来源:test_vumi_bridge.py


示例18: setUp

 def setUp(self):
     """Start a mock HTTP server and an IntegratTransport whose 'url'
     points at it, then record the transport URL and the canned response."""
     self.integrat_calls = DeferredQueue()
     server = MockHttpServer(self.handle_request)
     self.mock_integrat = server
     self.add_cleanup(server.stop)
     yield server.start()

     config = {
         'web_path': "foo",
         'web_port': "0",
         'url': server.url,
         'username': 'testuser',
         'password': 'testpass',
     }
     helper = TransportHelper(IntegratTransport)
     self.tx_helper = self.add_helper(helper)
     self.transport = yield self.tx_helper.get_transport(config)
     listening_on = self.transport.web_resource.getHost()
     self.transport_url = "http://%s:%s/" % (
         listening_on.host, listening_on.port)
     self.higate_response = '<Response status_code="0"/>'
开发者ID:Nagato23,项目名称:vumi,代码行数:17,代码来源:test_integrat.py


示例19: setUp

 def setUp(self):
     """Start a mock HTTP server and a stubbed IntegratTransport worker
     whose 'url' points at it, then record the worker URL."""
     self.integrat_calls = DeferredQueue()
     server = MockHttpServer(self.handle_request)
     self.mock_integrat = server
     yield server.start()

     config = {
         'transport_name': 'testgrat',
         'web_path': "foo",
         'web_port': "0",
         'url': server.url,
         'username': 'testuser',
         'password': 'testpass',
     }
     worker = get_stubbed_worker(IntegratTransport, config)
     self.worker = worker
     self.broker = worker._amqp_client.broker
     yield worker.startWorker()
     listening_on = worker.web_resource.getHost()
     self.worker_url = "http://%s:%s/" % (
         listening_on.host, listening_on.port)
开发者ID:mangroveorg,项目名称:vumi,代码行数:17,代码来源:test_integrat.py


示例20: setUp

    def setUp(self):
        """Start a mock HTTP server and a MediafoneTransport whose
        'outbound_url' points at it.

        Also resets the canned response body and response code attributes.
        """
        self.mediafone_calls = DeferredQueue()
        server = MockHttpServer(self.handle_request)
        self.mock_mediafone = server
        yield server.start()
        self.add_cleanup(server.stop)

        self.config = {
            'web_path': "foo",
            'web_port': 0,
            'username': 'user',
            'password': 'pass',
            'outbound_url': server.url,
        }
        helper = TransportHelper(MediafoneTransport)
        self.tx_helper = self.add_helper(helper)
        transport = yield self.tx_helper.get_transport(self.config)
        self.transport = transport
        self.transport_url = transport.get_transport_url()
        self.mediafonemc_response = ''
        self.mediafonemc_response_code = http.OK
开发者ID:Nagato23,项目名称:vumi,代码行数:18,代码来源:test_mediafonemc.py



注:本文中的vumi.tests.utils.MockHttpServer类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.mk_packet函数代码示例发布时间:2022-05-26
下一篇:
Python utils.get_stubbed_worker函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap