本文整理汇总了Python中servicetest.assertContains函数的典型用法代码示例。如果您正苦于以下问题:Python assertContains函数的具体用法?Python assertContains怎么用?Python assertContains使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了assertContains函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_create_invisible_list
def test_create_invisible_list(q, bus, conn, stream):
conn.SimplePresence.SetPresence("away", "")
conn.Connect()
stream.handle_get_all_privacy_lists(q, bus, conn)
get_list = q.expect('stream-iq', query_ns=ns.PRIVACY, iq_type='get')
list_node = xpath.queryForNodes('//list', get_list.query)[0]
assertEquals('invisible', list_node['name'])
error = domish.Element((None, 'error'))
error['type'] = 'cancel'
error.addElement((ns.STANZA, 'item-not-found'))
send_error_reply (stream, get_list.stanza, error)
create_list = q.expect('stream-iq', query_ns=ns.PRIVACY, iq_type='set')
list_node = xpath.queryForNodes('//list', create_list.query)[0]
assertEquals('invisible', list_node['name'])
assertNotEquals([],
xpath.queryForNodes('/query/list/item/presence-out', create_list.query))
acknowledge_iq(stream, create_list.stanza)
q.expect('dbus-signal', signal='StatusChanged',
args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
assertContains("hidden",
conn.Properties.Get(cs.CONN_IFACE_SIMPLE_PRESENCE, "Statuses"))
开发者ID:jku,项目名称:telepathy-gabble,代码行数:28,代码来源:invisible_xep_0126.py
示例2: test_with_xep0186
def test_with_xep0186(q, bus, conn, stream):
statuses = conn.Properties.Get(cs.CONN_IFACE_SIMPLE_PRESENCE,
'Statuses')
# testbusy and testaway are provided by test plugin
assertContains('testbusy', statuses)
assertContains('testaway', statuses)
assertEquals(statuses['testbusy'][0], cs.PRESENCE_BUSY)
assertEquals(statuses['testaway'][0], cs.PRESENCE_AWAY)
conn.SimplePresence.SetPresence('testbusy', '')
conn.Connect()
# ... gabble asks for all the available lists on the server ...
stream.handle_get_all_privacy_lists(q, bus, conn,
lists=["foo-list", "test-busy-list", "bar-list"])
get_list = q.expect('stream-iq', query_ns=ns.PRIVACY, iq_type='set')
list_node = xpath.queryForNodes('//active', get_list.query)[0]
# ... and then activates the one linked with the requested status
# Note: testbusy status is linked to test-busy-list by test plugin
assertEquals('test-busy-list', list_node['name'])
acknowledge_iq(stream, get_list.stanza)
q.expect('dbus-signal', signal='PresencesChanged',
args=[{1L: (cs.PRESENCE_BUSY, u'testbusy', '')}])
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:29,代码来源:plugins.py
示例3: build_request
def build_request(conn, channel_name, use_room):
rccs = conn.Properties.Get(cs.CONN_IFACE_REQUESTS,
'RequestableChannelClasses')
if use_room:
# We allow TargetHandleType in Room-flavoured requests, but it has to
# be None if specified.
assertContains(
({ cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT },
[cs.TARGET_HANDLE_TYPE, cs.ROOM_NAME],
), rccs)
request = {
cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
cs.ROOM_NAME: '#idletest'
}
else:
assertContains(
({ cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
},
[cs.TARGET_HANDLE, cs.TARGET_ID]
), rccs)
request = {
cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
cs.TARGET_ID: '#idletest',
}
return dbus.Dictionary(request, signature='sv')
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-idle,代码行数:30,代码来源:requests-muc.py
示例4: stop_sending
def stop_sending(self, content):
self.sending = False
content.stream.SetSending(False)
self.q.expect('dbus-signal', signal='SendingStateChanged',
args=[cs.CALL_STREAM_FLOW_STATE_PENDING_STOP],
path=content.stream.__dbus_object_path__)
content.stream.Media.CompleteSendingStateChange(
cs.CALL_STREAM_FLOW_STATE_STOPPED)
o = self.q.expect_many(
EventPattern('dbus-signal', signal='SendingStateChanged',
args=[cs.CALL_STREAM_FLOW_STATE_STOPPED],
path=content.stream.__dbus_object_path__),
EventPattern('dbus-signal', signal='LocalSendingStateChanged',
path=content.stream.__dbus_object_path__),
EventPattern('sip-invite'))
assertEquals(cs.CALL_SENDING_STATE_NONE, o[1].args[0])
assertEquals(self.self_handle, o[1].args[1][0])
reinvite_event = o[2]
assertContains('a=recvonly', reinvite_event.sip_message.body)
self.context.check_call_sdp(reinvite_event.sip_message.body)
body = reinvite_event.sip_message.body.replace(
'recvonly', self.receiving and 'sendonly' or 'inactive')
self.context.accept(reinvite_event.sip_message, body)
ack_cseq = "%s ACK" % reinvite_event.cseq.split()[0]
self.q.expect('sip-ack', cseq=ack_cseq)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-rakia,代码行数:33,代码来源:direction-change.py
示例5: test
def test(q, bus, conn, stream):
# The user happily joins a MUC
_, chan, _, _ = join_muc(q, bus, conn, stream, MUC)
muc_self_handle = chan.Group.GetSelfHandle()
muc_self_jid, = conn.InspectHandles(cs.HT_CONTACT, [muc_self_handle])
# But then Bob kicks us.
bob_jid = "%s/bob" % MUC
bob_handle, = conn.RequestHandles(cs.HT_CONTACT, [bob_jid])
stream.send(
elem("presence", from_=muc_self_jid, type="unavailable")(
elem(ns.MUC_USER, "x")(
elem("item", affiliation="none", role="none")(elem("actor", jid=bob_jid), elem("reason")(u"bye")),
elem("status", code="307"),
)
)
)
mcd_event = q.expect("dbus-signal", signal="MembersChangedDetailed")
added, removed, local_pending, remote_pending, details = mcd_event.args
assertEquals([], added)
assertEquals([muc_self_handle], removed)
assertEquals([], local_pending)
assertEquals([], remote_pending)
assertContains("actor", details)
assertEquals(bob_handle, details["actor"])
assertEquals(cs.GC_REASON_KICKED, details["change-reason"])
assertEquals("bye", details["message"])
q.expect("dbus-signal", signal="ChannelClosed")
开发者ID:jku,项目名称:telepathy-gabble,代码行数:30,代码来源:kicked.py
示例6: test_deny_unblock_remove
def test_deny_unblock_remove(q, bus, conn, stream, stored, deny):
"""
Test unblocking a contact, and, while that request is pending, deleting
them.
"""
self_handle = conn.GetSelfHandle()
# This contact was on our roster, blocked and subscribed, when we started.
contact = '[email protected]'
handle = conn.RequestHandles(cs.HT_CONTACT, [contact])[0]
# They're blocked, and we have a bidi subscription, so they should be on
# deny and stored. (We already checked this earlier, but we've been messing
# with the roster so let's be sure the preconditions are okay...)
assertContains(handle,
deny.Properties.Get(cs.CHANNEL_IFACE_GROUP, "Members"))
assertContains(handle,
stored.Properties.Get(cs.CHANNEL_IFACE_GROUP, "Members"))
# Unblock them.
call_async(q, deny.Group, 'RemoveMembers', [handle], "")
roster_event = q.expect('stream-iq', query_ns=ns.ROSTER)
item = roster_event.query.firstChildElement()
assertEquals(contact, item['jid'])
assertDoesNotContain((ns.GOOGLE_ROSTER, 't'), item.attributes)
# If we now remove them from stored, the edit shouldn't be sent until the
# unblock event has had a reply.
q.forbid_events(remove_events)
call_async(q, stored.Group, 'RemoveMembers', [handle], "")
# Make sure if the remove is sent prematurely, we catch it.
sync_stream(q, stream)
q.unforbid_events(remove_events)
# So now we send a roster push and reply for the unblock request.
stream.send(make_set_roster_iq(stream, '[email protected]/Resource', contact,
'both', False, attrs={}))
acknowledge_iq(stream, roster_event.stanza)
# And on receiving the push and reply, Gabble should show them being
# removed from deny, and send a remove.
_, roster_event = q.expect_many(
EventPattern('dbus-signal', signal='MembersChanged',
args=['', [], [handle], [], [], self_handle, cs.GC_REASON_NONE],
predicate=is_deny),
remove_events[0],
)
item = roster_event.query.firstChildElement()
assertEquals(contact, item['jid'])
stream.send(make_set_roster_iq(stream, '[email protected]/Resource', contact,
'remove', False, attrs={}))
acknowledge_iq(stream, roster_event.stanza)
q.expect('dbus-signal', signal='MembersChanged',
args=['', [], [handle], [], [], 0, cs.GC_REASON_NONE],
predicate=is_stored)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:60,代码来源:test-google-roster.py
示例7: receiving_failed
def receiving_failed(self, content):
self.receiving = False
assert self.contents[0].stream.Properties.Get(cs.CALL_STREAM_IFACE_MEDIA,
'SendingState') == cs.CALL_STREAM_FLOW_STATE_STARTED
content.stream.Media.ReportReceivingFailure(
cs.CALL_STATE_CHANGE_REASON_MEDIA_ERROR, "", "receiving error")
o = self.q.expect_many(
EventPattern('dbus-signal', signal='ReceivingStateChanged',
args=[cs.CALL_STREAM_FLOW_STATE_STOPPED],
path=content.stream.__dbus_object_path__),
EventPattern('dbus-signal', signal='RemoteMembersChanged',
path=content.stream.__dbus_object_path__,
predicate=lambda e: e.args[0] == {self.remote_handle: cs.CALL_SENDING_STATE_PENDING_STOP_SENDING}),
EventPattern('sip-invite'))
reinvite_event = o[2]
assertContains('a=sendonly', reinvite_event.sip_message.body)
self.context.check_call_sdp(reinvite_event.sip_message.body)
body = reinvite_event.sip_message.body.replace(
'sendonly', self.sending and 'recvonly' or 'inactive')
self.context.accept(reinvite_event.sip_message, body)
ack_cseq = "%s ACK" % reinvite_event.cseq.split()[0]
self.q.expect('sip-ack', cseq=ack_cseq)
self.start_receiving(content)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-rakia,代码行数:32,代码来源:direction-change.py
示例8: advertise_caps
def advertise_caps(q, conn, stream, filters, expected_features, unexpected_features,
expected_caps):
self_handle = conn.GetSelfHandle()
ret_caps = conn.ContactCapabilities.UpdateCapabilities(
[(cs.CLIENT + '.Foo', filters, [])])
# Expect Gabble to reply with the correct caps
event, namespaces, _, signaled_caps = receive_presence_and_ask_caps(q, stream)
assertSameElements(expected_caps, signaled_caps)
assertContains(ns.TUBES, namespaces)
for var in expected_features:
assertContains(var, namespaces)
for var in unexpected_features:
assertDoesNotContain(var, namespaces)
# Check our own caps
caps = conn.ContactCapabilities.GetContactCapabilities([self_handle])
assertSameElements(expected_caps, caps)
# check the Contacts interface give the same caps
caps_via_contacts_iface = conn.Contacts.GetContactAttributes(
[self_handle], [cs.CONN_IFACE_CONTACT_CAPS], False) \
[self_handle][cs.ATTR_CONTACT_CAPABILITIES]
assertSameElements(caps[self_handle], caps_via_contacts_iface)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:28,代码来源:tube-caps.py
示例9: test
def test(q, bus, conn):
cm = bus.get_object(cs.CM + '.ring',
tp_path_prefix + '/ConnectionManager/ring')
cm_iface = dbus.Interface(cm, cs.CM)
cm_prop_iface = dbus.Interface(cm, cs.PROPERTIES_IFACE)
protocols = unwrap(cm_prop_iface.Get(cs.CM, 'Protocols'))
assertEquals(set(['tel']), set(protocols.keys()))
protocol_names = unwrap(cm_iface.ListProtocols())
assertEquals(set(['tel']), set(protocol_names))
cm_params = cm_iface.GetParameters('tel')
local_props = protocols['tel']
local_params = local_props[cs.PROTOCOL + '.Parameters']
assertEquals(cm_params, local_params)
proto = bus.get_object(cm.bus_name, cm.object_path + '/tel')
proto_prop_iface = dbus.Interface(proto, cs.PROPERTIES_IFACE)
proto_props = unwrap(proto_prop_iface.GetAll(cs.PROTOCOL))
for key in ['Parameters', 'Interfaces', 'ConnectionInterfaces',
'RequestableChannelClasses', u'VCardField', u'EnglishName', u'Icon']:
a = local_props[cs.PROTOCOL + '.' + key]
b = proto_props[key]
assertEquals(a, b)
assertEquals('tel', proto_props['VCardField'])
assertEquals('Mobile Telephony', proto_props['EnglishName'])
assertEquals('im-tel', proto_props['Icon'])
assertContains(cs.CONN_IFACE_REQUESTS, proto_props['ConnectionInterfaces'])
开发者ID:jusa,项目名称:telepathy-ring,代码行数:32,代码来源:protocol.py
示例10: sending_failed
def sending_failed(self, content):
self.sending = False
content.stream.Media.ReportSendingFailure(
cs.CALL_STATE_CHANGE_REASON_MEDIA_ERROR, "", "sending error")
o = self.q.expect_many(
EventPattern('dbus-signal', signal='SendingStateChanged',
args=[cs.CALL_STREAM_FLOW_STATE_STOPPED],
path=content.stream.__dbus_object_path__),
EventPattern('dbus-signal', signal='LocalSendingStateChanged',
path=content.stream.__dbus_object_path__,
predicate=lambda e: e.args[0] == 0),
EventPattern('sip-invite'))
assertEquals(cs.CALL_SENDING_STATE_NONE, o[1].args[0])
assertEquals(self.self_handle, o[1].args[1][0])
reinvite_event = o[2]
assertContains('a=recvonly', reinvite_event.sip_message.body)
self.context.check_call_sdp(reinvite_event.sip_message.body)
body = reinvite_event.sip_message.body.replace(
'recvonly', self.receiving and 'sendonly' or 'inactive')
self.context.accept(reinvite_event.sip_message, body)
ack_cseq = "%s ACK" % reinvite_event.cseq.split()[0]
self.q.expect('sip-ack', cseq=ack_cseq)
self.start_sending(content)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-rakia,代码行数:31,代码来源:direction-change.py
示例11: test_content_addition
def test_content_addition(self):
path = self.chan.AddContent("Webcam", cs.CALL_MEDIA_TYPE_VIDEO,
cs.MEDIA_STREAM_DIRECTION_BIDIRECTIONAL,
dbus_interface=cs.CHANNEL_TYPE_CALL)
content = self.bus.get_object(self.conn.bus_name, path)
content_properties = content.GetAll(cs.CALL_CONTENT,
dbus_interface=dbus.PROPERTIES_IFACE)
assertEquals(cs.CALL_DISPOSITION_NONE,
content_properties["Disposition"])
#assertEquals(self_handle, content_properties["Creator"])
assertContains("Webcam", content_properties["Name"])
md = self.jt2.get_call_video_md_dbus()
self.check_and_accept_offer(content, md)
cstream = self.bus.get_object(self.conn.bus_name,
content_properties["Streams"][0])
candidates = self.jt2.get_call_remote_transports_dbus()
cstream.AddCandidates(candidates,
dbus_interface=cs.CALL_STREAM_IFACE_MEDIA)
self.q.expect('stream-iq',
predicate=self.jp.action_predicate('content-add'))
content.Remove(dbus_interface=cs.CALL_CONTENT)
self.q.expect('stream-iq',
predicate=self.jp.action_predicate('content-remove'))
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:28,代码来源:call-basics.py
示例12: test
def test(q, bus, conn, stream):
"""
Check that when we receive a roster update we emit a single AliasesChanged
for all the contacts.
"""
conn.Connect()
# Gabble asks the server for the initial roster
event = q.expect('stream-iq', iq_type='get', query_ns=ns.ROSTER)
result = make_result_iq(stream, event.stanza)
# We reply with a roster with two contacts
bob_jid = '[email protected]'
bob_alias = 'Bob Smith'
add_contact(result, bob_jid, bob_alias)
alice_jid = '[email protected]'
alice_alias = 'Alice'
add_contact(result, alice_jid, alice_alias)
stream.send(result)
# Check we get a single AliasesChanged for both contacts
event = q.expect('dbus-signal', signal='AliasesChanged')
added = event.args[0]
bob_handle, alice_handle = conn.get_contact_handles_sync(
[bob_jid, alice_jid])
assertLength(2, added)
assertContains((bob_handle, bob_alias), added)
assertContains((alice_handle, alice_alias), added)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:33,代码来源:initial-aliases.py
示例13: test_invisible
def test_invisible(q, bus, conn, stream):
assertContains("hidden",
conn.Properties.Get(cs.CONN_IFACE_SIMPLE_PRESENCE, "Statuses"))
conn.SimplePresence.SetPresence("hidden", "")
# First we send an <invisible/> command.
event = q.expect('stream-iq', query_name='invisible')
acknowledge_iq(stream, event.stanza)
# When that's returned successfully, we can signal the change on D-Bus.
q.expect('dbus-signal', signal='PresencesChanged',
interface=cs.CONN_IFACE_SIMPLE_PRESENCE,
args=[{1: (5, 'hidden', '')}])
conn.SimplePresence.SetPresence("away", "gone")
# First Gabble sends a <visible/> command.
event = q.expect('stream-iq', query_name='visible')
acknowledge_iq(stream, event.stanza)
# Then: "It is the responsibility of the client to send an undirected
# presence notification to the server". Plus, we should signal the change
# on D-Bus.
q.expect_many(
EventPattern('stream-presence', to=None),
EventPattern('dbus-signal', signal='PresencesChanged',
interface=cs.CONN_IFACE_SIMPLE_PRESENCE,
args=[{1: (3, 'away', 'gone')}]))
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:29,代码来源:invisible_xep_0186.py
示例14: test
def test(q, bus, conn, stream):
# The user happily joins a MUC
chan, _, _ = join_muc(q, bus, conn, stream, MUC)
muc_self_handle = chan.Properties.Get(cs.CHANNEL_IFACE_GROUP,
"SelfHandle")
muc_self_jid = conn.inspect_contact_sync(muc_self_handle)
# But then Bob kicks us.
bob_jid = '%s/bob' % MUC
bob_handle = conn.get_contact_handle_sync(bob_jid)
stream.send(
elem('presence', from_=muc_self_jid, type='unavailable')(
elem(ns.MUC_USER, 'x')(
elem('item', affiliation='none', role='none')(
elem('actor', jid=bob_jid),
elem('reason')(
u'bye'
)
),
elem('status', code='307'),
)
))
mcd_event = q.expect('dbus-signal', signal='MembersChangedDetailed')
added, removed, local_pending, remote_pending, details = mcd_event.args
assertEquals([], added)
assertEquals([muc_self_handle], removed)
assertEquals([], local_pending)
assertEquals([], remote_pending)
assertContains('actor', details)
assertEquals(bob_handle, details['actor'])
assertEquals(cs.GC_REASON_KICKED, details['change-reason'])
assertEquals('bye', details['message'])
q.expect('dbus-signal', signal='ChannelClosed')
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:35,代码来源:kicked.py
示例15: test_error
def test_error(q, bus, conn, stream):
assertContains(cs.CONN_IFACE_POWER_SAVING,
conn.Get(cs.CONN, "Interfaces",
dbus_interface=cs.PROPERTIES_IFACE))
assertEquals (False, conn.Get(cs.CONN_IFACE_POWER_SAVING,
"PowerSavingActive",
dbus_interface=cs.PROPERTIES_IFACE))
call_async(q, conn.PowerSaving, 'SetPowerSaving', True)
stanza = expect_command(q, 'enable')
error = domish.Element((None, 'error'))
error.addElement((ns.STANZA, 'service-unavailable'))
send_error_reply(stream, stanza, error)
q.expect('dbus-error', method='SetPowerSaving', name=cs.NOT_AVAILABLE)
# Power saving state should remain false
assertEquals (False, conn.Get(cs.CONN_IFACE_POWER_SAVING,
"PowerSavingActive",
dbus_interface=cs.PROPERTIES_IFACE))
开发者ID:jku,项目名称:telepathy-gabble,代码行数:25,代码来源:power-save.py
示例16: advertise_caps
def advertise_caps(q, bus, conn, stream, filters, expected_features, unexpected_features,
expected_caps):
# make sure nothing from a previous update is still running
sync_dbus(bus, q, conn)
self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
ret_caps = conn.ContactCapabilities.UpdateCapabilities(
[(cs.CLIENT + '.Foo', filters, [])])
# Expect Gabble to reply with the correct caps
event, namespaces, _, signaled_caps = receive_presence_and_ask_caps(q, stream)
assertSameElements(expected_caps, signaled_caps[self_handle])
assertContains(ns.TP_FT_METADATA, namespaces)
for var in expected_features:
assertContains(var, namespaces)
for var in unexpected_features:
assertDoesNotContain(var, namespaces)
# Check our own caps
caps = get_contacts_capabilities_sync(conn, [self_handle])
assertSameElements(expected_caps, caps[self_handle])
# check the Contacts interface give the same caps
caps_via_contacts_iface = conn.Contacts.GetContactAttributes(
[self_handle], [cs.CONN_IFACE_CONTACT_CAPS], False) \
[self_handle][cs.ATTR_CONTACT_CAPABILITIES]
assertSameElements(caps[self_handle], caps_via_contacts_iface)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:31,代码来源:ft-client-caps.py
示例17: check_NewChannels_signal
def check_NewChannels_signal(conn, args, channel_type, chan_path, contact_handle,
contact_id, initiator_handle):
"""
Checks the first argument, a one-tuple of arguments from NewChannels,
matches the other arguments.
"""
assert len(args) == 1, args
assert len(args[0]) == 1 # one channel
path, props = args[0][0]
assert path == chan_path, (emitted_path, chan_path)
assert props[cs.CHANNEL_TYPE] == channel_type, (props, channel_type)
assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_CONTACT, props
assert props[cs.TARGET_HANDLE] == contact_handle, (props, contact_handle)
assert props[cs.TARGET_ID] == contact_id, (props, contact_id)
assert props[cs.REQUESTED] == True, props
assert props[cs.INITIATOR_HANDLE] == initiator_handle, \
(props, initiator_handle)
assert props[cs.INITIATOR_ID] == '[email protected]', props
# check that the newly announced channel is in the channels list
all_channels = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
dbus_interface=cs.PROPERTIES_IFACE, byte_arrays=True)
assertContains((path, props), all_channels)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:25,代码来源:tubetestutil.py
示例18: test
def test(q, bus, conn, stream):
self_handle = conn.GetSelfHandle()
jid = '[email protected]'
foo_handle = conn.RequestHandles(cs.HT_CONTACT, [jid])[0]
properties = conn.GetAll(
cs.CONN_IFACE_REQUESTS, dbus_interface=dbus.PROPERTIES_IFACE)
assert properties.get('Channels') == [], properties['Channels']
assert ({cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
},
[cs.TARGET_HANDLE, cs.TARGET_ID],
) in properties.get('RequestableChannelClasses'),\
properties['RequestableChannelClasses']
call_async(q, conn.Requests, 'CreateChannel',
{ cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
cs.TARGET_HANDLE: foo_handle,
})
ret, old_sig, new_sig = q.expect_many(
EventPattern('dbus-return', method='CreateChannel'),
EventPattern('dbus-signal', signal='NewChannel'),
EventPattern('dbus-signal', signal='NewChannels'),
)
assert len(ret.value) == 2
emitted_props = ret.value[1]
assert emitted_props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TEXT
assert emitted_props[cs.TARGET_HANDLE_TYPE] == cs.HT_CONTACT
assert emitted_props[cs.TARGET_HANDLE] == foo_handle
assert emitted_props[cs.TARGET_ID] == jid
assert emitted_props[cs.REQUESTED] == True
assert emitted_props[cs.INITIATOR_HANDLE] == self_handle
assert emitted_props[cs.INITIATOR_ID] == '[email protected]'
assertContains('text/plain', emitted_props[cs.SUPPORTED_CONTENT_TYPES])
assertEquals(0, emitted_props[cs.MESSAGE_PART_SUPPORT_FLAGS])
assertEquals(cs.DELIVERY_REPORTING_SUPPORT_FLAGS_RECEIVE_FAILURES,
emitted_props[cs.DELIVERY_REPORTING_SUPPORT])
assert old_sig.args[0] == ret.value[0]
assert old_sig.args[1] == cs.CHANNEL_TYPE_TEXT
assert old_sig.args[2] == cs.HT_CONTACT
assert old_sig.args[3] == foo_handle
assert old_sig.args[4] == True # suppress handler
assert len(new_sig.args) == 1
assert len(new_sig.args[0]) == 1 # one channel
assert len(new_sig.args[0][0]) == 2 # two struct members
assert new_sig.args[0][0][0] == ret.value[0]
assert new_sig.args[0][0][1] == ret.value[1]
properties = conn.GetAll(
cs.CONN_IFACE_REQUESTS, dbus_interface=dbus.PROPERTIES_IFACE)
assert new_sig.args[0][0] in properties['Channels'], \
(new_sig.args[0][0], properties['Channels'])
开发者ID:jku,项目名称:telepathy-gabble,代码行数:59,代码来源:initiate-requestotron.py
示例19: test_contact_blocking
def test_contact_blocking(q, bus, conn, stream, stored, deny):
"""test ContactBlocking API"""
assertContains(cs.CONN_IFACE_CONTACT_BLOCKING, conn.Properties.Get(cs.CONN, "Interfaces"))
# 3 contacts are blocked
blocked = conn.RequestBlockedContacts(dbus_interface=cs.CONN_IFACE_CONTACT_BLOCKING)
assertLength(3, blocked)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:8,代码来源:test-google-roster.py
示例20: test
def test(q, bus, conn, stream):
assertContains(cs.CONN_IFACE_POWER_SAVING,
conn.Get(cs.CONN, "Interfaces",
dbus_interface=cs.PROPERTIES_IFACE))
assertEquals (False, conn.Get(cs.CONN_IFACE_POWER_SAVING,
"PowerSavingActive",
dbus_interface=cs.PROPERTIES_IFACE))
call_async(q, conn.PowerSaving, 'SetPowerSaving', True)
stanza = expect_command(q, 'enable')
stream.send(make_result_iq(stream, stanza, False))
q.expect_many(EventPattern('dbus-return', method='SetPowerSaving'),
EventPattern('dbus-signal', signal='PowerSavingChanged',
args=[True]))
assertEquals (True, conn.Get(cs.CONN_IFACE_POWER_SAVING,
"PowerSavingActive",
dbus_interface=cs.PROPERTIES_IFACE))
pattern = [EventPattern('stream-iq', query_name='query',
query_ns=ns.GOOGLE_QUEUE),
EventPattern('dbus-signal', signal='PowerSavingChanged',
args=[True])]
q.forbid_events(pattern)
# Enabling power saving again should be a no-op.
call_async(q, conn.PowerSaving, 'SetPowerSaving', True)
q.expect('dbus-return', method='SetPowerSaving')
q.unforbid_events(pattern)
call_async(q, conn.PowerSaving, 'SetPowerSaving', False)
stanza = expect_command(q, 'disable')
stream.send(make_result_iq(stream, stanza, False))
event, _, _ = q.expect_many(
EventPattern('stream-iq', query_name='query', query_ns=ns.GOOGLE_QUEUE),
EventPattern('dbus-return', method='SetPowerSaving'),
EventPattern('dbus-signal', signal='PowerSavingChanged',
args=[False]))
command = event.query.firstChildElement()
assertEquals("flush", command.name)
assertEquals (False, conn.Get(cs.CONN_IFACE_POWER_SAVING,
"PowerSavingActive",
dbus_interface=cs.PROPERTIES_IFACE))
开发者ID:jku,项目名称:telepathy-gabble,代码行数:55,代码来源:power-save.py
注:本文中的servicetest.assertContains函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论