• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python server.ServerBase类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中spyne.server.ServerBase的典型用法代码示例。如果您正苦于以下问题:Python ServerBase类的具体用法?Python ServerBase怎么用?Python ServerBase使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ServerBase类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_soap_input_header

    def test_soap_input_header(self):
        server = ServerBase(self.app)
        initial_ctx = MethodContext(server)
        initial_ctx.in_string = [
            '<senv:Envelope xmlns:tns="tns"'
                'xmlns:wsa="http://www.w3.org/2005/08/addressing"'
                'xmlns:senv="http://schemas.xmlsoap.org/soap/envelope/">'
                '<senv:Header>'
                    '<wsa:Action>/SomeAction</wsa:Action>'
                    '<wsa:MessageID>SomeMessageID</wsa:MessageID>'
                    '<wsa:RelatesTo>SomeRelatesToID</wsa:RelatesTo>'
                '</senv:Header>'
                '<senv:Body>'
                    '<tns:someRequest>'
                        '<tns:status>OK</tns:status>'
                    '</tns:someRequest>'
                '</senv:Body>'
                '</senv:Envelope>'
        ]

        ctx, = server.generate_contexts(initial_ctx)
        server.get_in_object(ctx)

        self.assertEquals(ctx.in_header[0], '/SomeAction')
        self.assertEquals(ctx.in_header[1], 'SomeMessageID')
        self.assertEquals(ctx.in_header[2], 'SomeRelatesToID')
开发者ID:colons,项目名称:spyne,代码行数:26,代码来源:test_soap.py


示例2: test_soap_input_header_order_and_missing

    def test_soap_input_header_order_and_missing(self):
        """
        Test that header ordering logic also works when an input header
        element is missing.  Confirm that it returns None for the missing
        parameter.
        """
        server = ServerBase(self.app)
        initial_ctx = MethodContext(server)
        initial_ctx.in_string = [
            '<senv:Envelope xmlns:tns="tns"'
                'xmlns:wsa="http://www.w3.org/2005/08/addressing"'
                'xmlns:senv="http://schemas.xmlsoap.org/soap/envelope/">'
                '<senv:Header>'
                    '<wsa:MessageID>SomeMessageID</wsa:MessageID>'
                    '<wsa:Action>/SomeAction</wsa:Action>'
                '</senv:Header>'
                '<senv:Body>'
                    '<tns:someRequest>'
                        '<tns:status>OK</tns:status>'
                    '</tns:someRequest>'
                '</senv:Body>'
                '</senv:Envelope>'
        ]

        ctx, = server.generate_contexts(initial_ctx)
        server.get_in_object(ctx)

        self.assertEquals(ctx.in_header[0], '/SomeAction')
        self.assertEquals(ctx.in_header[1], 'SomeMessageID')
        self.assertEquals(ctx.in_header[2], None)
开发者ID:colons,项目名称:spyne,代码行数:30,代码来源:test_soap.py


示例3: test_soap_input_header_order

    def test_soap_input_header_order(self):
        """
        Tests supports for input headers whose elements are provided in
        different order than that defined in rpc declaration _in_header parameter.
        """
        server = ServerBase(self.app)
        initial_ctx = MethodContext(server)
        initial_ctx.in_string = [
            '<senv:Envelope xmlns:tns="tns"'
                'xmlns:wsa="http://www.w3.org/2005/08/addressing"'
                'xmlns:senv="http://schemas.xmlsoap.org/soap/envelope/">'
                '<senv:Header>'
                    '<wsa:MessageID>SomeMessageID</wsa:MessageID>'
                    '<wsa:RelatesTo>SomeRelatesToID</wsa:RelatesTo>'
                    '<wsa:Action>/SomeAction</wsa:Action>'
                '</senv:Header>'
                '<senv:Body>'
                    '<tns:someRequest>'
                        '<tns:status>OK</tns:status>'
                    '</tns:someRequest>'
                '</senv:Body>'
                '</senv:Envelope>'
        ]

        ctx, = server.generate_contexts(initial_ctx)
        server.get_in_object(ctx)

        self.assertEquals(ctx.in_header[0], '/SomeAction')
        self.assertEquals(ctx.in_header[1], 'SomeMessageID')
        self.assertEquals(ctx.in_header[2], 'SomeRelatesToID')
开发者ID:colons,项目名称:spyne,代码行数:30,代码来源:test_soap.py


示例4: test_mandatory_elements

    def test_mandatory_elements(self):
        class SomeService(ServiceBase):
            @srpc(M(Unicode), _returns=Unicode)
            def some_call(s):
                assert s == 'hello'
                return s

        app = Application([SomeService], "tns", name="test_mandatory_elements",
                          in_protocol=XmlDocument(validator='lxml'),
                          out_protocol=XmlDocument())
        server = ServerBase(app)

        # Valid call with all mandatory elements in
        ctx = self._get_ctx(server, [
            '<some_call xmlns="tns">'
                '<s>hello</s>'
            '</some_call>'
        ])
        server.get_out_object(ctx)
        server.get_out_string(ctx)
        ret = etree.fromstring(''.join(ctx.out_string)).xpath(
            '//tns:some_call%s/text()' % RESULT_SUFFIX,
            namespaces=app.interface.nsmap)[0]
        assert ret == 'hello'


        # Invalid call
        ctx = self._get_ctx(server, [
            '<some_call xmlns="tns">'
                # no mandatory elements here...
            '</some_call>'
        ])
        self.assertRaises(SchemaValidationError, server.get_out_object, ctx)
开发者ID:DmitriyMoseev,项目名称:spyne,代码行数:33,代码来源:test_xml.py


示例5: test_basic

    def test_basic(self):
        class SomeService(ServiceBase):
            @srpc(String(pattern='a'))
            def some_method(s):
                pass

        application = Application([SomeService],
            in_protocol=Soap11(validator='soft'),
            out_protocol=Soap11(),
            name='Service', tns='tns',
        )
        server = ServerBase(application)

        ctx = MethodContext(server)
        ctx.in_string = [u"""
            <SOAP-ENV:Envelope xmlns:ns0="tns"
                               xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
               <SOAP-ENV:Body>
                  <ns0:some_method>
                     <ns0:s>OK</ns0:s>
                  </ns0:some_method>
               </SOAP-ENV:Body>
            </SOAP-ENV:Envelope>
        """]


        ctx, = server.generate_contexts(ctx)
        server.get_in_object(ctx)

        self.assertEquals(isinstance(ctx.in_error, ValidationError), True)
开发者ID:listestalo,项目名称:spyne,代码行数:30,代码来源:test_soft_validation.py


示例6: __init__

    def __init__(self, app, app_url, wsdl_url=None):
        ServerBase.__init__(self, app)

        self.app_url = app_url
        self.wsdl_url = wsdl_url

        self.zmq_socket = context.socket(zmq.REP)
        self.zmq_socket.bind(app_url)
开发者ID:66ru,项目名称:spyne,代码行数:8,代码来源:zeromq.py


示例7: __init__

    def __init__(self, app, chunked=False,
                max_content_length=2 * 1024 * 1024,
                block_length=8 * 1024):
        ServerBase.__init__(self, app)

        self.chunked = chunked
        self.max_content_length = max_content_length
        self.block_length = block_length
开发者ID:KetothXupack,项目名称:spyne,代码行数:8,代码来源:http.py


示例8: __init__

    def __init__(self, db, app, consumer_id):
        ServerBase.__init__(self, app)

        self.session = sessionmaker(bind=db)()
        self.id = consumer_id

        if self.session.query(WorkerStatus).get(self.id) is None:
            self.session.add(WorkerStatus(worker_id=self.id))
            self.session.commit()
开发者ID:Bernie,项目名称:spyne,代码行数:9,代码来源:queue.py


示例9: __init__

    def __init__(self, app, chunked=False,
                max_content_length=2 * 1024 * 1024,
                block_length=8 * 1024):
        ServerBase.__init__(self, app)

        self._allowed_http_verbs = app.in_protocol.allowed_http_verbs

        self.chunked = chunked
        self.max_content_length = max_content_length
        self.block_length = block_length
开发者ID:Bluehorn,项目名称:spyne,代码行数:10,代码来源:http.py


示例10: __init__

    def __init__(self, db, app, consumer_id):
        ServerBase.__init__(self, app)

        self.session = sessionmaker(bind=db)()
        self.id = consumer_id

        worker_status = self.session.query(WorkerStatus).filter_by(worker_id=self.id).first()

        if worker_status is None:
            self.session.add(WorkerStatus(worker_id=self.id, task_id=0))
            self.session.commit()
开发者ID:norox,项目名称:spyne,代码行数:11,代码来源:queue.py


示例11: __init__

    def __init__(self, db, app, consumer_id):
        ServerBase.__init__(self, app)

        self.session = sessionmaker(bind=db)()
        self.id = consumer_id

        try:
            self.session.query(WorkerStatus) \
                          .filter_by(worker_id=self.id).one()
        except NoResultFound:
            self.session.add(WorkerStatus(worker_id=self.id, task_id=0))
            self.session.commit()
开发者ID:66ru,项目名称:spyne,代码行数:12,代码来源:queue.py


示例12: deserialize_request_string

def deserialize_request_string(string, app):
    """Deserialize request string using in_protocol in application definition.
    Returns the corresponding native python object.
    """

    server = ServerBase(app)
    initial_ctx = MethodContext(server)
    initial_ctx.in_string = [string]

    ctx = server.generate_contexts(initial_ctx)[0]
    server.get_in_object(ctx)
    return ctx.in_object
开发者ID:Bluehorn,项目名称:spyne,代码行数:12,代码来源:protocol.py


示例13: test_invalid_input

    def test_invalid_input(self):
        class SomeService(ServiceBase):
            @srpc()
            def yay():
                pass

        app = Application([SomeService], 'tns', JsonObject(), JsonObject(), Wsdl11())
        server = ServerBase(app)

        initial_ctx = MethodContext(server)
        initial_ctx.in_string = ['{"some_call": {"yay": ]}}']
        ctx, = server.generate_contexts(initial_ctx)
        assert ctx.in_error.faultcode == 'Client.JsonDecodeError'
开发者ID:leekchan,项目名称:spyne,代码行数:13,代码来源:test_json.py


示例14: test_primitive_only

        def test_primitive_only(self):
            class SomeComplexModel(ComplexModel):
                i = Integer
                s = String

            class SomeService(ServiceBase):
                @srpc(SomeComplexModel, _returns=SomeComplexModel)
                def some_call(scm):
                    return SomeComplexModel(i=5, s='5x')

            app = Application([SomeService], 'tns',
                                    in_protocol=_DictObjectChild(),
                                    out_protocol=_DictObjectChild())

            server = ServerBase(app)
            initial_ctx = MethodContext(server)
            initial_ctx.in_string = [serializer.dumps({"some_call":[]})]

            ctx, = server.generate_contexts(initial_ctx)
            server.get_in_object(ctx)
            server.get_out_object(ctx)
            server.get_out_string(ctx)

            assert list(ctx.out_string) == [serializer.dumps(
                {"some_callResponse": {"some_callResult": {"i": 5, "s": "5x"}}})]
开发者ID:anthonyrisinger,项目名称:spyne,代码行数:25,代码来源:_test_dictobj.py


示例15: test_invalid_input

    def test_invalid_input(self):
        class SomeService(ServiceBase):
            pass

        app = Application([SomeService], 'tns',
                                in_protocol=JsonDocument(),
                                out_protocol=JsonDocument())

        server = ServerBase(app)

        initial_ctx = MethodContext(server, MethodContext.SERVER)
        initial_ctx.in_string = [b'{']
        ctx, = server.generate_contexts(initial_ctx, in_string_charset='utf8')
        assert ctx.in_error.faultcode == 'Client.JsonDecodeError'
开发者ID:ashleysommer,项目名称:spyne,代码行数:14,代码来源:test_json.py


示例16: test_invalid_input

    def test_invalid_input(self):
        class SomeService(ServiceBase):
            @srpc()
            def yay():
                pass

        app = Application([SomeService], "tns", in_protocol=JsonDocument(), out_protocol=JsonDocument())

        server = ServerBase(app)

        initial_ctx = MethodContext(server)
        initial_ctx.in_string = ["{"]
        ctx, = server.generate_contexts(initial_ctx)
        assert ctx.in_error.faultcode == "Client.JsonDecodeError"
开发者ID:rstuart,项目名称:spyne,代码行数:14,代码来源:test_json.py


示例17: test_decimal

    def test_decimal(self):
        d = decimal.Decimal('1e100')

        class SomeService(ServiceBase):
            @srpc(Decimal(120,4), _returns=Decimal)
            def some_call(p):
                print(p)
                print(type(p))
                assert type(p) == decimal.Decimal
                assert d == p
                return p

        app = Application([SomeService], "tns", in_protocol=XmlDocument(),
                                                out_protocol=XmlDocument())
        server = ServerBase(app)
        initial_ctx = MethodContext(server)
        initial_ctx.in_string = ['<some_call xmlns="tns"><p>%s</p></some_call>'
                                                                            % d]

        ctx, = server.generate_contexts(initial_ctx)
        server.get_in_object(ctx)
        server.get_out_object(ctx)
        server.get_out_string(ctx)

        elt = etree.fromstring(''.join(ctx.out_string))

        print(etree.tostring(elt, pretty_print=True))
        target = elt.xpath('//tns:some_callResult/text()',
                                              namespaces=app.interface.nsmap)[0]
        assert target == str(d)
开发者ID:DmitriyMoseev,项目名称:spyne,代码行数:30,代码来源:test_xml.py


示例18: test_attribute_ns

    def test_attribute_ns(self):
        class a(ComplexModel):
            b = Unicode
            c = XmlAttribute(Unicode, ns="spam", attribute_of="b")

        class SomeService(ServiceBase):
            @srpc(_returns=a)
            def some_call():
                return a(b="foo",c="bar")

        app = Application([SomeService], "tns", in_protocol=XmlDocument(),
                                                out_protocol=XmlDocument())
        server = ServerBase(app)
        initial_ctx = MethodContext(server)
        initial_ctx.in_string = ['<some_call xmlns="tns"/>']

        ctx, = server.generate_contexts(initial_ctx)
        server.get_in_object(ctx)
        server.get_out_object(ctx)
        server.get_out_string(ctx)

        elt = etree.fromstring(''.join(ctx.out_string))
        target = elt.xpath('//s0:b', namespaces=app.interface.nsmap)[0]

        print(etree.tostring(elt, pretty_print=True))
        assert target.attrib['{%s}c' % app.interface.nsmap["s1"]] == "bar"
开发者ID:DmitriyMoseev,项目名称:spyne,代码行数:26,代码来源:test_xml.py


示例19: test_xml_data

    def test_xml_data(self):
        class C(ComplexModel):
            a = XmlData(Unicode)
            b = XmlAttribute(Unicode)

        class SomeService(ServiceBase):
            @srpc(C, _returns=C)
            def some_call(c):
                assert c.a == "a"
                assert c.b == "b"
                return c

        app = Application(
            [SomeService], "tns", name="test_xml_data", in_protocol=XmlDocument(), out_protocol=XmlDocument()
        )
        server = ServerBase(app)
        initial_ctx = MethodContext(server, MethodContext.SERVER)
        initial_ctx.in_string = [b'<some_call xmlns="tns">' b'<c b="b">a</c>' b"</some_call>"]

        ctx, = server.generate_contexts(initial_ctx)
        server.get_in_object(ctx)
        server.get_out_object(ctx)
        server.get_out_string(ctx)

        print(ctx.out_string)
        pprint(app.interface.nsmap)

        ret = etree.fromstring(b"".join(ctx.out_string)).xpath(
            "//tns:some_call" + RESULT_SUFFIX, namespaces=app.interface.nsmap
        )[0]

        print(etree.tostring(ret, pretty_print=True))

        assert ret.text == "a"
        assert ret.attrib["b"] == "b"
开发者ID:plq,项目名称:spyne,代码行数:35,代码来源:test_xml.py


示例20: _dry_me

    def _dry_me(services, d, ignore_wrappers=False, complex_as=dict,
                         just_ctx=False, just_in_object=False, validator=None,
                         polymorphic=False):

        app = Application(services, 'tns',
                in_protocol=_DictDocumentChild(
                    ignore_wrappers=ignore_wrappers, complex_as=complex_as,
                    polymorphic=polymorphic, validator=validator,
                ),
                out_protocol=_DictDocumentChild(
                     ignore_wrappers=ignore_wrappers, complex_as=complex_as,
                                                       polymorphic=polymorphic),
            )

        server = ServerBase(app)
        initial_ctx = MethodContext(server, MethodContext.SERVER)
        in_string = serializer.dumps(d, **dumps_kwargs)
        if not isinstance(in_string, bytes):
            in_string = in_string.encode('utf8')
        initial_ctx.in_string = [in_string]

        ctx, = server.generate_contexts(initial_ctx, in_string_charset='utf8')
        if not just_ctx:
            server.get_in_object(ctx)
            if not just_in_object:
                server.get_out_object(ctx)
                server.get_out_string(ctx)

        return ctx
开发者ID:1-bit,项目名称:spyne,代码行数:29,代码来源:_test_dictdoc.py



注:本文中的spyne.server.ServerBase类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python wsgi._parse_qs函数代码示例发布时间:2022-05-27
下一篇:
Python xml.XmlDocument类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap