本文整理汇总了 Python 中 servicetest.sync_dbus 函数的典型用法代码示例。如果您正苦于以下问题：Python sync_dbus 函数的具体用法？Python sync_dbus 怎么用？Python sync_dbus 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 sync_dbus 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test
def test(q, bus, conn, stream):
    """Request a group channel before the roster arrives and check NewChannel.

    The roster IQ is captured but only answered after RequestChannel has
    been issued and both the XMPP stream and the D-Bus queue have been
    synced, so the request reaches the connection manager first.
    """
    roster_iq = q.expect('stream-iq', query_ns=ns.ROSTER)
    roster_iq.stanza['type'] = 'result'

    call_async(q, conn, "RequestHandles", cs.HT_GROUP, ['test'])
    handles_return = q.expect('dbus-return', method='RequestHandles')
    test_handle = handles_return.value[0][0]

    call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_CONTACT_LIST,
               cs.HT_GROUP, test_handle, True)

    # A previous incarnation of this test --- written with the intention that
    # RequestChannel would be called before the roster was received, to expose
    # a bug in Gabble triggered by that ordering --- was racy: if the D-Bus
    # daemon happened to be particularly busy, the call to RequestChannel
    # reached Gabble after the roster stanza. (The race was discovered when
    # that reversed order triggered a newly-introduced instance of the
    # opposite bug to the one the test was targeting!) So we sync the XMPP
    # stream and D-Bus queue here.
    sync_stream(q, stream)
    sync_dbus(bus, q, conn)

    # Only now deliver the (empty) roster result.
    stream.send(roster_iq.stanza)

    channel_return = q.expect('dbus-return', method='RequestChannel')
    path = channel_return.value[0]

    # NOTE(review): the source's indentation was lost; the assert is assumed
    # to sit inside the loop because it reads the per-iteration event.
    while True:
        signal = q.expect('dbus-signal', signal='NewChannel')
        assert signal.args[0] == path, (signal.args, path)
        # Unpack all five signal arguments so an unexpected arity still fails.
        _, _channel_type, handle_type, handle, _suppress_handler = signal.args
        if handle_type == cs.HT_GROUP and handle == test_handle:
            break
开发者ID:jku,项目名称:telepathy-gabble,代码行数:35,代码来源:request-group-before-roster.py
示例2: signal_channel_expect_query
def signal_channel_expect_query(q, bus, account, conn, empathy, kopete):
    """Announce an incoming group text channel and wait for the policy query.

    Announces a simulated channel, checks the resulting dispatch operation,
    waits for both observers' ObserveChannels calls and for the policy
    service's RequestPermission call, then lets both observers return.

    Returns a ``(permission, chan, cdo_path)`` tuple: the RequestPermission
    method-call event (left unanswered so the caller decides how to reply),
    the simulated channel and the dispatch operation's object path.
    """
    # This target is special-cased in test-plugin.c
    # NOTE(review): '[email protected]' looks like an e-mail-obfuscation
    # placeholder left by the page this was copied from -- confirm the
    # real target ID against the upstream test.
    target = '[email protected]'

    props = dbus.Dictionary(text_fixed_properties, signature='sv')
    props[cs.CHANNEL + '.TargetID'] = target
    props[cs.CHANNEL + '.TargetHandle'] = conn.ensure_handle(
        cs.HT_CONTACT, target)
    props[cs.CHANNEL + '.InitiatorID'] = target
    props[cs.CHANNEL + '.InitiatorHandle'] = conn.ensure_handle(
        cs.HT_CONTACT, target)
    props[cs.CHANNEL + '.Requested'] = False
    props[cs.CHANNEL + '.Interfaces'] = dbus.Array(
        [cs.CHANNEL_IFACE_DESTROYABLE, cs.CHANNEL_IFACE_GROUP],
        signature='s')

    chan = SimulatedChannel(conn, props, group=True)
    chan.announce()

    new_cdo = q.expect('dbus-signal',
                       path=cs.CD_PATH,
                       interface=cs.CD_IFACE_OP_LIST,
                       signal='NewDispatchOperation')
    cdo_path = new_cdo.args[0]
    cdo_properties = new_cdo.args[1]

    assert cdo_properties[cs.CDO + '.Account'] == account.object_path
    assert cdo_properties[cs.CDO + '.Connection'] == conn.object_path
    assert cs.CDO + '.Interfaces' in cdo_properties

    # Sort a copy so the comparison does not depend on handler order.
    handlers = cdo_properties[cs.CDO + '.PossibleHandlers'][:]
    handlers.sort()
    assert handlers == [cs.tp_name_prefix + '.Client.Empathy',
                        cs.tp_name_prefix + '.Client.Kopete'], handlers

    empathy_observe, kopete_observe = q.expect_many(
        EventPattern('dbus-method-call',
                     path=empathy.object_path,
                     interface=cs.OBSERVER, method='ObserveChannels',
                     handled=False),
        EventPattern('dbus-method-call',
                     path=kopete.object_path,
                     interface=cs.OBSERVER, method='ObserveChannels',
                     handled=False),
    )

    # What does the policy service think?
    permission = q.expect('dbus-method-call', path='/com/example/Policy',
                          interface='com.example.Policy',
                          method='RequestPermission')

    # Think about it for a bit
    sync_dbus(bus, q, account)

    # Both Observers indicate that they are ready to proceed
    q.dbus_return(kopete_observe.message, signature='')
    q.dbus_return(empathy_observe.message, signature='')

    # Let the test code decide how to reply
    return permission, chan, cdo_path
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-mission-control,代码行数:60,代码来源:dispatch-delayed-by-mini-plugin.py
示例3: test
def test(q, bus, conn, stream):
    """Overlap SetAliases and SetAvatar with outstanding vCard IQs.

    Each vCard IQ is acknowledged only after the next D-Bus call has been
    issued and the D-Bus queue synced; the test then disconnects and waits
    for the StatusChanged signal.
    """
    conn.Connect()

    # Initial vCard request. Respond only after we call SetAliases().
    initial_get = q.expect('stream-iq', iq_type='get', to=None,
                           query_ns=ns.VCARD_TEMP, query_name='vCard')
    sync_stream(q, stream)

    self_handle = conn.GetSelfHandle()
    call_async(q, conn.Aliasing, 'SetAliases', {self_handle: 'Some Guy'})
    sync_dbus(bus, q, conn)
    acknowledge_iq(stream, initial_get.stanza)

    # Gabble sets a new vCard with our nickname.
    nickname_set = q.expect('stream-iq', iq_type='set',
                            query_ns=ns.VCARD_TEMP, query_name='vCard')

    # Before the server replies, the user sets their avatar.
    call_async(q, conn.Avatars, 'SetAvatar', 'hello', 'image/png')
    sync_dbus(bus, q, conn)
    acknowledge_iq(stream, nickname_set.stanza)

    avatar_set = q.expect('stream-iq', iq_type='set',
                          query_ns=ns.VCARD_TEMP, query_name='vCard')
    acknowledge_iq(stream, avatar_set.stanza)
    q.expect('dbus-return', method='SetAvatar')

    # And then crashes.
    sync_stream(q, stream)
    conn.Disconnect()
    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:32,代码来源:overlapping-sets.py
示例4: tube_no_text
def tube_no_text(q, bus, conn, stream):
    """Create a stream tube and check only the tube channel is announced.

    Exactly one NewChannels signal is expected, carrying a single channel
    whose path and properties match the CreateChannel return value; any
    further NewChannels signal is forbidden until the D-Bus queue is synced.
    """
    # NOTE(review): '[email protected]' looks like an e-mail-obfuscation
    # placeholder left by the page this was copied from -- confirm the
    # real MUC JID against the upstream test.
    jid = '[email protected]'

    # create a stream tube.
    # this will need a MUC channel to be opened, but we want to make
    # sure it doesn't get signalled.
    request_stream_tube(q, bus, conn, 'CreateChannel', jid)
    send_muc_presence(q, stream, jid)

    create_return, new_channels = q.expect_many(
        EventPattern('dbus-return', method='CreateChannel'),
        EventPattern('dbus-signal', signal='NewChannels'))

    # From here on, no second NewChannels signal may show up.
    q.forbid_events([EventPattern('dbus-signal', signal='NewChannels')])

    tube_path, tube_props = create_return.value
    assertEquals(cs.CHANNEL_TYPE_STREAM_TUBE, tube_props[cs.CHANNEL_TYPE])

    announced = new_channels.args[0]
    assertEquals(1, len(announced))
    announced_path, announced_props = announced[0]
    assertEquals(tube_path, announced_path)
    assertEquals(tube_props, announced_props)

    sync_dbus(bus, q, conn)
    q.unforbid_all()
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:29,代码来源:only-text-when-needed.py
示例5: advertise_caps
def advertise_caps(q, bus, conn, stream, filters, expected_features, unexpected_features,
                   expected_caps):
    """Update client capabilities and verify what gets advertised.

    Calls UpdateCapabilities for a ``.Foo`` client with *filters*, then
    checks the advertised namespaces contain every entry of
    *expected_features* and none of *unexpected_features*, and that the
    self handle's capabilities equal *expected_caps* both in the received
    caps and via the Contacts interface.
    """
    # make sure nothing from a previous update is still running
    sync_dbus(bus, q, conn)

    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    conn.ContactCapabilities.UpdateCapabilities(
        [(cs.CLIENT + '.Foo', filters, [])])

    # Expect Gabble to reply with the correct caps
    _, namespaces, _, signaled_caps = receive_presence_and_ask_caps(q, stream)
    assertSameElements(expected_caps, signaled_caps[self_handle])

    assertContains(ns.TP_FT_METADATA, namespaces)
    for feature in expected_features:
        assertContains(feature, namespaces)
    for feature in unexpected_features:
        assertDoesNotContain(feature, namespaces)

    # Check our own caps
    own_caps = get_contacts_capabilities_sync(conn, [self_handle])
    assertSameElements(expected_caps, own_caps[self_handle])

    # check the Contacts interface give the same caps
    attributes = conn.Contacts.GetContactAttributes(
        [self_handle], [cs.CONN_IFACE_CONTACT_CAPS], False)
    caps_via_contacts_iface = \
        attributes[self_handle][cs.ATTR_CONTACT_CAPABILITIES]
    assertSameElements(own_caps[self_handle], caps_via_contacts_iface)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:31,代码来源:ft-client-caps.py
示例6: tube_remains_text_closes
def tube_remains_text_closes(q, bus, conn):
    """Close the MUC text channel and check the tube channel survives.

    A ChannelClosed signal for the tube's path is forbidden while the text
    channel is closed and the D-Bus queue is synced.
    """
    jid = 'test-muc'
    connect(q, bus, conn)

    text_chan, text_path, _ = text_channel(q, bus, conn, 'CreateChannel', jid)
    tube_chan, tube_path, _ = stream_tube(q, bus, conn, 'CreateChannel', jid)

    # now let's try and close the text channel
    # this should happen successfully but the tube channel
    # should stick around
    tube_closed = [EventPattern('dbus-signal', signal='ChannelClosed',
                                args=[tube_path])]
    q.forbid_events(tube_closed)

    assert_on_bus(q, tube_chan)
    assert_on_bus(q, text_chan)

    text_chan.Close()
    expect_close(q, text_path)
    sync_dbus(bus, q, conn)

    assert_on_bus(q, tube_chan)
    assert_not_on_bus(q, text_chan)

    q.unforbid_events(tube_closed)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:27,代码来源:only-text-muc-when-needed.py
示例7: test
def test(q, bus, conn, stream):
    """Create a group contact-list channel after the roster has arrived.

    The empty roster result is sent and both queues are synced before
    CreateChannel is called; the NewChannels signal must then describe the
    same channel as the CreateChannel return value.
    """
    conn.Connect()

    roster_iq = q.expect('stream-iq', query_ns=ns.ROSTER)
    roster_iq.stanza['type'] = 'result'

    call_async(q, conn, "RequestHandles", cs.HT_GROUP, ['test'])
    handles_return = q.expect('dbus-return', method='RequestHandles')
    test_handle = handles_return.value[0][0]

    # send an empty roster
    stream.send(roster_iq.stanza)
    sync_stream(q, stream)
    sync_dbus(bus, q, conn)

    request = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_CONTACT_LIST,
        cs.TARGET_HANDLE_TYPE: cs.HT_GROUP,
        cs.TARGET_HANDLE: test_handle,
    }
    call_async(q, conn.Requests, 'CreateChannel', request)

    create_return = q.expect('dbus-return', method='CreateChannel')
    ret_path, ret_props = create_return.value

    new_channels = q.expect('dbus-signal', signal='NewChannels')
    path, props = new_channels.args[0][0]

    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_CONTACT_LIST, props
    assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_GROUP, props
    assert props[cs.TARGET_HANDLE] == test_handle, props
    assert props[cs.TARGET_ID] == 'test', props

    assert ret_path == path, (ret_path, path)
    assert ret_props == props, (ret_props, props)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:35,代码来源:request-group-after-roster.py
示例8: test
def test(q, bus, conn, stream, is_google):
    """Check that no vCard 'set' IQ follows the initial vCard reply.

    The initial vCard result carries 'oh hello there' as FN when *is_google*
    is true and as NICKNAME otherwise; any vCard 'set' IQ seen before both
    queues have been synced is forbidden.
    """
    vcard_get = q.expect('stream-iq', to=None, query_ns='vcard-temp',
                         query_name='vCard')
    result = make_result_iq(stream, vcard_get.stanza)

    # Testing reveals that Google's vCard server does not actually support
    # NICKNAME (or indeed any fields beside FN, N and PHOTO): if you set a
    # vCard including it, it accepts the request but strips out the unsupported
    # fields. So if the server looks like Google, it's a redundant set
    # operation on FN that we want to avoid.
    field_name = 'FN' if is_google else 'NICKNAME'
    vcard = result.firstChildElement()
    vcard.addElement(field_name, content='oh hello there')

    stream.send(result)

    q.forbid_events([
        EventPattern('stream-iq', iq_type='set', query_ns='vcard-temp',
                     query_name='vCard')
    ])

    sync_stream(q, stream)
    sync_dbus(bus, q, conn)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:26,代码来源:redundant-set.py
示例9: test
def test(q, bus, conn, stream):
    """Refresh a contact's info twice and check the signalling.

    The first RefreshContactInfo is answered with a vCard and must produce
    a ContactInfoChanged signal with the expected fields. The second must
    trigger another vCard request, but no ContactInfoChanged may be emitted
    while that request is left unanswered.
    """
    own_vcard_get = q.expect('stream-iq', to=None, query_ns='vcard-temp',
                             query_name='vCard')
    acknowledge_iq(stream, own_vcard_get.stanza)

    # NOTE(review): '[email protected]' looks like an e-mail-obfuscation
    # placeholder left by the page this was copied from -- confirm the
    # real contact JID against the upstream test.
    handle = conn.RequestHandles(1, ['[email protected]'])[0]
    call_async(q, conn.ContactInfo, 'RefreshContactInfo', [handle])

    contact_vcard_get = q.expect('stream-iq', to='[email protected]',
                                 query_ns='vcard-temp', query_name='vCard')
    result = make_result_iq(stream, contact_vcard_get.stanza)

    result.firstChildElement().addElement('FN', content='Bob')
    name = result.firstChildElement().addElement('N')
    name.addElement('GIVEN', content='Bob')
    result.firstChildElement().addElement(
        'NICKNAME', content=r'bob,bob1\,,bob2,bob3\,bob4')

    label = result.firstChildElement().addElement('LABEL')
    label.addElement('LINE', content='42 West Wallaby Street')
    label.addElement('LINE', content="Bishop's Stortford\n")
    label.addElement('LINE', content='Huntingdon')

    org = result.firstChildElement().addElement('ORG')
    # ORG is a sequence of decreasingly large org.units, starting
    # with the organisation name itself (but here we've moved the org name
    # to the end, to make sure that works.)
    org.addElement('ORGUNIT', content='Dept. of Examples')
    org.addElement('ORGUNIT', content='Exemplary Team')
    org.addElement('ORGNAME', content='Collabora Ltd.')

    stream.send(result)

    expected_info = [
        (u'fn', [], [u'Bob']),
        (u'n', [], [u'', u'Bob', u'', u'', u'']),
        (u'nickname', [], [r'bob,bob1\,,bob2,bob3\,bob4']),
        # LABEL comes out as a single blob of text
        (u'label', [], ['42 West Wallaby Street\n'
                        "Bishop's Stortford\n"
                        'Huntingdon\n']),
        # ORG is a sequence of decreasingly large org.units, starting
        # with the organisation
        (u'org', [], [u'Collabora Ltd.', u'Dept. of Examples',
                      u'Exemplary Team']),
    ]
    q.expect('dbus-signal', signal='ContactInfoChanged',
             args=[handle, expected_info])

    # ContactInfoChanged should not be signalled again
    info_changed = [EventPattern('dbus-signal', signal='ContactInfoChanged')]
    q.forbid_events(info_changed)

    # Refresh the contact info again; gabble should contact the server again
    call_async(q, conn.ContactInfo, 'RefreshContactInfo', [handle])
    q.expect('stream-iq', to='[email protected]', query_ns='vcard-temp',
             query_name='vCard')

    sync_dbus(bus, q, conn)
    q.unforbid_events(info_changed)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:57,代码来源:refresh-contact-info.py
示例10: test
def test(q, bus, conn, stream):
    """Exercise the console sidecar: SendIQ, SpewStanzas and SendStanza.

    StanzaReceived/StanzaSent signals are forbidden whenever SpewStanzas is
    off and expected while it is on. Finally a raw <message/> is sent
    through SendStanza and must reach the stream with a non-empty namespace.
    """
    path, _ = conn.Future.EnsureSidecar(CONSOLE_PLUGIN_IFACE)
    console = ProxyWrapper(bus.get_object(conn.bus_name, path),
                           CONSOLE_PLUGIN_IFACE)

    assert not console.Properties.Get(CONSOLE_PLUGIN_IFACE, 'SpewStanzas')
    stanza_signals = [
        EventPattern('dbus-signal', signal='StanzaReceived'),
        EventPattern('dbus-signal', signal='StanzaSent'),
    ]
    q.forbid_events(stanza_signals)

    call_async(q, console, 'SendIQ', 'get', STACY,
               '<coffee xmlns="urn:unimaginative"/>')
    coffee_iq = q.expect('stream-iq', iq_type='get',
                         query_ns='urn:unimaginative', query_name='coffee')
    acknowledge_iq(stream, coffee_iq.stanza)

    send_iq_return = q.expect('dbus-return', method='SendIQ')
    type_, body = send_iq_return.value
    assertEquals('result', type_)
    # We just assume the body works.

    # Turn on signalling incoming and outgoing stanzas
    console.Properties.Set(CONSOLE_PLUGIN_IFACE, 'SpewStanzas', True)
    sync_dbus(bus, q, conn)
    q.unforbid_events(stanza_signals)

    send_unrecognised_get(q, stream)

    received = q.expect('dbus-signal', signal='StanzaReceived')
    xml, = received.args
    assertContains('<iq', xml)
    assertContains('<dont-handle-me-bro', xml)

    sent = q.expect('dbus-signal', signal='StanzaSent')
    assertContains('service-unavailable', sent.args[0])

    # Turn off spewing out stanzas; check it works.
    console.Properties.Set(CONSOLE_PLUGIN_IFACE, 'SpewStanzas', False)
    q.forbid_events(stanza_signals)
    send_unrecognised_get(q, stream)
    sync_dbus(bus, q, conn)

    # Try sending just any old stanza
    # (the literal below is kept flush-left: its whitespace is part of the
    # stanza text that gets sent)
    console.SendStanza('''
<message to='%(stacy)s' type='headline'>
<body>
Hi sis.
</body>
</message>''' % { 'stacy': STACY })

    headline = q.expect('stream-message', to=STACY, message_type='headline')
    # Wocky fills in xmlns='' for us if we don't specify a namespace... great.
    # So this means <message/> gets sent as <message xmlns=''/> and the server
    # kicks us off.
    assertNotEquals('', headline.stanza.uri)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:56,代码来源:console.py
示例11: receive_caps
def receive_caps(q, bus, conn, service, contact, contact_handle, features,
                 expected_caps, expect_disco=True, expect_ccc=True):
    """Announce *contact* with a caps hash of *features* and check its caps.

    When *expect_disco* is true, a disco#info query for the announced node
    is expected and answered with *features*. When *expect_ccc* is true, a
    ContactCapabilitiesChanged signal carrying *expected_caps* for
    *contact_handle* is expected. The Contacts interface must report
    *expected_caps* in either case.
    """
    ver = compute_caps_hash([], features, {})
    txt_record = {"txtvers": "1", "status": "avail",
                  "node": client, "ver": ver, "hash": "sha-1"}

    listener, port = setup_stream_listener(q, contact)
    AvahiAnnouncer(contact, "_presence._tcp", port, txt_record)

    if expect_disco:
        # Salut looks up our capabilities
        incoming = q.expect('incoming-connection', listener=listener)
        stream = incoming.connection

        disco = q.expect('stream-iq', to=contact, query_ns=ns.DISCO_INFO,
                         connection=stream)
        query_node = xpath.queryForNodes('/iq/query', disco.stanza)[0]
        assert query_node.attributes['node'] == \
            client + '#' + ver

        # send good reply
        result = make_result_iq(disco.stanza)
        query = result.firstChildElement()
        query['node'] = client + '#' + ver
        for var in features:
            feature = query.addElement('feature')
            feature['var'] = var
        stream.send(result)

    if expect_ccc:
        changed = q.expect('dbus-signal', signal='ContactCapabilitiesChanged')
        announced_ccs, = changed.args
        assertSameElements(expected_caps, announced_ccs[contact_handle])
    elif expect_disco:
        # Make sure Salut's got the caps
        sync_stream(q, stream)

    # check the Contacts interface give the same caps
    attributes = conn.Contacts.GetContactAttributes(
        [contact_handle], [cs.CONN_IFACE_CONTACT_CAPS], False)
    caps_via_contacts_iface = \
        attributes[contact_handle][cs.ATTR_CONTACT_CAPABILITIES]
    assertSameElements(expected_caps, caps_via_contacts_iface)

    # close the connection and expect a new one to be opened by Salut
    # the next time we need some discoing doing
    if expect_disco:
        stream.send('</stream:stream>')
        stream.transport.loseConnection()

    # pass some time so Salut knows the connection is lost and
    # won't try and send stuff down a closed connection on the
    # next test.
    sync_dbus(bus, q, conn)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:56,代码来源:ft-client-caps.py
示例12: test
def test(q, bus, conn, stream):
    """Check the debug interface's Enabled property gates NewDebugMessage.

    With Enabled on, creating a channel must yield signalled messages when
    DEBUGGING is true and none otherwise. With Enabled off, closing and
    re-creating a channel must not add any message to those collected.
    """
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])

    messages = []

    def new_message(timestamp, domain, level, string):
        # Record every NewDebugMessage signal delivered to this test.
        messages.append((timestamp, domain, level, string))

    debug = bus.get_object(conn.bus_name, cs.DEBUG_PATH)
    debug_iface = dbus.Interface(debug, cs.DEBUG_IFACE)
    debug_iface.connect_to_signal('NewDebugMessage', new_message)
    props_iface = dbus.Interface(debug, cs.PROPERTIES_IFACE)

    assert len(debug_iface.GetMessages()) > 0

    # Turn signalling on and generate some messages.
    assert len(messages) == 0
    assert props_iface.Get(cs.DEBUG_IFACE, 'Enabled') == False
    props_iface.Set(cs.DEBUG_IFACE, 'Enabled', True)

    self_handle = conn.Get(cs.CONN, 'SelfHandle',
                           dbus_interface=cs.PROPERTIES_IFACE)
    channel_path, _ = conn.Requests.CreateChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_HANDLE: self_handle,
    })
    q.expect('dbus-signal', signal='NewChannels')

    if DEBUGGING:
        assert len(messages) > 0
    else:
        assertEquals([], messages)

    # Turn signalling off and check we don't get any more messages.
    props_iface.Set(cs.DEBUG_IFACE, 'Enabled', False)
    sync_dbus(bus, q, conn)
    snapshot = list(messages)

    channel = bus.get_object(conn.bus_name, channel_path)
    channel.Close(dbus_interface=cs.CHANNEL)
    q.expect('dbus-signal', signal='Closed')

    channel_path, _ = conn.Requests.CreateChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_HANDLE: self_handle,
    })
    q.expect('dbus-signal', signal='NewChannels')

    assertEquals(snapshot, messages)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-rakia,代码行数:54,代码来源:test-debug.py
示例13: text_can_automatically_close
def text_can_automatically_close(q, bus, conn, stream):
    """Create a stream tube, close it and check it leaves the bus."""
    # NOTE(review): '[email protected]' looks like an e-mail-obfuscation
    # placeholder left by the page this was copied from -- confirm the
    # real MUC JID against the upstream test.
    jid = '[email protected]'

    tube_chan, tube_path, _ = stream_tube(
        q, bus, conn, stream, 'CreateChannel', jid)

    # Flush the D-Bus queue before closing the tube.
    sync_dbus(bus, q, conn)

    tube_chan.Close()
    expect_close(q, tube_path, stream, jid)

    assert_not_on_bus(q, tube_chan)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:11,代码来源:only-text-when-needed.py
示例14: run_test
def run_test(q, bus, conn, stream, jt, request_before_presence):
    """
    Requests streams on a media channel to jt.remote_jid, either before their
    presence is received (if request_before_presence is True) or after their
    presence is received but before we've got a disco response for their
    capabilities (otherwise). In both cases RequestStreams is expected to
    return once the disco reply has been sent.
    """
    # We intentionally DON'T set remote presence yet. Since Gabble is still
    # unsure whether to treat contact as offline for this purpose, it
    # will tentatively allow channel creation and contact handle addition
    request = dbus.Dictionary(
        {
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
            cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
            cs.TARGET_ID: jt.remote_jid,
        },
        signature="sv",
    )
    path, props = conn.CreateChannel(
        request, dbus_interface=cs.CONN_IFACE_REQUESTS)
    media_iface = make_channel_proxy(conn, path, "Channel.Type.StreamedMedia")
    handle = props[cs.TARGET_HANDLE]

    sync_dbus(bus, q, conn)

    def call_request_streams():
        # Fire RequestStreams without waiting for its reply.
        call_async(q, media_iface, "RequestStreams", handle,
                   [cs.MEDIA_STREAM_TYPE_AUDIO])

    def send_presence():
        # Send the remote presence and hand back the resulting disco query.
        jt.send_remote_presence()
        return q.expect("stream-iq", query_ns=ns.DISCO_INFO,
                        to=jt.remote_jid)

    if request_before_presence:
        # Request streams before either <presence> or caps have arrived. Gabble
        # should wait for both to arrive before returning from RequestStreams.
        call_request_streams()

        # Ensure Gabble's received the method call.
        sync_dbus(bus, q, conn)

        # Now send the presence.
        info_event = send_presence()
    else:
        info_event = send_presence()

        # Now call RequestStreams; it should wait for the disco reply.
        call_request_streams()

    jt.send_remote_disco_reply(info_event.stanza)

    # RequestStreams should now happily complete
    q.expect("dbus-return", method="RequestStreams")
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:53,代码来源:test-wait-for-caps.py
示例15: test
def test(q, bus, conn, stream):
    """Race SetAliases against SetAvatar while one vCard 'get' is pending.

    SetAliases triggers a vCard 'get'. SetAvatar is issued, and the D-Bus
    queue synced, before that 'get' is answered with an empty vCard. The
    following vCard 'set' must then contain both the nickname and the photo.
    """
    conn.Connect()

    expect_and_handle_get_vcard(q, stream)

    # Ensure that Gabble's actually got the initial vCard reply; if it hasn't
    # processed it by the time we call SetAliases, the latter will wait for it
    # to reply and then set immediately.
    sync_stream(q, stream)

    handle = conn.GetSelfHandle()

    call_async(q, conn.Aliasing, 'SetAliases', {handle: 'Some Guy'})

    # SetAliases requests vCard v1
    get_vcard_event = q.expect('stream-iq', query_ns=ns.VCARD_TEMP,
                               query_name='vCard', iq_type='get')

    iq = get_vcard_event.stanza
    vcard = iq.firstChildElement()
    assert vcard.name == 'vCard', vcard.toXml()

    call_async(q, conn.Avatars, 'SetAvatar', 'hello', 'image/png')

    # We don't expect Gabble to send a second vCard request, since there's one
    # outstanding. But we want to ensure that SetAvatar reaches Gabble before
    # the empty vCard does.
    sync_dbus(bus, q, conn)

    # Send back current empty vCard
    result = make_result_iq(stream, iq)
    # result already includes the <vCard/> from the query, which is all we need
    stream.send(result)

    def has_nickname_and_photo(vcard):
        # Validate that the vCard being set carries exactly one NICKNAME of
        # 'Some Guy' and exactly one PHOTO holding the image/png avatar.
        nicknames = xpath.queryForNodes('/vCard/NICKNAME', vcard)
        assert nicknames is not None
        assert len(nicknames) == 1
        assert str(nicknames[0]) == 'Some Guy'

        photos = xpath.queryForNodes('/vCard/PHOTO', vcard)
        assert photos is not None and len(photos) == 1, repr(photos)
        types = xpath.queryForNodes('/PHOTO/TYPE', photos[0])
        binvals = xpath.queryForNodes('/PHOTO/BINVAL', photos[0])
        assert types is not None and len(types) == 1, repr(types)
        assert binvals is not None and len(binvals) == 1, repr(binvals)
        assert str(types[0]) == 'image/png'

        got = str(binvals[0])
        # b64encode() needs bytes on Python 3 and returns bytes there; encode
        # a bytes literal and decode to text so the comparison with `got`
        # behaves the same on Python 2 and Python 3.
        exp = base64.b64encode(b'hello').decode('ascii')
        assert got == exp, (got, exp)

    # Now Gabble should set a new vCard with both of the above changes.
    expect_and_handle_set_vcard(q, stream, has_nickname_and_photo)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:53,代码来源:test-vcard-race.py
示例16: text_can_automatically_close
def text_can_automatically_close(q, bus, conn):
    """Connect, create a stream tube, close it and check it leaves the bus."""
    jid = 'test-muc'
    connect(q, bus, conn)

    tube_chan, tube_path, _ = stream_tube(q, bus, conn, 'CreateChannel', jid)

    # Flush the D-Bus queue before closing the tube.
    sync_dbus(bus, q, conn)

    tube_chan.Close()
    expect_close(q, tube_path)

    assert_not_on_bus(q, tube_chan)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:13,代码来源:only-text-muc-when-needed.py
示例17: test
def test(q, bus, conn, stream):
    """Check handling of Facebook 'own-message' stanzas.

    An own-message addressed to a contact with an open text channel must
    surface as an accepted delivery report echoing the message. One
    addressed to a contact without a channel must be acknowledged on the
    stream but produce no MessageReceived signal.
    """
    def send_own_message(to, text):
        # Send an own-message IQ and wait for its 'result' acknowledgement.
        iq = elem_iq(stream, 'set', from_='chat.facebook.com')(
            elem(NS_FACEBOOK_MESSAGES, 'own-message', to=to, self='false')(
                elem('body')(text)
            )
        )
        stream.send(iq)
        q.expect('stream-iq', iq_type='result', iq_id=iq['id'])

    # First, test receiving an own-message stanza for a message sent to a
    # contact we have an open channel for.
    # NOTE(review): '[email protected]' looks like an e-mail-obfuscation
    # placeholder left by the page this was copied from -- confirm the
    # real JIDs against the upstream test.
    jid = '[email protected]'
    _, path, props = conn.Requests.EnsureChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_ID: jid,
    })
    channel = wrap_channel(bus.get_object(conn.bus_name, path),
                           'Text', ['Messages'])
    handle = props[cs.TARGET_HANDLE]

    text = u'omg omg its ur birthdayy <3 <3 xoxoxoxo'
    send_own_message(to=jid, text=text)

    received = q.expect('dbus-signal', signal='MessageReceived')
    message, = received.args
    assertLength(1, message)

    header = message[0]
    assertEquals(handle, header['message-sender'])
    assertEquals(cs.MT_DELIVERY_REPORT, header['message-type'])
    assertEquals(cs.DELIVERY_STATUS_ACCEPTED, header['delivery-status'])
    assertContains('delivery-echo', header)

    echo_header, echo_body = header['delivery-echo']
    assertEquals(conn.GetSelfHandle(), echo_header['message-sender'])
    assertEquals('text/plain', echo_body['content-type'])
    assertEquals(text, echo_body['content'])

    channel.Text.AcknowledgePendingMessages([header['pending-message-id']])
    channel.Close()

    # Now test receiving an own-message stanza for a message sent to a contact
    # we don't have a channel open for. It should be ignored (but acked). This
    # is consistent with delivery failure reports.
    q.forbid_events([EventPattern('dbus-signal', signal='MessageReceived')])
    send_own_message(to='[email protected]',
                     text=u'please ignore this message')
    sync_dbus(bus, q, conn)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:51,代码来源:facebook-own-message.py
示例18: _test_remote_status_away
def _test_remote_status_away(q, bus, conn, stream, msg, show, list_attrs):
    """Set shared status lists and check no matching PresencesChanged fires.

    *list_attrs* gets a 'status' entry of *msg* if it has none (the caller's
    dict is modified in place) and is passed to
    ``stream.set_shared_status_lists``. A PresencesChanged signal for handle
    1 with (*show*, *msg*) is forbidden until the D-Bus queue is synced.
    """
    presence_changed = [
        EventPattern('dbus-signal', signal='PresencesChanged',
                     interface=cs.CONN_IFACE_SIMPLE_PRESENCE,
                     args=[{1: (presence_types[show], show, msg)}]),
    ]
    q.forbid_events(presence_changed)

    # Default the status message to msg when the caller did not supply one.
    list_attrs.setdefault('status', msg)
    stream.set_shared_status_lists(**list_attrs)

    q.expect('stream-iq', iq_type='result')
    sync_dbus(bus, q, conn)

    q.unforbid_events(presence_changed)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:14,代码来源:shared-status.py
示例19: test
def test(q, bus, conn, stream, channel_type):
    """Disconnect while a media request is still waiting on presence/caps.

    For StreamedMedia channels a pending RequestStreams must fail with
    NotAvailable; for any other *channel_type* a pending CreateChannel must
    fail with Disconnected.
    """
    # NOTE(review): the '[email protected]' JIDs look like e-mail-obfuscation
    # placeholders left by the page this was copied from -- confirm the
    # real JIDs against the upstream test.
    jt = jingletest.JingleTest(stream, '[email protected]', '[email protected]/Foo')

    # We intentionally DON'T set remote presence yet. Since Gabble is still
    # unsure whether to treat contact as offline for this purpose, it
    # will tentatively allow channel creation and contact handle addition
    handle = conn.RequestHandles(cs.HT_CONTACT, [jt.remote_jid])[0]

    is_streamed_media = channel_type == cs.CHANNEL_TYPE_STREAMED_MEDIA

    if is_streamed_media:
        path = conn.RequestChannel(cs.CHANNEL_TYPE_STREAMED_MEDIA,
                                   cs.HT_CONTACT, handle, True)
        media_iface = make_channel_proxy(conn, path,
                                         'Channel.Type.StreamedMedia')

    # So it turns out that the calls to RequestStreams and Disconnect could be
    # reordered while the first waits on the result of introspecting the
    # channel's object which is kicked off by making a proxy object for it,
    # whereas the connection proxy is long ago introspected. Isn't dbus-python
    # great? Syncing here forces that introspection to finish so we can rely on
    # the ordering of RequestStreams and Disconnect. Yay.
    sync_dbus(bus, q, conn)

    # Now we request streams before either <presence> or caps have arrived
    if is_streamed_media:
        call_async(q, media_iface, 'RequestStreams', handle,
                   [cs.MEDIA_STREAM_TYPE_AUDIO])

        before_events, _ = disconnect_conn(
            q, conn, stream,
            [EventPattern('dbus-error', method='RequestStreams')])

        # RequestStreams should now return NotAvailable
        assert before_events[0].error.get_dbus_name() == cs.NOT_AVAILABLE, \
            before_events[0].error
    else:
        call_async(q, conn.Requests, 'CreateChannel', {
            cs.CHANNEL_TYPE: channel_type,
            cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
            cs.TARGET_ID: jt.remote_jid,
            cs.CALL_INITIAL_AUDIO: True,
        })

        before_events, _ = disconnect_conn(
            q, conn, stream,
            [EventPattern('dbus-error', method='CreateChannel')])

        # CreateChannel should now return Disconnected
        assert before_events[0].error.get_dbus_name() == cs.DISCONNECTED, \
            before_events[0].error
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:48,代码来源:test-wait-for-caps-incomplete.py
示例20: test
def test(q, bus, conn, stream):
    """Re-request a MUC channel while its Close is still pending.

    After Close, CreateChannel for the same room must fail with NotAvailable
    until the unavailable presence has been echoed and the channel has
    really closed; after that, joining again must succeed.
    """
    # NOTE(review): '[email protected]' looks like an e-mail-obfuscation
    # placeholder left by the page this was copied from -- confirm the
    # real room JID against the upstream test.
    room = '[email protected]'

    disco_query = EventPattern('stream-iq', iq_type='get',
                               query_name='query', query_ns=ns.DISCO_INFO,
                               to=room)
    chan, _, _, _ = join_muc(q, bus, conn, stream, room,
                             also_capture=[disco_query])

    sync_dbus(bus, q, conn)

    # we call Close...
    call_async(q, chan, 'Close')
    q.expect('dbus-return', method='Close')

    # ...so gabble announces our unavailable presence to the MUC.
    leaving = q.expect('stream-presence', to=room + '/test')
    unavailable = leaving.stanza
    assertEquals('unavailable', unavailable['type'])

    # while we wait for the conference server to echo our unavailable
    # presence, we try and create the same channel again...
    call_async(q, conn.Requests, 'CreateChannel', {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_ID: room,
    })

    # ...which should fail because the channel hasn't closed yet.
    q.expect('dbus-error', method='CreateChannel', name=cs.NOT_AVAILABLE)

    # the conference server finally gets around to echoing our
    # unavailable presence...
    echo_muc_presence(q, stream, unavailable, 'none', 'participant')

    # ...and only now is the channel closed.
    q.expect_many(EventPattern('dbus-signal', signal='Closed'),
                  EventPattern('dbus-signal', signal='ChannelClosed'))

    # now that the channel has finally closed, let's try and request
    # it again which should succeed!
    chan, _, _ = join_muc(q, bus, conn, stream, room)

    # let's clear up though.
    chan.Close()
    leaving = q.expect('stream-presence', to=room + '/test')
    echo_muc_presence(q, stream, leaving.stanza, 'none', 'participant')
    q.expect_many(EventPattern('dbus-signal', signal='Closed'),
                  EventPattern('dbus-signal', signal='ChannelClosed'))
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:48,代码来源:presence-before-closing.py
注：本文中的 servicetest.sync_dbus 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论