• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python outgoing.HTTPSOAPWrapper类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zato.server.connection.http_soap.outgoing.HTTPSOAPWrapper的典型用法代码示例。如果您正苦于以下问题:Python HTTPSOAPWrapper类的具体用法?Python HTTPSOAPWrapper怎么用?Python HTTPSOAPWrapper使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了HTTPSOAPWrapper类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_create_headers_soap12

    def test_create_headers_soap12(self):
        """ With XML data format, SOAP transport and SOAP version 1.2, the created
        headers hold the X-Zato-* entries, SOAPAction and the SOAP 1.2 Content-Type;
        whatever remains must be exactly the headers the caller passed in.
        """
        cid = rand_string()
        now = datetime.utcnow().isoformat()
        soap_action = rand_string()
        requests_module = _FakeRequestsModule()

        config = self._get_config()
        overrides = (
            ('data_format', DATA_FORMAT.XML),
            ('transport', URL_TYPE.SOAP),
            ('soap_action', soap_action),
            ('soap_version', '1.2'),
        )
        for key, value in overrides:
            config[key] = value

        wrapper = HTTPSOAPWrapper(config, requests_module)
        user_headers = {rand_string():rand_string(), rand_string():rand_string()}

        headers = wrapper._create_headers(cid, user_headers, now)

        # Each expected header is popped so that only user headers stay behind
        expected_headers = (
            ('X-Zato-CID', cid),
            ('X-Zato-Msg-TS', now),
            ('X-Zato-Component', get_component_name()),
            ('SOAPAction', soap_action),
            ('Content-Type', CONTENT_TYPE.SOAP12),
        )
        for header_name, expected_value in expected_headers:
            eq_(headers.pop(header_name), expected_value)

        # Nothing but the user-supplied headers may be left at this point
        self.assertDictEqual(headers, user_headers)
开发者ID:giacomocariello,项目名称:zato,代码行数:26,代码来源:test_outgoing.py


示例2: test_http_get_unknown_ca_verify_invalid_ca_cert

    def test_http_get_unknown_ca_verify_invalid_ca_cert(self):
        """ Points tls_verify at a temporary file holding ca_cert_invalid and issues
        an HTTP GET against a local TLSServer - the call must raise an exception
        whose details report 'certificate verify failed'.
        """
        with NamedTemporaryFile(prefix='zato-tls', delete=False) as ca_cert_tf:

            ca_cert_tf.write(ca_cert_invalid)
            ca_cert_tf.flush()

            server = TLSServer()
            server.start()

            # Short pause after start(), presumably to let the server come up
            sleep(0.3)

            port = server.get_port()

            config = self._get_config()
            config['address_host'] = 'https://localhost:{}/'.format(port)
            config['address_url_path'] = ''
            config['ping_method'] = 'GET'
            config['transport'] = URL_TYPE.PLAIN_HTTP
            config['tls_verify'] = ca_cert_tf.name

            wrapper = HTTPSOAPWrapper(config, requests)

            try:
                wrapper.get('123')
            except Exception as e:
                # The nested indexing digs the (library, function, reason) triple
                # out of the exception's message
                details = e.message[0][1][0][0]
                self.assertEquals(details, ('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE', 'certificate verify failed'))
            else:
                # No exception means the certificate was accepted, which is a failure
                self.fail('Expected a TLS error here')
开发者ID:giacomocariello,项目名称:zato,代码行数:29,代码来源:test_outgoing.py


示例3: test_http_get_client_cert_required_no_client_cert

    def test_http_get_client_cert_required_no_client_cert(self):
        """ Issues an HTTP GET, without configuring any client certificate, against
        a TLSServer started with cert_reqs=ssl.CERT_REQUIRED - the call must raise
        an exception whose details report 'sslv3 alert handshake failure'.
        """
        with NamedTemporaryFile(prefix='zato-tls', delete=False) as ca_cert_tf:

            ca_cert_tf.write(ca_cert)
            ca_cert_tf.flush()

            server = TLSServer(cert_reqs=ssl.CERT_REQUIRED)
            server.start()

            # Short pause after start(), presumably to let the server come up
            sleep(0.3)

            port = server.get_port()

            config = self._get_config()
            config['address_host'] = 'https://localhost:{}/'.format(port)
            config['address_url_path'] = ''
            config['ping_method'] = 'GET'
            config['transport'] = URL_TYPE.PLAIN_HTTP
            config['tls_verify'] = ca_cert_tf.name

            wrapper = HTTPSOAPWrapper(config, requests)

            try:
                wrapper.get('123')
            except Exception as e:
                # The nested indexing digs the (library, function, reason) triple
                # out of the exception's message
                details = e.message[0][1][0][0]
                self.assertEquals(details, ('SSL routines', 'SSL3_READ_BYTES', 'sslv3 alert handshake failure'))
            else:
                # No exception means the handshake went through, which is a failure
                self.fail('Expected a TLS error here')
开发者ID:giacomocariello,项目名称:zato,代码行数:29,代码来源:test_outgoing.py


示例4: test_ping_client_cert_required_no_client_cert

    def test_ping_client_cert_required_no_client_cert(self):
        """ Pings, without configuring any client certificate, a TLSServer started
        with cert_reqs=ssl.CERT_REQUIRED - the ping must raise an exception whose
        message mentions 'sslv3 alert handshake failure'.
        """
        with NamedTemporaryFile(prefix='zato-tls', delete=False) as ca_cert_tf:

            ca_cert_tf.write(ca_cert)
            ca_cert_tf.flush()

            server = TLSServer(cert_reqs=ssl.CERT_REQUIRED)
            server.start()

            # Short pause after start(), presumably to let the server come up
            sleep(0.3)

            port = server.get_port()

            config = self._get_config()
            config['address_host'] = 'https://localhost:{}/'.format(port)
            config['address_url_path'] = ''
            config['ping_method'] = 'GET'
            config['tls_verify'] = ca_cert_tf.name

            wrapper = HTTPSOAPWrapper(config, requests)

            try:
                wrapper.ping(rand_string())
            except Exception as e:
                self.assertIn('SSL3_READ_BYTES:sslv3 alert handshake failure', e.message[0][1])
            else:
                # No exception means the handshake went through, which is a failure
                self.fail('Expected a TLS error here')
开发者ID:farirat,项目名称:zato,代码行数:27,代码来源:test_outgoing.py


示例5: test_ping_unknown_ca_verify_invalid_ca_cert

    def test_ping_unknown_ca_verify_invalid_ca_cert(self):
        """ Points tls_verify at a temporary file holding ca_cert_invalid and pings
        a local TLSServer - the ping must raise an exception whose message mentions
        'certificate verify failed'.
        """
        with NamedTemporaryFile(prefix='zato-tls', delete=False) as ca_cert_tf:

            ca_cert_tf.write(ca_cert_invalid)
            ca_cert_tf.flush()

            server = TLSServer()
            server.start()

            # Short pause after start(), presumably to let the server come up
            sleep(0.3)

            port = server.get_port()

            config = self._get_config()
            config['address_host'] = 'https://localhost:{}/'.format(port)
            config['address_url_path'] = ''
            config['ping_method'] = 'GET'
            config['tls_verify'] = ca_cert_tf.name

            wrapper = HTTPSOAPWrapper(config, requests)

            try:
                wrapper.ping(rand_string())
            except Exception as e:
                self.assertIn('SSL3_GET_SERVER_CERTIFICATE:certificate verify failed', e.message[0][1])
            else:
                # No exception means the certificate was accepted, which is a failure
                self.fail('Expected a TLS error here')
开发者ID:farirat,项目名称:zato,代码行数:27,代码来源:test_outgoing.py


示例6: test_ping_client_cert_required_has_client_cert

    def test_ping_client_cert_required_has_client_cert(self):
        """ Pings a TLSServer started with cert_reqs=ssl.CERT_REQUIRED using a
        connection configured with a client key and certificate - the ping is
        expected to complete without raising.
        """
        with NamedTemporaryFile(prefix='zato-tls', delete=False) as ca_cert_tf:

            ca_cert_tf.write(ca_cert)
            ca_cert_tf.flush()

            with NamedTemporaryFile(prefix='zato-tls', delete=False) as client_cert_tf:

                # Key and certificate share one file, separated by a newline
                for chunk in (client1_key, '\n', client1_cert):
                    client_cert_tf.write(chunk)
                client_cert_tf.flush()

                server = TLSServer(cert_reqs=ssl.CERT_REQUIRED)
                server.start()

                sleep(0.3)

                address_host = 'https://localhost:{}/'.format(server.get_port())

                config = self._get_config()
                settings = (
                    ('address_host', address_host),
                    ('address_url_path', ''),
                    ('ping_method', 'GET'),
                    ('tls_verify', ca_cert_tf.name),
                    ('tls_key_cert_full_path', client_cert_tf.name),
                    ('sec_type', SEC_DEF_TYPE.TLS_KEY_CERT),
                )
                for key, value in settings:
                    config[key] = value

                HTTPSOAPWrapper(config, requests).ping(rand_string())
开发者ID:giacomocariello,项目名称:zato,代码行数:32,代码来源:test_outgoing.py


示例7: test_pool_size

    def test_pool_size(self):
        """ https://github.com/zatosource/zato/issues/77 (outconn HTTP/SOAP pool size)

        After a ping, the fake session's pool_size must equal the configured one.
        """
        expected_pool_size = rand_int()
        requests_module = _FakeRequestsModule()

        config = self._get_config()
        config['pool_size'] = expected_pool_size

        HTTPSOAPWrapper(config, requests_module).ping(rand_string())

        actual_pool_size = requests_module.session_obj.pool_size
        eq_(expected_pool_size, actual_pool_size)
开发者ID:giacomocariello,项目名称:zato,代码行数:12,代码来源:test_outgoing.py


示例8: test_pool_size

 def test_pool_size(self):
     """ https://github.com/zatosource/zato/issues/77 (outconn HTTP/SOAP pool size)

     After a ping, the fake session's pool_size must equal the configured one.
     """
     expected_pool_size = rand_int()
     requests_module = _FakeRequestsModule()

     # Only pool_size matters here, the other values are random
     config = {}
     config['sec_type'] = rand_string()
     config['address'] = rand_string()
     config['ping_method'] = rand_string()
     config['pool_size'] = expected_pool_size

     HTTPSOAPWrapper(config, requests_module).ping(rand_string())

     actual_pool_size = requests_module.session_obj.pool_size
     eq_(expected_pool_size, actual_pool_size)
开发者ID:Adniel,项目名称:zato,代码行数:13,代码来源:test_outgoing.py


示例9: test_ping_method

    def test_ping_method(self):
        """ https://github.com/zatosource/zato/issues/44 (outconn HTTP/SOAP ping method)

        The first positional argument of the fake session's request must be the
        ping method set in the config.
        """
        requests_module = _FakeRequestsModule()
        expected_ping_method = 'ping-{}'.format(rand_string())

        config = self._get_config()
        config['ping_method'] = expected_ping_method

        HTTPSOAPWrapper(config, requests_module).ping(rand_string())

        request_args = requests_module.session_obj.request_args
        eq_(expected_ping_method, request_args[0])
开发者ID:giacomocariello,项目名称:zato,代码行数:13,代码来源:test_outgoing.py


示例10: test_ping_method

 def test_ping_method(self):
     """ https://github.com/zatosource/zato/issues/44 (outconn HTTP/SOAP ping method)

     The first positional argument of the fake session's request must be the
     ping method set in the config.
     """
     expected_ping_method = 'ping-{}'.format(rand_string())
     requests_module = _FakeRequestsModule()

     # Only ping_method matters here, the other values are random
     config = {}
     config['sec_type'] = rand_string()
     config['address'] = rand_string()
     config['ping_method'] = expected_ping_method
     config['pool_size'] = rand_int()

     HTTPSOAPWrapper(config, requests_module).ping(rand_string())

     request_args = requests_module.session_obj.request_args
     eq_(expected_ping_method, request_args[0])
开发者ID:Adniel,项目名称:zato,代码行数:14,代码来源:test_outgoing.py


示例11: test_format_address

    def test_format_address(self):
        """ Formatting an address whose URL path has placeholders, with None given
        as the parameters, must raise a ValueError carrying the CID in its message.
        """
        cid = rand_string()
        address_host = rand_string()
        address_url_path = '/a/{a}/b{b}/c-{c}/{d}d/'

        config = self._get_config()
        config['address_host'] = address_host
        config['address_url_path'] = address_url_path

        requests_module = _FakeRequestsModule()
        wrapper = HTTPSOAPWrapper(config, requests_module)

        try:
            wrapper.format_address(cid, None)
        except ValueError as e:
            eq_(e.message, 'CID:[{}] No parameters given for URL path'.format(cid))
        else:
            # Without this branch the test would pass even if nothing was raised
            self.fail('Expected a ValueError because no parameters were given')
开发者ID:giacomocariello,项目名称:zato,代码行数:16,代码来源:test_outgoing.py


示例12: test_soap_body

    def test_soap_body(self):
        """ https://github.com/zatosource/zato/issues/475 (Body ignored in string-based outgoing SOAP connections)

        The text POSTed through a SOAP wrapper must be the text of the Body
        element in the data handed to the fake session.
        """
        body_text = rand_string()
        requests_module = _FakeRequestsModule()

        config = self._get_config()
        config['transport'] = URL_TYPE.SOAP

        wrapper = HTTPSOAPWrapper(config, requests_module)
        wrapper.post(rand_int(), body_text)

        # Parse what was sent and look Body up regardless of its namespace
        sent_data = wrapper.requests_module.session_obj.request_kwargs['data']
        envelope = etree.fromstring(sent_data)
        body_elems = envelope.xpath('//*[local-name() = "Body"]')

        self.assertEqual(body_elems[0].text, body_text)
开发者ID:giacomocariello,项目名称:zato,代码行数:16,代码来源:test_outgoing.py


示例13: test_ping_unknown_ca_verify_false

    def test_ping_unknown_ca_verify_false(self):
        """ With tls_verify set to ZATO_NONE, pinging a local TLSServer must
        return a result containing 'Code: 200'.
        """
        server = TLSServer()
        server.start()

        sleep(0.3)

        address_host = 'https://localhost:{}/'.format(server.get_port())

        config = self._get_config()
        settings = (
            ('address_host', address_host),
            ('address_url_path', ''),
            ('ping_method', 'GET'),
            ('tls_verify', ZATO_NONE),
        )
        for key, value in settings:
            config[key] = value

        wrapper = HTTPSOAPWrapper(config, requests)
        ping_result = wrapper.ping(rand_string())

        self.assertIn('Code: 200', ping_result)
开发者ID:giacomocariello,项目名称:zato,代码行数:17,代码来源:test_outgoing.py


示例14: test_http_get_unknown_ca_verify_false

    def test_http_get_unknown_ca_verify_false(self):
        """ With tls_verify set to ZATO_NONE, an HTTP GET against a local TLSServer
        must come back with the httplib.OK status code.
        """
        server = TLSServer()
        server.start()

        sleep(0.3)

        address_host = 'https://localhost:{}/'.format(server.get_port())

        config = self._get_config()
        settings = (
            ('address_host', address_host),
            ('address_url_path', ''),
            ('ping_method', 'GET'),
            ('transport', URL_TYPE.PLAIN_HTTP),
            ('tls_verify', ZATO_NONE),
        )
        for key, value in settings:
            config[key] = value

        wrapper = HTTPSOAPWrapper(config, requests)
        response = wrapper.get('123')

        self.assertEqual(httplib.OK, response.status_code)
开发者ID:giacomocariello,项目名称:zato,代码行数:18,代码来源:test_outgoing.py


示例15: test_create_headers_no_data_format

    def test_create_headers_no_data_format(self):
        """ With the config left as returned by _get_config, the created headers
        hold the three X-Zato-* entries and, apart from them, only the headers
        the caller passed in.
        """
        cid = rand_string()
        now = datetime.utcnow().isoformat()
        requests_module = _FakeRequestsModule()

        wrapper = HTTPSOAPWrapper(self._get_config(), requests_module)
        user_headers = {rand_string():rand_string(), rand_string():rand_string()}

        headers = wrapper._create_headers(cid, user_headers, now)

        # Each expected header is popped so that only user headers stay behind
        expected_headers = (
            ('X-Zato-CID', cid),
            ('X-Zato-Msg-TS', now),
            ('X-Zato-Component', get_component_name()),
        )
        for header_name, expected_value in expected_headers:
            eq_(headers.pop(header_name), expected_value)

        # Nothing but the user-supplied headers may be left at this point
        # (no Content-Type is expected because no data_format was set)
        self.assertDictEqual(headers, user_headers)
开发者ID:giacomocariello,项目名称:zato,代码行数:20,代码来源:test_outgoing.py


示例16: test_create_headers_json

    def test_create_headers_json(self):
        """ With JSON data format over plain HTTP, the created headers hold the
        X-Zato-* entries and the JSON Content-Type; whatever remains must be
        exactly the headers the caller passed in.
        """
        cid = rand_string()
        now = datetime.utcnow().isoformat()
        requests_module = _FakeRequestsModule()

        config = self._get_config()
        for key, value in (('data_format', DATA_FORMAT.JSON), ('transport', URL_TYPE.PLAIN_HTTP)):
            config[key] = value

        wrapper = HTTPSOAPWrapper(config, requests_module)
        user_headers = {rand_string():rand_string(), rand_string():rand_string()}

        headers = wrapper._create_headers(cid, user_headers, now)

        # Each expected header is popped so that only user headers stay behind
        expected_headers = (
            ('X-Zato-CID', cid),
            ('X-Zato-Msg-TS', now),
            ('X-Zato-Component', get_component_name()),
            ('Content-Type', CONTENT_TYPE.JSON),
        )
        for header_name, expected_value in expected_headers:
            eq_(headers.pop(header_name), expected_value)

        # Nothing but the user-supplied headers may be left at this point
        self.assertDictEqual(headers, user_headers)
开发者ID:giacomocariello,项目名称:zato,代码行数:22,代码来源:test_outgoing.py


示例17: test_http_methods

    def test_http_methods(self):
        """ Each of the wrapper's HTTP-verb methods (get, delete, options, post,
        put, patch) must hand its arguments on to http_request, with the verb
        upper-cased, and return whatever http_request returns.

        http_request and format_address are replaced with local stubs, so no
        request is made through the requests module.
        """
        address_host = rand_string()

        config = self._get_config()
        config['is_active'] = True
        config['soap_version'] = '1.2'
        config['address_host'] = address_host

        requests_module = _FakeRequestsModule()
        wrapper = HTTPSOAPWrapper(config, requests_module)

        # Verbs that are called without a data argument
        no_data_methods = ('get', 'delete', 'options')

        # NOTE(review): address_url_path is never read inside the loops below
        # (format_address is stubbed out) - confirm whether it was meant to be
        # assigned to config['address_url_path'].
        for address_url_path in('/zzz', '/a/{a}/b/{b}'):
            for transport in('soap', rand_string()):
                for name in('get', 'delete', 'options', 'post', 'put', 'patch'):
                    config['transport'] = transport

                    _cid = rand_string()
                    _data = rand_string()

                    expected_http_request_value = rand_string()

                    expected_params = rand_string()

                    expected_args1 = rand_string()
                    expected_args2 = rand_string()

                    expected_kwargs1 = rand_string()
                    expected_kwargs2 = rand_string()

                    def http_request(method, cid, data='', params=None, *args, **kwargs):
                        # Stub that checks everything the verb method passed on

                        eq_(method, name.upper())
                        eq_(cid, _cid)

                        if name in no_data_methods:
                            eq_(data, '')
                        else:
                            eq_(data, _data)

                        eq_(params, expected_params)
                        eq_(args, (expected_args1, expected_args2))
                        eq_(sorted(kwargs.items()), [('bar', expected_kwargs2), ('foo', expected_kwargs1)])

                        return expected_http_request_value

                    def format_address(cid, params):
                        return expected_http_request_value

                    # The stubs are re-assigned on each iteration because they
                    # close over the values generated above
                    wrapper.http_request = http_request
                    wrapper.format_address = format_address

                    func = getattr(wrapper, name)

                    if name in no_data_methods:
                        http_request_value = func(
                            _cid, expected_params, expected_args1, expected_args2,
                            foo=expected_kwargs1, bar=expected_kwargs2)
                    else:
                        http_request_value = func(
                            _cid, _data, expected_params, expected_args1, expected_args2,
                            foo=expected_kwargs1, bar=expected_kwargs2)

                    eq_(http_request_value, expected_http_request_value)
开发者ID:giacomocariello,项目名称:zato,代码行数:64,代码来源:test_outgoing.py



注:本文中的zato.server.connection.http_soap.outgoing.HTTPSOAPWrapper类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python rbac_.RBAC类代码示例发布时间:2022-05-26
下一篇:
Python parallel.ParallelServer类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap