• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python client.SoapClient类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pysimplesoap.client.SoapClient的典型用法代码示例。如果您正苦于以下问题:Python SoapClient类的具体用法?Python SoapClient怎么用?Python SoapClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SoapClient类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: send_msg_to_bpel

    def send_msg_to_bpel(self, callback_method_id, msg):
        """
        To send a SOAP message to BPEL

        :param address:
        :param namespace:
        :param method:
        :param params:
        :param result_names:
        :return:
        """
        rospy.loginfo("===== Prepare a SOAP client =====")
        rospy.loginfo("Address: %s" % self.callback_bpel_method_map[callback_method_id]['address'])
        rospy.loginfo("NS: %s" % self.callback_bpel_method_map[callback_method_id]['namespace'])
        rospy.loginfo("=================================")
        callback_method = self.callback_bpel_method_map[callback_method_id]

        client = SoapClient(
            location = callback_method['address'], # "http://localhost:8080/ode/processes/A2B"
            namespace = callback_method['namespace'], # "http://smartylab.co.kr/bpel/a2b"
            soap_ns='soap')
        rospy.loginfo("Sending the message to BPEL... %s(%s)" % (callback_method['name'], msg))
        # response = client.call(method, **(simplejson.loads(params)))
        param_dict = dict()
        param_dict[callback_method['param']] = msg
        response = client.call(callback_method['name'], **param_dict)
        rospy.loginfo("The message is sent.")
开发者ID:smartylab,项目名称:concert_adapter,代码行数:27,代码来源:adapter.py


示例2: get_hostname

def get_hostname(ip):
	#enable 5 second timeout
	socket.setdefaulttimeout(5)
	#query agent by SOAP
	client=SoapClient(location = "http://"+ip+":8008/",action = "http://"+ip+":8008/", namespace = "http://example.com/sample.wsdl",soap_ns='soap',ns = False)
	rs=str(client.get_value(number=11,com=xml['network']['commands']['hostname']['@com'],args=xml['network']['commands']['hostname']['#text']).resultaat)
	return rs.rstrip()
开发者ID:The-Sec,项目名称:TCSB-V2SYS6-10,代码行数:7,代码来源:index.py


示例3: test_buscar_personas_raw

    def test_buscar_personas_raw(self):

        url = "http://www.testgobi.dpi.sfnet/licencias/web/soap.php"
        client = SoapClient(location=url, ns="web",
                            namespace="http://wwwdesagobi.dpi.sfnet:8080/licencias/web/",
                            action=url)
        # load dummy response (for testing)
        client.http = DummyHTTP(self.xml)
        client['AuthHeaderElement'] = {'username': 'mariano', 'password': 'clave'}
        response = client.PersonaSearch(persona=(('numero_documento', '99999999'),
                                                 ('apellido_paterno', ''),
                                                 ('apellido_materno', ''),
                                                 ('nombres', ''),
                                                 ))

        # the raw response is a SimpleXmlElement object:

        self.assertEqual(str(response.result.item[0]("xsd:string")[0]), "resultado")
        self.assertEqual(str(response.result.item[0]("xsd:string")[1]), "true")
        self.assertEqual(str(response.result.item[1]("xsd:string")[0]), "codigo")
        self.assertEqual(str(response.result.item[1]("xsd:string")[1]), "WS01-01")
        self.assertEqual(str(response.result.item[2]("xsd:string")[0]), "mensaje")
        self.assertEqual(str(response.result.item[2]("xsd:string")[1]), "Se encontraron 1 personas.")
        self.assertEqual(str(response.result.item[2]("xsd:string")[0]), "mensaje")
        self.assertEqual(str(response.result.item[2]("xsd:string")[1]), "Se encontraron 1 personas.")

        self.assertEqual(str(response.result.item[3]("xsd:anyType")[0]), "datos")
        self.assertEqual(str(response.result.item[3]("xsd:anyType")[1]("ns2:Map").item[0].key), "lic_ps_ext_id")
        self.assertEqual(str(response.result.item[3]("xsd:anyType")[1]("ns2:Map").item[0].value), "123456")
        self.assertEqual(str(response.result.item[3]("xsd:anyType")[1]("ns2:Map").item[10].key), "fecha_nacimiento")
        self.assertEqual(str(response.result.item[3]("xsd:anyType")[1]("ns2:Map").item[10].value), "1985-10-02 00:00:00")
开发者ID:AmineCherrai,项目名称:pysimplesoap,代码行数:31,代码来源:licencias_test.py


示例4: __init__

    def __init__(self, studyLocation, proxyStr, proxyUsr, proxyPass, isTrace):
        """Constructor
        """
        self._logger = logging.getLogger(__name__)
        logging.config.fileConfig("logging.ini", disable_existing_loggers=False)

        proxies = None

        if proxyStr:
            proxies = pysimplesoap.client.parse_proxy(proxyStr)
            self._logger.info("OC Event SOAP services with proxies: " + str(proxies))

        self._logger.info("OC Event SOAP services with auth: " + str(proxyUsr))

        if proxies:
            self.client = SoapClient(location=studyLocation,
                namespace=STUDYNAMESPACE,
                action=STUDYACTION,
                soap_ns='soapenv',
                ns="v1",
                trace=isTrace,
                proxy=proxies,
                username=proxyUsr,
                password=proxyPass)
        else:
            self.client = SoapClient(location=studyLocation,
                namespace=STUDYNAMESPACE,
                action=STUDYACTION,
                soap_ns='soapenv',
                ns="v1",
                trace=isTrace,
                username=proxyUsr,
                password=proxyPass)
开发者ID:andrewmcloughlin,项目名称:open-clinica-scripts,代码行数:33,代码来源:OCEventWsService.py


示例5: test_validar

 def test_validar(self):
     "Prueba de envío de un comprobante electrónico"
     WSDL = 'https://celcer.sri.gob.ec/comprobantes-electronicos-ws/RecepcionComprobantes?wsdl'
     # https://cel.sri.gob.ec/comprobantes-electronicos-ws/RecepcionComprobantes?wsdl
     client = SoapClient(wsdl=WSDL, ns="ec")
     ret = client.validarComprobante(xml='cid:1218403525359')
     print '>>> ', ret
开发者ID:cholitoecuatoriano,项目名称:ODOO-Ecuador-v8,代码行数:7,代码来源:wstest.py


示例6: test_issue46

    def test_issue46(self):
        """Example for sending an arbitrary header using SimpleXMLElement"""

        # fake connection (just to test xml_request):
        client = SoapClient(
            location="https://localhost:666/",
            namespace='http://localhost/api'
        )

        # Using WSDL, the equivalent is:
        # client['MyTestHeader'] = {'username': 'test', 'password': 'test'}

        headers = SimpleXMLElement("<Headers/>")
        my_test_header = headers.add_child("MyTestHeader")
        my_test_header['xmlns'] = "service"
        my_test_header.marshall('username', 'test')
        my_test_header.marshall('password', 'password')

        try:
            client.methodname(headers=headers)
        except:
            open("issue46.xml", "wb").write(client.xml_request)
            self.assert_('<soap:Header><MyTestHeader xmlns="service">'
                            '<username>test</username>'
                            '<password>password</password>'
                         '</MyTestHeader></soap:Header>' in client.xml_request.decode(),
                         "header not in request!")
开发者ID:smartylab,项目名称:pysimplesoap,代码行数:27,代码来源:issues_test.py


示例7: test_issue127

 def test_issue127(self):
     """Test relative schema locations in imports"""
     client = SoapClient(wsdl = 'https://eisoukr.musala.com:9443/IrmInboundMediationWeb/sca/MTPLPolicyWSExport/WEB-INF/wsdl/wsdl/IrmInboundMediation_MTPLPolicyWSExport.wsdl')
     try:
         resp = client.voidMTPLPolicy()
     except Exception as e:
         self.assertIn('InvalidSecurity', e.faultcode)
开发者ID:smartylab,项目名称:pysimplesoap,代码行数:7,代码来源:issues_test.py


示例8: _send_ax

def _send_ax():
    client = SoapClient('http://uzak.audio.com.tr:88/AxIntegrations/AxService.asmx', action='http://tempuri.org/', namespace='http://tempuri.org/', ns='tem')
    files = os.listdir(AX_QUEUE_FOLDER) 
    for name in files:
        try:
            path = os.path.join(AX_QUEUE_FOLDER, name)
            with open(path, 'r') as infile:
                data = json.load(infile)
            #data = dict([(k.encode('utf8'), v.encode('utf8')) for k, v in data.items()])
            bilgi = Bilgi.objects.get(pk=int(name))
            if bilgi.ax_code:
                raise Exception('Teklif %s already has an ax_code: %s, no duplicate allowed. %s' % (name, bilgi.ax_code, bilgi.tarih))
            resp = client.call('AddWebQuotationForm', **data)
            ax_code = str(resp.AddWebQuotationFormResult) 
            bilgi.ax_code = ax_code
            bilgi.save()
        except Exception as e:
            if isinstance(e, SoapFault) and e.faultcode == 'soap:Server':
                body = "%s\n\nRequest:\n\n%s\n\n\nResponse:\n\n%s\n" % (unicode(e).encode('utf8'), client.xml_request, client.xml_response)
                subject = 'Web Teklif Axapta Server Hatasi'
                audiomail('[email protected]', ['[email protected] '], subject, body)
                audiomail('[email protected]', ['[email protected]'], subject, body)
            logging.exception('hello')
        else:
            os.unlink(path)
开发者ID:refik,项目名称:audio,代码行数:25,代码来源:send_ax.py


示例9: test_autorizar

 def test_autorizar(self):
         
     WSDL = 'https://cel.sri.gob.ec/comprobantes-electronicos-ws/AutorizacionComprobantes?wsdl'
     # https://cel.sri.gob.ec/comprobantes-electronicos-ws/AutorizacionComprobantes?wsdl
     client = SoapClient(wsdl=WSDL, ns="ec")
     ret = client.autorizacionComprobante(claveAccesoComprobante="1702201205176001321000110010030001000011234567816")
     self.assertEquals(ret, {'RespuestaAutorizacionComprobante': {'autorizaciones': [], 'claveAccesoConsultada': '1702201205176001321000110010030001000011234567816', 'numeroComprobantes': '0'}})
开发者ID:AmineCherrai,项目名称:pysimplesoap,代码行数:7,代码来源:sri_ec_test.py


示例10: add_transaction_to_usaepay

 def add_transaction_to_usaepay(self, token, usaepay_customer_id):
     if settings.TEST_MODE:
         return 123456
     client = SoapClient(wsdl=settings.USAEPAY_WSDL,
                         trace=True,
                         ns=False)
     description = ''
     if self.autorefill and not self.company.blank_usaepay_description:
         description = 'Prepaid Phone recharge for %s' % self.autorefill.phone_number
     params = {
         u'Command': 'Sale',
         u'Details': {
             u'Amount': self.amount,
             u'Description': description,
             u'Invoice': '%s' % self.id,
             # u'OrderID': '',
             # u'PONum': '',
         }
     }
     response = client.runCustomerTransaction(Token=token,
                                              CustNum=usaepay_customer_id,
                                              PaymentMethodID=0,
                                              Parameters=params)
     result_code = response['runCustomerTransactionReturn']['ResultCode']
     if 'A' == result_code:
         result = response['runCustomerTransactionReturn']['RefNum']
         return result
     elif 'D' == result_code:
         self.atransaction = response['runCustomerTransactionReturn']['RefNum']
         self.save()
         raise Exception('Transaction Declined: %s' % response['runCustomerTransactionReturn']['Error'])
     else:
         self.atransaction = response['runCustomerTransactionReturn']['RefNum']
         self.save()
         raise Exception('Transaction Error: %s' % response['runCustomerTransactionReturn']['Error'])
开发者ID:adulenzy,项目名称:Auto-phone-recharge-system,代码行数:35,代码来源:models.py


示例11: test_validar

 def test_validar(self):
     "Prueba de envío de un comprovante electrónico"
     WSDL = 'https://celcer.sri.gob.ec/comprobantes-electronicos-ws/RecepcionComprobantes?wsdl'
     # https://cel.sri.gob.ec/comprobantes-electronicos-ws/RecepcionComprobantes?wsdl
     client = SoapClient(wsdl=WSDL, ns="ec")
     ret = client.validarComprobante(xml="cid:1218403525359")
     self.assertEquals(ret, {'RespuestaRecepcionComprobante': {'comprobantes': [{'comprobante': {'mensajes': [{'mensaje': {'identificador': '35', 'mensaje': 'ARCHIVO NO CUMPLE ESTRUCTURA XML', 'informacionAdicional': 'Content is not allowed in prolog.', 'tipo': 'ERROR'}}], 'claveAcceso': 'N/A'}}], 'estado': 'DEVUELTA'}})
开发者ID:AmineCherrai,项目名称:pysimplesoap,代码行数:7,代码来源:sri_ec_test.py


示例12: call_wsaa

def call_wsaa(cms, wsdl=WSDL, proxy=None, cache=None, wrapper="", trace=False):
    "Call the RPC method with the CMS to get the authorization ticket (TA)"

    # create the webservice client
    client = SoapClient(
        location = wsdl[:-5], #location, use wsdl,
        cache = cache,
        #proxy = parse_proxy(proxy),
        #cacert = cacert,
        timeout = TIMEOUT,
        ns = "ejb", 
        # TODO: find a better method to not include ns prefix in children:
        #   (wsdl parse should detect qualification instead of server dialect)
        soap_server = "jetty",  
        namespace = "http://ejb.server.wsaa.dna.gov.py/",
        soap_ns = "soapenv",
        trace = trace)
    # fix the wrong location (192.4.1.39:8180 in the WDSL)
    ##ws = client.services['WsaaServerBeanService']
    ##location = ws['ports']['WsaaServerBeanPort']['location']
    ##location = location.replace("192.4.1.39:8180", "secure.aduana.gov.py")
    ##ws['ports']['WsaaServerBeanPort']['location'] = wsdl[:-5] #location
    
    # call the remote method
    try:
        results = client.loginCms(arg0=str(cms))
    except:
        # save sent and received messages for debugging:
        open("request.xml", "w").write(client.xml_request)
        open("response.xml", "w").write(client.xml_response)
        raise
    
    # extract the result:
    ta = results['return'].encode("utf-8")
    return ta
开发者ID:AmineCherrai,项目名称:pysimplesoap,代码行数:35,代码来源:wsaa_py.py


示例13: test_issue129

 def test_issue129(self):
     """Test RPC style (axis) messages (including parameter order)"""
     wsdl_url = 'http://62.94.212.138:8081/teca/services/tecaServer?wsdl'
     client = SoapClient(wsdl=wsdl_url, soap_server='axis')
     client.help("contaVolumi")
     response = client.contaVolumi(user_id=1234, valoreIndice=["IDENTIFIER", ""])
     self.assertEqual(response, {'contaVolumiReturn': 0})
开发者ID:adammendoza,项目名称:pysimplesoap,代码行数:7,代码来源:issues_test.py


示例14: test_wsfexv1_getcmp

 def test_wsfexv1_getcmp(self):
     """Test Argentina AFIP Electronic Invoice WSFEXv1 GetCMP method"""
     # create the proxy and parse the WSDL
     client = SoapClient(
         wsdl="https://wswhomo.afip.gov.ar/wsfexv1/service.asmx?WSDL",
         cache=None
     )
     # load saved xml
     xml = open(os.path.join(TEST_DIR, "wsfexv1_getcmp.xml")).read()
     client.http = DummyHTTP(xml)
     # call RPC
     ret = client.FEXGetCMP(
         Auth={'Token': "", 'Sign': "", 'Cuit': "0"},
         Cmp={
             'Cbte_tipo': "19",
             'Punto_vta': "3",
             'Cbte_nro': "38",
         })
     # analyze result
     result = ret['FEXGetCMPResult']
     self.assertEqual(result['FEXErr']['ErrCode'], 0)
     self.assertEqual(result['FEXErr']['ErrMsg'], 'OK')
     self.assertEqual(result['FEXEvents']['EventCode'], 0)
     resultget = result['FEXResultGet']
     self.assertEqual(resultget['Obs'], None)
     self.assertEqual(resultget['Cae'], '61473001385110')
     self.assertEqual(resultget['Fch_venc_Cae'], '20111202')
     self.assertEqual(resultget['Fecha_cbte'], '20111122')
     self.assertEqual(resultget['Punto_vta'], 3)
     self.assertEqual(resultget['Resultado'], "A")
     self.assertEqual(resultget['Cbte_nro'], 38)
     self.assertEqual(resultget['Imp_total'], Decimal('130.21'))
     self.assertEqual(resultget['Cbte_tipo'], 19)
开发者ID:adammendoza,项目名称:pysimplesoap,代码行数:33,代码来源:afip_test.py


示例15: atest_issue80

    def atest_issue80(self):
        """Test Issue in sending a webservice request with soap12"""    
        client = SoapClient(wsdl="http://testserver:7007/testapp/services/testService?wsdl",
                            soap_ns='soap12', trace=False, soap_server='oracle')        
        try:
            result = client.hasRole(userId='test123', role='testview')
        except httplib2.ServerNotFoundError:
	        pass
开发者ID:smartylab,项目名称:pysimplesoap,代码行数:8,代码来源:issues_test.py


示例16: atest_wsaa_exception

 def atest_wsaa_exception(self):
     "Test WSAA for SoapFault"
     WSDL = "https://wsaa.afip.gov.ar/ws/services/LoginCms?wsdl"
     client = SoapClient(wsdl=WSDL, ns="web")
     try:
         resultado = client.loginCms('31867063')
     except SoapFault, e:
         self.assertEqual(e.faultcode, 'ns1:cms.bad')
开发者ID:rcarmo,项目名称:pysimplesoap,代码行数:8,代码来源:afip_tests.py


示例17: test_issue49

 def test_issue49(self):
     """Test netsuite wsdl"""    
     client = SoapClient(wsdl="https://webservices.netsuite.com/wsdl/v2011_2_0/netsuite.wsdl")        
     try:
         response = client.login(passport=dict(email="[email protected]", password="secret", account='hello', role={'name': 'joe'}))
     except Exception as e:
         # It returns "This document you requested has moved temporarily."
         pass
开发者ID:smartylab,项目名称:pysimplesoap,代码行数:8,代码来源:issues_test.py


示例18: test_issue57

 def test_issue57(self):
     """Test SalesForce wsdl"""
     # open the attached sfdc_enterprise_v20.wsdl to the issue in googlecode 
     client = SoapClient(wsdl="https://pysimplesoap.googlecode.com/issues/attachment?aid=570000001&name=sfdc_enterprise_v20.wsdl&token=bD6VTXMx8p4GJQHGhlQI1ISorSA%3A1399085346613")        
     try:
         response = client.login(username="john", password="doe")
     except Exception as e:
         # It returns "This document you requested has moved temporarily."
         self.assertEqual(e.faultcode, 'INVALID_LOGIN')
开发者ID:smartylab,项目名称:pysimplesoap,代码行数:9,代码来源:issues_test.py


示例19: test_issue139

    def test_issue139(self):
        """Test MKS wsdl (extension)"""
        # open the attached Integrity_10_2Service to the issue in googlecode 
        client = SoapClient(wsdl="https://pysimplesoap.googlecode.com/issues/attachment?aid=1390000000&name=Integrity_10_2.wsdl&token=3VG47As2K-EupP9GgotYckgb0Bc%3A1399064656814")
        #print client.help("getItemsByCustomQuery")
        try:
            response = client.getItemsByCustomQuery(arg0={'Username': 'user', 'Password' : 'pass', 'InputField' : 'ID', 'QueryDefinition' : 'query'})
        except httplib2.ServerNotFoundError:
	        pass
开发者ID:smartylab,项目名称:pysimplesoap,代码行数:9,代码来源:issues_test.py


示例20: test_issue57

 def test_issue57(self):
     """Test SalesForce wsdl"""
     # open the attached sfdc_enterprise_v20.wsdl to the issue in googlecode 
     client = SoapClient(wsdl="https://pysimplesoap.googlecode.com/issues/attachment?aid=570000001&name=sfdc_enterprise_v20.wsdl&token=dnnjZu-x1aF5gV0bYfM6K6hpAIM%3A1390359396946")        
     try:
         response = client.login(username="john", password="doe")
     except Exception as e:
         # It returns "This document you requested has moved temporarily."
         self.assertEqual(e.faultcode, u'INVALID_LOGIN')
开发者ID:adammendoza,项目名称:pysimplesoap,代码行数:9,代码来源:issues_test.py



注:本文中的pysimplesoap.client.SoapClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python server.SoapDispatcher类代码示例发布时间:2022-05-26
下一篇:
Python client.SimpleXMLElement类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap