本文整理汇总了Python中smpp.pdu_builder.DeliverSM类的典型用法代码示例。如果您正苦于以下问题：Python DeliverSM类的具体用法？Python DeliverSM怎么用？Python DeliverSM使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DeliverSM类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_submit_and_deliver_ussd_new_custom_ussd_code_field
def test_submit_and_deliver_ussd_new_custom_ussd_code_field(self):
    """Check a new USSD session when the code comes from a custom PDU field.

    The transport is created with ``ussd_code_pdu_field`` set to
    ``destination_addr``.  The delivered PDU carries ``*IGNORE#`` as its
    short message and ``*123#`` as its destination address; the test
    expects the inbound message to be addressed to ``*123#``, to have no
    content, and to be flagged as a new USSD session.
    """
    session = SessionInfo()
    deliver_config = {'ussd_code_pdu_field': 'destination_addr'}
    yield self.get_transport(deliver_config=deliver_config)

    # Server delivers a USSD message to the Client
    ussd_pdu = DeliverSM(
        1, short_message="*IGNORE#", destination_addr="*123#")
    ussd_pdu.add_optional_parameter('ussd_service_op', '01')
    ussd_pdu.add_optional_parameter('its_session_info', session.its_info)
    yield self.fake_smsc.handle_pdu(ussd_pdu)

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(inbound['content'], None)
    self.assertEqual(inbound['to_addr'], '*123#')
    self.assertEqual(inbound['transport_type'], "ussd")
    self.assertEqual(
        inbound['session_event'], TransportUserMessage.SESSION_NEW)

    # The session info from the PDU must be echoed in the metadata.
    actual_metadata = inbound['transport_metadata']
    expected_session_info = {
        'session_identifier': session.sixdee_id,
        'ussd_service_op': '01',
    }
    self.assertEqual(
        actual_metadata, {'session_info': expected_session_info})
开发者ID:praekelt,项目名称:vumi,代码行数:28,代码来源:test_sixdee.py
示例2: test_submit_and_deliver_ussd_new
def test_submit_and_deliver_ussd_new(self):
    """Check that a ``deliver_sm`` with service op ``'01'`` opens a session.

    The PDU carries ``*123#`` as its short message and ``12345`` as its
    ``user_message_reference``.  The inbound message is expected to have
    no content, to be addressed to ``*123#`` and to expose the reference
    as the session identifier in its transport metadata.
    """
    session_identifier = 12345
    smpp_helper = yield self.get_smpp_helper()

    # Server delivers a USSD message to the Client
    ussd_pdu = DeliverSM(1, short_message="*123#")
    ussd_pdu.add_optional_parameter('ussd_service_op', '01')
    ussd_pdu.add_optional_parameter(
        'user_message_reference', session_identifier)
    yield smpp_helper.handle_pdu(ussd_pdu)

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(inbound['content'], None)
    self.assertEqual(inbound['to_addr'], '*123#')
    self.assertEqual(inbound['transport_type'], "ussd")
    self.assertEqual(
        inbound['session_event'], TransportUserMessage.SESSION_NEW)

    actual_metadata = inbound['transport_metadata']
    expected_metadata = {
        'session_info': {'session_identifier': 12345},
    }
    self.assertEqual(actual_metadata, expected_metadata)
开发者ID:areski,项目名称:vumi,代码行数:26,代码来源:test_mica.py
示例3: test_deliver_sm_message_payload
def test_deliver_sm_message_payload(self):
    """A message in the `message_payload` field should be delivered.

    The PDU is built with an empty ``short_message`` and the text
    ``hello`` hex-encoded into the message payload; the ``deliver_sm``
    callback is an assertion callback expecting ``u'hello'``.
    """
    on_deliver_sm = self.assertion_cb(u'hello', 'short_message')
    esme = yield self.get_esme(deliver_sm=on_deliver_sm)

    sm = DeliverSM(1, short_message='')
    # Two lowercase hex digits per character of the payload text.
    hex_payload = ''.join(['%02x' % ord(ch) for ch in 'hello'])
    sm.add_message_payload(hex_payload)

    unpacked = unpack_pdu(sm.get_bin())
    yield esme.handle_deliver_sm(unpacked)
开发者ID:AndrewCvekl,项目名称:vumi,代码行数:7,代码来源:test_client.py
示例4: test_submit_and_deliver_ussd_continue
def test_submit_and_deliver_ussd_continue(self):
    """Exercise a USSD exchange that continues an existing session.

    Steps driven by this test:

    1. Start the transport, wait on ``transport._block_till_bind`` and
       clear the link PDUs.
    2. Dispatch an outbound USSD message and check that the
       ``submit_sm`` PDU taken from the server's queue carries
       ``ussd_service_op`` ``'02'`` and ``its_session_info`` ``'0000'``.
    3. Check the ack and delivery report events dispatched for that
       message.
    4. Have the server send a ``deliver_sm`` with the same optional
       parameters and check that the inbound message resumes the
       session and that no failures were dispatched.
    """
    # Startup
    yield self.startTransport()
    yield self.transport._block_till_bind
    yield self.clear_link_pdus()

    # The Client submits a USSD message to the Server and receives an ack
    msg = yield self.tx_helper.make_dispatch_outbound(
        "hello world", transport_type="ussd")

    # The Server should have received the corresponding submit_sm PDU
    pdu_queue = self.service.factory.smsc.pdu_queue
    submit_sm_pdu = yield pdu_queue.get()
    self.assert_server_pdu(
        mk_expected_pdu('inbound', 3, 'submit_sm'), submit_sm_pdu)
    pdu_opts = unpacked_pdu_opts(submit_sm_pdu['pdu'])
    self.assertEqual('02', pdu_opts['ussd_service_op'])
    self.assertEqual('0000', pdu_opts['its_session_info'])

    # We need the user_message_id to check the ack
    user_message_id = msg.payload["message_id"]

    [ack, delv] = yield self.tx_helper.wait_for_dispatched_events(2)

    self.assertEqual(ack['message_type'], 'event')
    self.assertEqual(ack['event_type'], 'ack')
    self.assertEqual(ack['transport_name'], self.tx_helper.transport_name)
    self.assertEqual(ack['user_message_id'], user_message_id)

    self.assertEqual(delv['message_type'], 'event')
    self.assertEqual(delv['event_type'], 'delivery_report')
    self.assertEqual(delv['transport_name'], self.tx_helper.transport_name)
    self.assertEqual(delv['user_message_id'], user_message_id)
    self.assertEqual(
        delv['delivery_status'], self.expected_delivery_status)

    # Finally the Server delivers a USSD message to the Client
    pdu = DeliverSM(
        555,
        short_message="reply!",
        destination_addr="2772222222",
        source_addr="2772000000")
    # Use the public API rather than reaching for the name-mangled
    # private method (``_PDU__add_optional_parameter``).
    # NOTE(review): assumes the installed python-smpp exposes
    # ``add_optional_parameter`` -- confirm against the pinned version.
    pdu.add_optional_parameter('ussd_service_op', '02')
    pdu.add_optional_parameter('its_session_info', '0000')
    self.service.factory.smsc.send_pdu(pdu)

    [mess] = yield self.tx_helper.wait_for_dispatched_inbound(1)

    self.assertEqual(mess['message_type'], 'user_message')
    self.assertEqual(mess['transport_name'], self.tx_helper.transport_name)
    self.assertEqual(mess['content'], "reply!")
    self.assertEqual(mess['transport_type'], "ussd")
    self.assertEqual(
        mess['session_event'], TransportUserMessage.SESSION_RESUME)

    self.assertEqual([], self.tx_helper.get_dispatched_failures())
开发者ID:AndrewCvekl,项目名称:vumi,代码行数:60,代码来源:test_smpp.py
示例5: test_submit_and_deliver_ussd_close
def test_submit_and_deliver_ussd_close(self):
    """Check a USSD exchange in which the session is closed.

    An outbound USSD message with a ``SESSION_CLOSE`` event is expected
    to produce a ``submit_sm`` with ``ussd_service_op`` ``'02'`` and
    ``its_session_info`` ``'0001'``.  A ``deliver_sm`` carrying the same
    optional parameters is then expected to arrive as an inbound message
    with a ``SESSION_CLOSE`` event.
    """
    smpp_helper = yield self.get_smpp_helper()

    yield self.tx_helper.make_dispatch_outbound(
        "hello world",
        transport_type="ussd",
        session_event=TransportUserMessage.SESSION_CLOSE)

    [submit_sm_pdu] = yield smpp_helper.wait_for_pdus(1)
    self.assertEqual(command_id(submit_sm_pdu), 'submit_sm')
    self.assertEqual(pdu_tlv(submit_sm_pdu, 'ussd_service_op'), '02')
    self.assertEqual(pdu_tlv(submit_sm_pdu, 'its_session_info'), '0001')

    # Server delivers a USSD message to the Client
    next_sequence_number = seq_no(submit_sm_pdu) + 1
    reply_pdu = DeliverSM(next_sequence_number, short_message="reply!")
    reply_pdu.add_optional_parameter('ussd_service_op', '02')
    reply_pdu.add_optional_parameter('its_session_info', '0001')
    yield smpp_helper.handle_pdu(reply_pdu)

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(inbound['content'], "reply!")
    self.assertEqual(inbound['transport_type'], "ussd")
    self.assertEqual(
        inbound['session_event'], TransportUserMessage.SESSION_CLOSE)
开发者ID:komuW,项目名称:vumi,代码行数:25,代码来源:test_smpp_transport.py
示例6: test_deliver_sm_op_codes_new
def test_deliver_sm_op_codes_new(self):
    """Service op ``"01"`` on a ``deliver_sm`` should yield ``SESSION_NEW``.

    The PDU also carries ``12345`` as its ``user_message_reference``.
    """
    session_identifier = 12345
    yield self.get_transport()

    ussd_pdu = DeliverSM(1, short_message="*123#")
    ussd_pdu.add_optional_parameter("ussd_service_op", "01")
    ussd_pdu.add_optional_parameter(
        "user_message_reference", session_identifier)
    yield self.fake_smsc.handle_pdu(ussd_pdu)

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(
        inbound["session_event"], TransportUserMessage.SESSION_NEW)
开发者ID:caiobertacco,项目名称:vumi,代码行数:9,代码来源:test_mica.py
示例7: test_deliver_sm_fail
def test_deliver_sm_fail(self):
    """A ``deliver_sm`` sent with ``data_coding=4`` should be rejected.

    The raw PDU is fed to the bound protocol and the single PDU read
    back from the transport is expected to be a ``deliver_sm_resp``
    with status ``ESME_RDELIVERYFAILURE``.
    """
    transport, protocol = yield self.setup_bind()

    bad_pdu = DeliverSM(
        sequence_number=0,
        message_id='foo',
        data_coding=4,
        short_message='string with unknown data coding')
    protocol.dataReceived(bad_pdu.get_bin())

    [response] = yield wait_for_pdus(transport, 1)
    self.assertCommand(
        response,
        'deliver_sm_resp',
        sequence_number=0,
        status='ESME_RDELIVERYFAILURE')
开发者ID:Nagato23,项目名称:vumi,代码行数:10,代码来源:test_protocol.py
示例8: test_deliver_sm_op_codes_new
def test_deliver_sm_op_codes_new(self):
    """Service op ``'01'`` on a ``deliver_sm`` should yield ``SESSION_NEW``.

    The PDU carries the ``its_info`` of a fresh ``SessionInfo`` as its
    ``its_session_info`` optional parameter.
    """
    session = SessionInfo()
    yield self.get_transport()

    ussd_pdu = DeliverSM(1, short_message="*123#")
    ussd_pdu.add_optional_parameter('ussd_service_op', '01')
    ussd_pdu.add_optional_parameter('its_session_info', session.its_info)
    yield self.fake_smsc.handle_pdu(ussd_pdu)

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(
        inbound['session_event'], TransportUserMessage.SESSION_NEW)
开发者ID:praekelt,项目名称:vumi,代码行数:10,代码来源:test_sixdee.py
示例9: test_deliver_sm
def test_deliver_sm(self):
    """A received ``deliver_sm`` should reach ``handle_deliver_sm`` once.

    ``EsmeTransceiver.handle_deliver_sm`` is patched with a recorder so
    the test can inspect the PDU passed to it.
    """
    calls = []

    def record_deliver_sm(p, pdu):
        # Record the PDU and hand back an already-fired result.
        return succeed(calls.append(pdu))

    self.patch(EsmeTransceiver, 'handle_deliver_sm', record_deliver_sm)
    transport, protocol = yield self.setup_bind()

    outgoing = DeliverSM(
        sequence_number=0, message_id='foo', short_message='bar')
    protocol.dataReceived(outgoing.get_bin())

    # Exactly one PDU must have been recorded.
    [received] = calls
    self.assertCommand(received, 'deliver_sm', sequence_number=0)
开发者ID:areski,项目名称:vumi,代码行数:10,代码来源:test_protocol.py
示例10: test_delivery_report_for_unknown_message
def test_delivery_report_for_unknown_message(self):
    """A delivery report for an unknown message id should log a warning.

    A delivery-report-formatted short message with ``id:123`` is fed to
    the ESME while log entries matching "Failed to retrieve message id"
    are captured; exactly one such entry is expected, naming the
    transport.
    """
    dr = ("id:123 sub:... dlvrd:... submit date:200101010030"
          " done date:200101020030 stat:DELIVRD err:... text:Meep")
    report_pdu = DeliverSM(1, short_message=dr)

    with LogCatcher(message="Failed to retrieve message id") as lc:
        yield self.esme.handle_data(report_pdu.get_bin())
        [warning] = lc.logs
        logged = warning['message']
        expected_text = (
            "Failed to retrieve message id for delivery "
            "report. Delivery report from %s "
            "discarded." % self.tx_helper.transport_name)
        # The captured message is compared against a one-element tuple.
        self.assertEqual(logged, (expected_text,))
开发者ID:AndrewCvekl,项目名称:vumi,代码行数:11,代码来源:test_smpp.py
示例11: test_deliver_sm_op_codes_new
def test_deliver_sm_op_codes_new(self):
    """Service op ``'01'`` on a ``deliver_sm`` should yield ``SESSION_NEW``.

    The PDU is handed to the SMPP helper and carries ``12345`` as its
    ``user_message_reference``.
    """
    session_identifier = 12345
    smpp_helper = yield self.get_smpp_helper()

    ussd_pdu = DeliverSM(1, short_message="*123#")
    ussd_pdu.add_optional_parameter('ussd_service_op', '01')
    ussd_pdu.add_optional_parameter(
        'user_message_reference', session_identifier)
    yield smpp_helper.handle_pdu(ussd_pdu)

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(
        inbound['session_event'], TransportUserMessage.SESSION_NEW)
开发者ID:Nagato23,项目名称:vumi,代码行数:11,代码来源:test_mica.py
示例12: test_deliver_sm_fail
def test_deliver_sm_fail(self):
    """A ``deliver_sm`` whose decoding goes wrong should be rejected.

    ``DeliverShortMessageProcessor.decode_pdus`` is patched to return a
    list holding a byte ``str``; the response read back from the
    transport is expected to be a ``deliver_sm_resp`` with status
    ``ESME_RDELIVERYFAILURE``.
    """
    def fake_decode_pdus(*a):
        return [str('not a unicode string')]

    self.patch(
        DeliverShortMessageProcessor, 'decode_pdus', fake_decode_pdus)
    transport, protocol = yield self.setup_bind()

    outgoing = DeliverSM(
        sequence_number=0, message_id='foo', short_message='bar')
    protocol.dataReceived(outgoing.get_bin())

    [response] = yield wait_for_pdus(transport, 1)
    self.assertCommand(
        response,
        'deliver_sm_resp',
        sequence_number=0,
        status='ESME_RDELIVERYFAILURE')
开发者ID:areski,项目名称:vumi,代码行数:11,代码来源:test_protocol.py
示例13: test_deliver_sm_fail_with_custom_error
def test_deliver_sm_fail_with_custom_error(self):
    """The failure status should follow ``deliver_sm_decoding_error``.

    The bind is set up with ``deliver_sm_decoding_error`` configured as
    ``ESME_RSYSERR``; a ``deliver_sm`` sent with ``data_coding=4`` is
    then expected to be answered with that status.
    """
    bind_config = {"deliver_sm_decoding_error": "ESME_RSYSERR"}
    transport, protocol = yield self.setup_bind(config=bind_config)

    bad_pdu = DeliverSM(
        sequence_number=0,
        message_id='foo',
        data_coding=4,
        short_message='string with unknown data coding')
    protocol.dataReceived(bad_pdu.get_bin())

    [response] = yield wait_for_pdus(transport, 1)
    self.assertCommand(
        response,
        'deliver_sm_resp',
        sequence_number=0,
        status='ESME_RSYSERR')
开发者ID:Nagato23,项目名称:vumi,代码行数:12,代码来源:test_protocol.py
示例14: test_mo_sms_multipart_long
def test_mo_sms_multipart_long(self):
    """A 255-character ``message_payload`` should arrive as one message.

    The PDU has no short message; the text travels hex-encoded in the
    ``message_payload`` optional parameter.  The test expects an OK
    ``deliver_sm_resp`` with sequence number 1 and an inbound message
    whose content equals the original text.
    """
    smpp_helper = yield self.get_smpp_helper()
    content = '1' * 255

    payload_pdu = DeliverSM(sequence_number=1)
    # NOTE: ``str.encode('hex')`` relies on the Python 2 hex codec.
    hex_content = content.encode('hex')
    payload_pdu.add_optional_parameter('message_payload', hex_content)
    smpp_helper.send_pdu(payload_pdu)

    [response] = yield smpp_helper.wait_for_pdus(1)
    self.assertEqual(1, seq_no(response))
    self.assertTrue(pdu_ok(response))

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(inbound['content'], content)
开发者ID:komuW,项目名称:vumi,代码行数:13,代码来源:test_smpp_transport.py
示例15: test_deliver_sm_delivery_report_regex_fallback_ucs2_long
def test_deliver_sm_delivery_report_regex_fallback_ucs2_long(self):
    """A UTF-16-BE delivery report in the message payload is recognised.

    The report text is encoded as UTF-16-BE, hex-encoded and attached as
    the message payload of a PDU with ``data_coding=8`` and an empty
    short message.  The ``delivery_report`` callback is an assertion
    callback expecting the message id and the ``DELIVRD`` state.
    """
    expected_report = {
        'message_id': '1b1720be-5f48-41c4-b3f8-6e59dbf45366',
        'message_state': 'DELIVRD',
    }
    esme = yield self.get_esme(
        delivery_report=self.assertion_cb(expected_report))

    report_text = (
        u'id:1b1720be-5f48-41c4-b3f8-6e59dbf45366 sub:001 dlvrd:001 '
        u'submit date:120726132548 done date:120726132548 stat:DELIVRD '
        u'err:000 text:')
    dr_bytes = report_text.encode('utf-16be')

    sm = DeliverSM(1, short_message='', data_coding=8)
    # NOTE: ``str.encode('hex')`` relies on the Python 2 hex codec.
    sm.add_message_payload(dr_bytes.encode('hex'))

    unpacked = unpack_pdu(sm.get_bin())
    yield esme.handle_deliver_sm(unpacked)
开发者ID:AndrewCvekl,项目名称:vumi,代码行数:13,代码来源:test_client.py
示例16: test_deliver_sm_fail_with_custom_error
def test_deliver_sm_fail_with_custom_error(self):
    """The failure status should follow ``deliver_sm_decoding_error``.

    ``DeliverShortMessageProcessor.decode_pdus`` is patched to return a
    list holding a byte ``str`` and the bind is configured with
    ``deliver_sm_decoding_error`` set to ``ESME_RSYSERR``; the
    ``deliver_sm_resp`` is expected to carry that status.
    """
    def fake_decode_pdus(*a):
        return [str('not a unicode string')]

    self.patch(
        DeliverShortMessageProcessor, 'decode_pdus', fake_decode_pdus)

    bind_config = {"deliver_sm_decoding_error": "ESME_RSYSERR"}
    transport, protocol = yield self.setup_bind(config=bind_config)

    outgoing = DeliverSM(
        sequence_number=0, message_id='foo', short_message='bar')
    protocol.dataReceived(outgoing.get_bin())

    [response] = yield wait_for_pdus(transport, 1)
    self.assertCommand(
        response,
        'deliver_sm_resp',
        sequence_number=0,
        status='ESME_RSYSERR')
开发者ID:areski,项目名称:vumi,代码行数:13,代码来源:test_protocol.py
示例17: test_mo_delivery_report_pdu
def test_mo_delivery_report_pdu(self):
    """A receipt sent via optional parameters becomes a delivery report.

    The remote id ``'foo'`` is first stashed against message ``'bar'``.
    A PDU carrying ``receipted_message_id`` ``'foo'`` and
    ``message_state`` ``2`` is then expected to produce a
    ``delivery_report`` event with status ``delivered`` for ``'bar'``.
    """
    smpp_helper = yield self.get_smpp_helper()
    message_stash = smpp_helper.transport.message_stash
    yield message_stash.set_remote_message_id('bar', 'foo')

    receipt_pdu = DeliverSM(sequence_number=1)
    receipt_pdu.add_optional_parameter('receipted_message_id', 'foo')
    receipt_pdu.add_optional_parameter('message_state', 2)
    yield smpp_helper.handle_pdu(receipt_pdu)

    [report] = yield self.tx_helper.wait_for_dispatched_events(1)
    self.assertEqual(report['event_type'], 'delivery_report')
    self.assertEqual(report['delivery_status'], 'delivered')
    self.assertEqual(report['user_message_id'], 'bar')
开发者ID:komuW,项目名称:vumi,代码行数:14,代码来源:test_smpp_transport.py
示例18: test_ordering
def test_ordering(self):
    """Out of order pieces must be re-assembled in-order.

    Four ``DeliverSM`` PDUs are built whose short messages each start
    with a five-byte prefix whose last byte runs from ``\\x01`` to
    ``\\x04``.  They are added to a ``MultipartMessage`` in the order
    3, 4, 2, 1 and the completed message is expected to read in the
    order 1, 2, 3, 4.
    """
    sar_1 = DeliverSM(
        1,
        short_message='\x00\x03\xff\x04\x01There she was just a')
    sar_2 = DeliverSM(
        1,
        short_message='\x00\x03\xff\x04\x02 walking down the street,')
    sar_3 = DeliverSM(
        1,
        short_message='\x00\x03\xff\x04\x03 singing doo wa diddy')
    sar_4 = DeliverSM(
        1,
        short_message='\x00\x03\xff\x04\x04 diddy dum diddy do')

    multi = MultipartMessage()
    # Deliberately feed the parts out of order.
    for part in (sar_3, sar_4, sar_2, sar_1):
        multi.add_pdu(part.get_obj())

    # ``assertEquals`` is a deprecated alias; use ``assertEqual``.
    self.assertEqual(multi.get_completed()['message'], (
        'There she was just a walking down the street, '
        'singing doo wa diddy diddy dum diddy do'))
开发者ID:flowroute,项目名称:python-smpp,代码行数:25,代码来源:test_multipart.py
示例19: test_deliver_sm_op_codes_end
def test_deliver_sm_op_codes_end(self):
    """Service op ``'81'`` on a ``deliver_sm`` should yield ``SESSION_CLOSE``.

    A session with USSD code ``*123#`` is created up front through the
    deliver_sm processor's session manager, keyed on the session's
    ``vumi_id``.
    """
    session = SessionInfo()
    transport = yield self.get_transport()

    session_manager = transport.deliver_sm_processor.session_manager
    yield session_manager.create_session(
        session.vumi_id, ussd_code='*123#')

    end_pdu = DeliverSM(1, short_message="", source_addr=session.addr)
    end_pdu.add_optional_parameter('ussd_service_op', '81')
    end_pdu.add_optional_parameter('its_session_info', session.its_info)
    yield self.fake_smsc.handle_pdu(end_pdu)

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(
        inbound['session_event'], TransportUserMessage.SESSION_CLOSE)
开发者ID:praekelt,项目名称:vumi,代码行数:16,代码来源:test_sixdee.py
示例20: test_deliver_sm_op_codes_resume
def test_deliver_sm_op_codes_resume(self):
    """Service op ``'12'`` on a ``deliver_sm`` should yield ``SESSION_RESUME``.

    A session with USSD code ``*123#`` is created up front through the
    deliver_sm processor's session manager, keyed on the session's
    ``vumi_id``.
    """
    session = SessionInfo()
    smpp_helper = yield self.get_smpp_helper()

    processor = smpp_helper.transport.deliver_sm_processor
    session_manager = processor.session_manager
    yield session_manager.create_session(
        session.vumi_id, ussd_code='*123#')

    resume_pdu = DeliverSM(1, short_message="", source_addr=session.addr)
    resume_pdu.add_optional_parameter('ussd_service_op', '12')
    resume_pdu.add_optional_parameter('its_session_info', session.its_info)
    yield smpp_helper.handle_pdu(resume_pdu)

    [inbound] = yield self.tx_helper.wait_for_dispatched_inbound(1)
    self.assertEqual(
        inbound['session_event'], TransportUserMessage.SESSION_RESUME)
开发者ID:Nagato23,项目名称:vumi,代码行数:16,代码来源:test_sixdee.py
注：本文中的smpp.pdu_builder.DeliverSM类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论