本文整理汇总了Python中servicetest.wrap_channel函数的典型用法代码示例。如果您正苦于以下问题：Python wrap_channel函数的具体用法？Python wrap_channel怎么用？Python wrap_channel使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wrap_channel函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: incoming
def incoming(jp, q, bus, conn, stream):
    """Place incoming calls and check the announced initial-media properties.

    For each (audio, video) combination that ``jp`` reports as supported
    (via ``can_do_video`` / ``can_do_video_only``), simulate an incoming
    call, expect a NewChannels signal carrying exactly one StreamedMedia
    channel whose InitialAudio/InitialVideo properties match the call, and
    close that channel.
    """
    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    remote_jid = '[email protected]/antennas'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)
    jt.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]

    for a, v in [(True, False), (False, True), (True, True)]:
        # Skip combinations this Jingle dialect cannot express.
        if v and not jp.can_do_video():
            continue
        if not a and v and not jp.can_do_video_only():
            continue

        jt.incoming_call(audio=a, video=v)
        e = q.expect('dbus-signal', signal='NewChannels')
        chans = e.args[0]
        assertLength(1, chans)

        path, props = chans[0]

        assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, props[cs.CHANNEL_TYPE])
        assertEquals(a, props[cs.INITIAL_AUDIO])
        assertEquals(v, props[cs.INITIAL_VIDEO])

        # Close the channel so the next iteration starts from a clean slate.
        chan = wrap_channel(bus.get_object(conn.bus_name, path),
            cs.CHANNEL_TYPE_STREAMED_MEDIA)
        chan.Close()
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:28,代码来源:initial-audio-video.py
示例2: check_neither
def check_neither(q, conn, bus, stream, remote_handle):
    """
    Make a channel without specifying InitialAudio or InitialVideo; check
    that it's announced with both False, and that they're both present and
    false in GetAll().
    """
    path, props = conn.Requests.CreateChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_HANDLE: remote_handle})
    # The immutable properties returned by CreateChannel must already say
    # that neither media type was requested.
    assertContains((cs.INITIAL_AUDIO, False), props.items())
    assertContains((cs.INITIAL_VIDEO, False), props.items())

    chan = wrap_channel(bus.get_object(conn.bus_name, path),
        cs.CHANNEL_TYPE_STREAMED_MEDIA, ['MediaSignalling'])
    # GetAll() uses unqualified property names, hence the bare strings.
    props = chan.Properties.GetAll(cs.CHANNEL_TYPE_STREAMED_MEDIA)
    assertContains(('InitialAudio', False), props.items())
    assertContains(('InitialVideo', False), props.items())

    # We shouldn't have started a session yet, so there shouldn't be any
    # session handlers. Strictly speaking, there could be a session handler
    # with no stream handlers, but...
    session_handlers = chan.MediaSignalling.GetSessionHandlers()
    assertLength(0, session_handlers)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:26,代码来源:initial-audio-video.py
示例3: test
def test(q, bus, conn, stream):
    """Call SessionHandler.Ready() on a channel with no session yet.

    Requests a StreamedMedia channel to a contact, builds a
    Media.SessionHandler proxy on the channel's own object path and calls
    Ready() on it. If the call raises a DBusException, its D-Bus error name
    must be NotAvailable.
    """
    jp = JingleProtocol031()
    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    remote_jid = '[email protected]/misc'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)

    jt.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]
    path, _ = conn.Requests.CreateChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_HANDLE: remote_handle})

    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamedMedia')

    # In Gabble, the StreamedMedia channel is secretly also the SessionHandler.
    # Let's make up a proxy and call some methods on it. They should fail
    # gracefully, rather than crashing Gabble.
    session_handler = make_channel_proxy(conn, path, 'Media.SessionHandler')

    try:
        session_handler.Ready()
    except DBusException as e:
        assertEquals(cs.NOT_AVAILABLE, e.get_dbus_name())
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:25,代码来源:misuse.py
示例4: join_muc
def join_muc(q, bus, conn, stream, muc, request=None):
    """
    Joins 'muc', returning the muc's handle, a proxy object for the channel,
    its path and its immutable properties just after the CreateChannel event
    has fired. The room contains one other member.

    If 'request' is None, a Text channel to the room 'muc' is requested;
    otherwise 'request' is passed to CreateChannel as given.
    """
    if request is None:
        request = {
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
            cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
            cs.TARGET_ID: muc,
        }

    muc_handle = request_muc_handle(q, conn, stream, muc)

    requests = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
    # Asynchronous: the call only returns once the presences below are sent.
    call_async(q, requests, 'CreateChannel',
        dbus.Dictionary(request, signature='sv'))

    q.expect('stream-presence', to='%s/test' % muc)

    # Send presence for other member of room.
    stream.send(make_muc_presence('owner', 'moderator', muc, 'bob'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence('none', 'participant', muc, 'test'))

    event = q.expect('dbus-return', method='CreateChannel')
    path, props = event.value
    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text',
        ['Messages'])

    return (muc_handle, chan, path, props)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:33,代码来源:mucutil.py
示例5: test
def test(jp, q, bus, conn, stream):
    """Check the DTMF interface on a call that only has a video stream.

    Returns immediately when ``jp`` cannot do video-only calls. Otherwise
    sets up an outgoing video-only call, then checks that StartTone and
    MultipleTones fail with NotAvailable while StopTone succeeds, and finally
    leaves the call and waits for the channel's Closed signal.
    """
    if not jp.can_do_video_only():
        return

    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    remote_jid = '[email protected]/Foo'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)
    jt.prepare()

    self_handle = conn.GetSelfHandle()
    handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]

    chan_path = conn.RequestChannel(cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.HT_CONTACT, handle, True)
    chan = wrap_channel(bus.get_object(conn.bus_name, chan_path),
        'StreamedMedia', ['MediaSignalling', 'Group', 'CallState', 'DTMF'])

    # The channel must advertise the DTMF interface even without audio.
    chan_props = chan.Properties.GetAll(cs.CHANNEL)
    assert cs.CHANNEL_IFACE_DTMF in chan_props['Interfaces'], \
        chan_props['Interfaces']

    chan.StreamedMedia.RequestStreams(handle, [cs.MEDIA_STREAM_TYPE_VIDEO])

    # S-E gets notified about new session handler, and calls Ready on it
    e = q.expect('dbus-signal', signal='NewSessionHandler')
    assert e.args[1] == 'rtp'

    session_handler = make_channel_proxy(conn, e.args[0], 'Media.SessionHandler')
    session_handler.Ready()

    e = q.expect('dbus-signal', signal='NewStreamHandler')
    video_path = e.args[0]
    stream_handler = make_channel_proxy(conn, video_path, 'Media.StreamHandler')

    stream_handler.NewNativeCandidate("fake", jt.get_remote_transports_dbus())
    stream_handler.Ready(jt.get_video_codecs_dbus())
    stream_handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)

    e = q.expect('stream-iq', predicate=jp.action_predicate('session-initiate'))
    stream.send(make_result_iq(stream, e.stanza))

    jt.parse_session_initiate(e.query)

    jt.accept()

    # Gabble tells s-e to start sending
    q.expect('dbus-signal', signal='SetStreamSending', args=[True],
        path=video_path)

    # We don't actually have an audio stream, so this is a non-starter.
    call_async(q, chan.DTMF, 'StartTone', 666, 3)
    q.expect('dbus-error', method='StartTone', name=cs.NOT_AVAILABLE)
    call_async(q, chan.DTMF, 'MultipleTones', '**666##')
    q.expect('dbus-error', method='MultipleTones', name=cs.NOT_AVAILABLE)

    # We can still stop all the tones that are playing (a no-op).
    call_async(q, chan.DTMF, 'StopTone', 666)
    q.expect('dbus-return', method='StopTone')

    chan.Group.RemoveMembers([self_handle], 'closed')
    q.expect('dbus-signal', signal='Closed', path=chan_path)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:59,代码来源:dtmf-no-audio.py
示例6: test
def test(q, bus, conn, stream):
    """Exercise TpProperties.SetProperties on a MUC text channel.

    Joins a room as owner, sets the 'password' and 'password-required'
    properties, checks the room-configuration form submitted on the stream
    and the resulting PropertiesChanged signal, then checks three failing
    SetProperties calls (unknown property id, wrongly typed values, and
    'subject-contact') for their D-Bus error names.
    """
    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]', including the odd '[email protected].localhost');
    # confirm them against the upstream test.
    muc_handle = request_muc_handle(q, conn, stream, '[email protected]')

    call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TEXT, cs.HT_ROOM,
        muc_handle, True)

    q.expect('stream-presence', to='[email protected]/test')

    # Send presence for own membership of room.
    stream.send(
        make_muc_presence('owner', 'moderator', '[email protected]', 'test'))

    iq, ret = q.expect_many(
        EventPattern('stream-iq', to='[email protected]', iq_type='get',
            query_ns=ns.MUC_OWNER),
        EventPattern('dbus-return', method='RequestChannel'))
    handle_muc_get_iq(stream, iq.stanza)

    text_chan = wrap_channel(
        bus.get_object(conn.bus_name, ret.value[0]), 'Text')

    # Map property name -> numeric property id, as SetProperties takes ids.
    props = dict((name, prop_id)
        for prop_id, name, sig, flags
        in text_chan.TpProperties.ListProperties())
    call_async(q, text_chan.TpProperties, 'SetProperties',
        [(props['password'], 'foo'), (props['password-required'], True)])

    event = q.expect('stream-iq', to='[email protected].localhost', iq_type='get',
        query_ns=ns.MUC_OWNER)
    handle_muc_get_iq(stream, event.stanza)

    event = q.expect('stream-iq', to='[email protected]', iq_type='set',
        query_ns=ns.MUC_OWNER)
    fields = xpath.queryForNodes('/iq/query/x/field', event.stanza)
    # Collect the submitted form as {field var: [values as strings]}.
    form = {}
    for field in fields:
        values = xpath.queryForNodes('/field/value', field)
        form[field['var']] = [str(v) for v in values]
    assert form == {'password': ['foo'], 'password_protected': ['1'],
            'muc#roomconfig_presencebroadcast' :
            ['moderator', 'participant', 'visitor']}
    acknowledge_iq(stream, event.stanza)

    event = q.expect('dbus-signal', signal='PropertiesChanged')
    assert event.args == [[(props['password'], 'foo'),
        (props['password-required'], True)]]

    q.expect('dbus-return', method='SetProperties', value=())

    # Unknown property id.
    call_async(q, text_chan.TpProperties, 'SetProperties',
        [(31337, 'foo'), (props['password-required'], True)])
    q.expect('dbus-error', name=cs.INVALID_ARGUMENT)

    # Values of the wrong type for both properties.
    call_async(q, text_chan.TpProperties, 'SetProperties',
        [(props['password'], True), (props['password-required'], 'foo')])
    q.expect('dbus-error', name=cs.NOT_AVAILABLE)

    call_async(q, text_chan.TpProperties, 'SetProperties',
        [(props['subject-contact'], 42)])
    q.expect('dbus-error', name=cs.PERMISSION_DENIED)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:59,代码来源:test-muc-properties.py
示例7: join
def join(q, bus, conn):
    """Request a Text channel for the room CHANNEL and return its proxy.

    Issues CreateChannel asynchronously, waits for the 'stream-JOIN' event
    and for CreateChannel to return, then wraps the new channel with the
    'Destroyable' and 'Messages' extra interfaces.
    """
    call_async(q, conn.Requests, "CreateChannel", {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.ROOM_NAME: CHANNEL,
    })
    q.expect('stream-JOIN')
    event = q.expect('dbus-return', method='CreateChannel')
    path, props = event.value
    return wrap_channel(bus.get_object(conn.bus_name, path), 'Text',
        ['Destroyable', 'Messages'])
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-idle,代码行数:10,代码来源:muc-destroy.py
示例8: test
def test(q, bus, conn, stream):
    """Check handling of Facebook 'own-message' stanzas.

    With a text channel open to the recipient, an own-message must surface
    as a delivery report whose 'delivery-echo' carries the sent text. With
    no channel open to the recipient, no MessageReceived signal may be
    emitted (the IQ is still acknowledged).
    """
    def send_own_message(to, text):
        # Send the own-message IQ and wait for the IQ result on the stream.
        iq = elem_iq(stream, 'set', from_='chat.facebook.com')(
            elem(NS_FACEBOOK_MESSAGES, 'own-message', to=to, self='false')(
                elem('body')(text)
            )
        )
        stream.send(iq)
        q.expect('stream-iq', iq_type='result', iq_id=iq['id'])

    # First, test receiving an own-message stanza for a message sent to a
    # contact we have an open channel for.
    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    jid = '[email protected]'
    _, path, props = conn.Requests.EnsureChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_ID: jid,
    })
    channel = wrap_channel(bus.get_object(conn.bus_name, path),
        'Text', ['Messages'])
    handle = props[cs.TARGET_HANDLE]

    text = u'omg omg its ur birthdayy <3 <3 xoxoxoxo'
    send_own_message(to=jid, text=text)
    e = q.expect('dbus-signal', signal='MessageReceived')
    message, = e.args
    # A delivery report consists of a single header part.
    assertLength(1, message)
    header = message[0]

    assertEquals(handle, header['message-sender'])
    assertEquals(cs.MT_DELIVERY_REPORT, header['message-type'])
    assertEquals(cs.DELIVERY_STATUS_ACCEPTED, header['delivery-status'])

    assertContains('delivery-echo', header)
    echo = header['delivery-echo']
    echo_header, echo_body = echo

    assertEquals(conn.GetSelfHandle(), echo_header['message-sender'])
    assertEquals('text/plain', echo_body['content-type'])
    assertEquals(text, echo_body['content'])

    channel.Text.AcknowledgePendingMessages([header['pending-message-id']])
    channel.Close()

    # Now test receiving an own-message stanza for a message sent to a contact
    # we don't have a channel open for. It should be ignored (but acked). This
    # is consistent with delivery failure reports.
    q.forbid_events([EventPattern('dbus-signal', signal='MessageReceived')])
    send_own_message(to='[email protected]',
        text=u'please ignore this message')
    sync_dbus(bus, q, conn)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:51,代码来源:facebook-own-message.py
示例9: test
def test(q, bus, conn, stream, access_control):
    """Request a MUC D-Bus tube and try to offer it with a bad access control.

    Joins a room with a DBusTube channel request, checks the immutable
    properties announced in NewChannels and that the channel is listed in
    the connection's Channels property, checks the tube starts in the
    not-offered state, then calls Offer() with the 'port' access control; if
    that raises, the error name must be InvalidArgument.

    ``access_control`` is not used in the visible part of this test.
    """
    iq_event = q.expect('stream-iq', to=None, query_ns='vcard-temp',
        query_name='vCard')

    acknowledge_iq(stream, iq_event.stanza)

    # check if we can request muc D-Bus tube
    t.check_conn_properties(q, conn)

    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    self_name = conn.inspect_contact_sync(self_handle)

    # offer a D-Bus tube to another room using new API
    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    muc = '[email protected]'
    request = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_DBUS_TUBE,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_ID: '[email protected]',
        cs.DBUS_TUBE_SERVICE_NAME: 'com.example.TestCase',
    }
    join_muc(q, bus, conn, stream, muc, request=request)

    e = q.expect('dbus-signal', signal='NewChannels')

    channels = e.args[0]
    assert len(channels) == 1
    path, prop = channels[0]
    assert prop[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
    assert prop[cs.INITIATOR_ID] == '[email protected]/test'
    assert prop[cs.REQUESTED] == True
    assert prop[cs.TARGET_HANDLE_TYPE] == cs.HT_ROOM
    assert prop[cs.TARGET_ID] == '[email protected]'
    assert prop[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase'
    assert prop[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [
        cs.SOCKET_ACCESS_CONTROL_CREDENTIALS,
        cs.SOCKET_ACCESS_CONTROL_LOCALHOST]

    # check that the tube channel is in the channels list
    all_channels = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
        dbus_interface=cs.PROPERTIES_IFACE, byte_arrays=True)
    assertContains((path, prop), all_channels)

    tube_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'DBusTube')
    tube_props = tube_chan.Properties.GetAll(cs.CHANNEL_IFACE_TUBE,
        byte_arrays=True)

    assert tube_props['State'] == cs.TUBE_CHANNEL_STATE_NOT_OFFERED

    # try to offer using a wrong access control
    try:
        tube_chan.DBusTube.Offer(sample_parameters,
            cs.SOCKET_ACCESS_CONTROL_PORT)
    except dbus.DBusException as e:
        assertEquals(e.get_dbus_name(), cs.INVALID_ARGUMENT)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:51,代码来源:offer-muc-dbus-tube.py
示例10: stream_tube
def stream_tube(q, bus, conn, method, jid):
    """Request a stream tube with ``method`` and return its wrapped channel.

    Waits for both the D-Bus return of ``method`` and a NewChannels signal.
    Returns a tuple of the 'StreamTube' channel proxy followed by every
    element of the method's return value.
    """
    request_stream_tube(q, bus, conn, method, jid)
    e, _ = q.expect_many(EventPattern('dbus-return', method=method),
        EventPattern('dbus-signal', signal='NewChannels'))

    # The object path is the second element of an EnsureChannel return and
    # the first element of any other method's return.
    if method == 'EnsureChannel':
        path = e.value[1]
    else:
        path = e.value[0]

    tube_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamTube')

    return (tube_chan,) + e.value
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:14,代码来源:only-text-muc-when-needed.py
示例11: text_channel
def text_channel(q, bus, conn, method, jid, presence=True):
    """Request a text channel with ``method`` and return its wrapped channel.

    Waits for both the D-Bus return of ``method`` and a NewChannels signal.
    Returns a tuple of the 'Text' channel proxy followed by every element of
    the method's return value.

    ``presence`` is accepted but not used in this function's body.
    """
    request_text_channel(q, bus, conn, method, jid)
    e, _ = q.expect_many(EventPattern('dbus-return', method=method),
        EventPattern('dbus-signal', signal='NewChannels'))

    # The object path is the second element of an EnsureChannel return and
    # the first element of any other method's return.
    if method == 'EnsureChannel':
        path = e.value[1]
    else:
        path = e.value[0]

    text_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text')

    return (text_chan,) + e.value
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:14,代码来源:only-text-muc-when-needed.py
示例12: test
def test(q, bus, conn, stream):
    """Close the ServerTLSConnection channel announced while connecting.

    After Connect(), waits for StatusChanged with args [1, 1] and the
    'irc-connected' event, takes the first channel from the next NewChannels
    signal, closes it, and expects StatusChanged with args [2, 2].
    """
    conn.Connect()
    q.expect_many(
        EventPattern('dbus-signal', signal='StatusChanged', args=[1, 1]),
        EventPattern('irc-connected'))
    e = q.expect('dbus-signal', signal='NewChannels')

    channels = e.args[0]
    path, props = channels[0]

    channel = wrap_channel(bus.get_object(conn.bus_name, path),
        cs.CHANNEL_TYPE_SERVER_TLS_CONNECTION)
    channel.Close()

    q.expect('dbus-signal', signal='StatusChanged', args=[2, 2])
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-idle,代码行数:14,代码来源:connect-close-ssl.py
示例13: test
def test(q, bus, conn, stream):
    """Request a presence subscription through the 'subscribe' list channel.

    Answers the roster query with an empty roster, waits for the ContactList
    channel named 'subscribe', adds a contact to it, and checks the roster
    'set' IQ, the outgoing 'subscribe' presence and, once the contact replies
    'subscribed', the resulting MembersChanged signal.
    """
    conn.Connect()

    event = q.expect('stream-iq', query_ns=ns.ROSTER)
    # send back empty roster
    event.stanza['type'] = 'result'
    stream.send(event.stanza)

    # Skip NewChannel signals until the 'subscribe' contact list shows up.
    while True:
        event = q.expect('dbus-signal', signal='NewChannel')
        path, chan_type, handle_type, handle, suppress_handler = event.args

        if chan_type != cs.CHANNEL_TYPE_CONTACT_LIST:
            continue

        chan_name = conn.InspectHandles(handle_type, [handle])[0]

        if chan_name == 'subscribe':
            break

    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'ContactList')
    assertLength(0, chan.Group.GetMembers())

    # request subscription
    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]', including the odd '[email protected]com'); confirm
    # them against the upstream test.
    handle = conn.RequestHandles(cs.HT_CONTACT, ['[email protected]'])[0]
    chan.Group.AddMembers([handle], '')

    event = q.expect('stream-iq', iq_type='set', query_ns=ns.ROSTER)
    item = event.query.firstChildElement()
    assertEquals('[email protected]com', item["jid"])

    acknowledge_iq(stream, event.stanza)

    event = q.expect('stream-presence', presence_type='subscribe')

    # The contact approves the subscription request.
    presence = domish.Element(('jabber:client', 'presence'))
    presence['from'] = '[email protected]'
    presence['type'] = 'subscribed'
    stream.send(presence)

    q.expect_many(
        EventPattern('dbus-signal', signal='MembersChanged',
            args=['', [handle], [], [], [], 0, 0]),
        EventPattern('stream-presence'),
    )
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:45,代码来源:test-roster-subscribe.py
示例14: join_muc
def join_muc(q, bus, conn, stream, muc, request=None,
        also_capture=(), role='participant', affiliation='none'):
    """
    Joins 'muc', returning a proxy object for the channel,
    its path and its immutable properties just after the CreateChannel event
    has fired. The room contains one other member.

    'also_capture' is a sequence of extra event patterns to expect together
    with the CreateChannel return; the events they match are appended to the
    returned tuple. 'role' and 'affiliation' are used for our own presence
    in the room.
    """
    try_to_join_muc(q, bus, conn, stream, muc, request=request)

    # Send presence for other member of room.
    stream.send(make_muc_presence('owner', 'moderator', muc, 'bob'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence(affiliation, role, muc, 'test'))

    captured = q.expect_many(
        EventPattern('dbus-return', method='CreateChannel'),
        *also_capture)
    # The CreateChannel return is always the first captured event.
    path, props = captured[0].value
    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text',
        ['Subject.DRAFT'])

    return (chan, path, props) + tuple(captured[1:])
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:24,代码来源:mucutil.py
示例15: test
def test(q, bus, conn, stream):
    """Run the delivery-receipt sub-tests against a 1-1 text channel.

    Creates a Text channel to GUYBRUSH, answers the roster query with an
    empty roster, then runs each receipt scenario helper in turn.
    """
    path = conn.Requests.CreateChannel(
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
          cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
          cs.TARGET_ID: GUYBRUSH,
        })[0]
    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text')

    # Let's start out with an empty roster, eh?
    e = q.expect('stream-iq', iq_type='get', query_ns=ns.ROSTER)
    e.stanza['type'] = 'result'
    stream.send(e.stanza)

    report_received_on_open_channel(q, bus, conn, stream, chan)
    report_ignored_without_channel(q, bus, conn, stream)
    not_sending_request_to_contact(q, bus, conn, stream, chan)

    # FIXME: This test is disabled because of stupidity in the presence cache.
    # See the comment in receipts_conceivably_supported().
    #sending_request_to_presenceless_contact(q, bus, conn, stream, chan)

    sending_request_to_cappy_contact(q, bus, conn, stream, chan)

    replying_to_requests(q, bus, conn, stream)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:24,代码来源:receipts.py
示例16: incoming
def incoming(jp, q, bus, conn, stream):
    """Place incoming calls and check the announced initial-media properties.

    For each (audio, video) content-name combination that ``jp`` reports as
    supported, simulate an incoming call, expect a NewChannels signal (other
    than one for a ContactList channel) carrying exactly one StreamedMedia
    channel, check its InitialAudio, InitialVideo and ImmutableStreams
    properties, and close that channel.
    """
    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    remote_jid = '[email protected]/antennas'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)
    jt.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]

    # None means "no content of that media type in the call".
    for a, v in [("audio1", None), (None, "video1"), ("audio1", "video1")]:
        # Skip combinations this Jingle dialect cannot express.
        if v is not None and not jp.can_do_video():
            continue
        if a is None and v is not None and not jp.can_do_video_only():
            continue

        jt.incoming_call(audio=a, video=v)
        e = q.expect('dbus-signal', signal='NewChannels',
            predicate=lambda e:
                cs.CHANNEL_TYPE_CONTACT_LIST not in e.args[0][0][1].values())
        chans = e.args[0]
        assertLength(1, chans)

        path, props = chans[0]

        assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, props[cs.CHANNEL_TYPE])
        assertEquals(a is not None, props[cs.INITIAL_AUDIO])
        assertEquals(v is not None, props[cs.INITIAL_VIDEO])

        # FIXME: This doesn't check non-Google contacts that can only do one
        # media type, as such contacts as simulated by JingleTest2 can always
        # do both.
        assertEquals(not jp.can_do_video() or not jp.can_do_video_only(),
            props[cs.IMMUTABLE_STREAMS])

        # Close the channel so the next iteration starts from a clean slate.
        chan = wrap_channel(bus.get_object(conn.bus_name, path),
            cs.CHANNEL_TYPE_STREAMED_MEDIA)
        chan.Close()
开发者ID:jku,项目名称:telepathy-gabble,代码行数:36,代码来源:initial-audio-video.py
示例17: test
def test(q, bus, conn, stream):
    """Check handle ownership reporting in a MUC text channel.

    Joins a room containing two members whose presence carries no real JID
    and two whose presence does, then checks the HandleOwnersChanged and
    MembersChangedDetailed signals and the Group interface's D-Bus
    properties (members, HandleOwners, group flags).
    """
    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")

    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    call_async(q, conn.Requests, 'CreateChannel',
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
          cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
          cs.TARGET_ID: '[email protected]'
        })

    gfc, _, _, _ = q.expect_many(
        # Initial group flags
        EventPattern('dbus-signal', signal='GroupFlagsChanged',
            predicate=lambda e: e.args[0] != 0),
        EventPattern('dbus-signal', signal='MembersChangedDetailed',
            predicate=lambda e: e.args[3] == [2]),
        # Removing CAN_ADD
        EventPattern('dbus-signal', signal='GroupFlagsChanged',
            args = [0, cs.GF_CAN_ADD], predicate=lambda e: e.args[0] == 0),
        EventPattern('stream-presence', to='[email protected]/test'))
    # The initial flags signal must not remove any flag.
    assert gfc.args[1] == 0

    # Send presence for anonymous other member of room.
    stream.send(make_muc_presence('owner', 'moderator', '[email protected]', 'bob'))

    # Send presence for anonymous other member of room (2)
    stream.send(make_muc_presence('owner', 'moderator', '[email protected]', 'brian'))

    # Send presence for nonymous other member of room.
    stream.send(make_muc_presence('none', 'participant', '[email protected]',
        'che', '[email protected]'))

    # Send presence for nonymous other member of room (2)
    stream.send(make_muc_presence('none', 'participant', '[email protected]',
        'chris', '[email protected]'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence('none', 'participant', '[email protected]', 'test'))

    # Since we received MUC presence that contains an owner JID, the
    # OWNERS_NOT_AVAILABLE flag should be removed.
    event = q.expect('dbus-signal', signal='GroupFlagsChanged',
        args = [0, cs.GF_HANDLE_OWNERS_NOT_AVAILABLE ])

    event = q.expect('dbus-signal', signal='HandleOwnersChanged')
    owners = event.args[0]

    event = q.expect('dbus-signal', signal='MembersChangedDetailed')
    added = event.args[0]

    [test, bob, brian, che, che_owner, chris, chris_owner] = \
        conn.get_contact_handles_sync(
            [ '[email protected]/test', '[email protected]/bob',
              '[email protected]/brian', '[email protected]/che',
              '[email protected]', '[email protected]/chris', '[email protected]',
            ])
    expected_members = sorted([test, bob, brian, che, chris])
    # Owner handle 0 is expected for the members sent without a real JID.
    expected_owners = { test: self_handle,
                        bob: 0,
                        brian: 0,
                        che: che_owner,
                        chris: chris_owner
                      }
    assertEquals(expected_members, sorted(added))
    assertEquals(expected_owners, owners)

    event = q.expect('dbus-return', method='CreateChannel')

    chan = wrap_channel(bus.get_object(conn.bus_name, event.value[0]), 'Text')

    # Exercise HandleOwners
    owners = chan.Properties.Get(cs.CHANNEL_IFACE_GROUP, 'HandleOwners')
    assertEquals(che_owner, owners[che])
    assertEquals(chris_owner, owners[chris])

    # Exercise D-Bus properties
    group_props = chan.Properties.GetAll(cs.CHANNEL_IFACE_GROUP)

    assert group_props[u'LocalPendingMembers'] == [], group_props
    assert sorted(group_props[u'Members']) == expected_members, group_props
    assert group_props[u'RemotePendingMembers'] == [], group_props
    assert group_props[u'SelfHandle'] == test, group_props
    assert group_props[u'HandleOwners'] == expected_owners, group_props

    flags = group_props[u'GroupFlags']
    assertFlagsSet(cs.GF_CHANNEL_SPECIFIC_HANDLES, flags)
    assertFlagsUnset(cs.GF_HANDLE_OWNERS_NOT_AVAILABLE, flags)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:86,代码来源:test-muc-ownership.py
示例18: test
def test(q, bus, conn, stream):
    """Exercise the Password interface while joining a protected room.

    The room first rejects the join with 'not-authorized'. The test then
    checks the password flags, a rejected concurrent ProvidePassword, a wrong
    password (ProvidePassword returns False), and a nick conflict after which
    the join is retried under a new JID with the same password and finally
    succeeds (ProvidePassword returns True and the PROVIDE flag is removed).
    """
    # NOTE(review): the JID literal looks like a scraper-redacted address
    # ('[email protected]'); confirm it against the upstream test.
    room = '[email protected]'
    handle = request_muc_handle(q, conn, stream, room)

    call_async(q, conn.Requests, 'CreateChannel', {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_HANDLE: handle})

    expected_muc_jid = '%s/%s' % (room, 'test')

    q.expect('stream-presence', to=expected_muc_jid)

    # tell gabble the room needs a password
    denied = elem('jabber:client', 'presence', from_=expected_muc_jid,
                  type='error')(
        elem(ns.MUC, 'x'),
        elem('error', type='auth')(
            elem(ns.STANZA, 'not-authorized'),
        ),
    )
    stream.send(denied)

    cc, _, _ = q.expect_many(
        EventPattern('dbus-return', method='CreateChannel'),
        EventPattern('dbus-signal', signal='NewChannels'),
        EventPattern('dbus-signal', signal='PasswordFlagsChanged',
            args=[cs.PASSWORD_FLAG_PROVIDE, 0]))

    chan = wrap_channel(bus.get_object(conn.bus_name, cc.value[0]), 'Text',
        ['Password'])

    flags = chan.Password.GetPasswordFlags()
    assertEquals(cs.PASSWORD_FLAG_PROVIDE, flags)

    call_async(q, chan.Password, 'ProvidePassword', 'brand new benz')
    expect_attempt(q, expected_muc_jid, 'brand new benz')

    # Try again while the first attempt is outstanding. Gabble should say no.
    call_async(q, chan.Password, 'ProvidePassword', 'faster faster')
    q.expect('dbus-error', method='ProvidePassword')

    # Sorry, wrong password.
    stream.send(denied)
    ret = q.expect('dbus-return', method='ProvidePassword')
    assert not ret.value[0]

    call_async(q, chan.Password, 'ProvidePassword', 'bougie friends')
    expect_attempt(q, expected_muc_jid, 'bougie friends')

    # Well, this may be the right password, but actually that nick is in use.
    presence = elem('presence', from_=expected_muc_jid, type='error')(
        elem(ns.MUC, 'x'),
        elem('error', type='cancel')(
            elem(ns.STANZA, 'conflict'),
        ))
    stream.send(presence)

    # Okay, so Gabble tries again, with a new JID *and the same password*.
    expected_muc_jid = expected_muc_jid + '_'
    expect_attempt(q, expected_muc_jid, 'bougie friends')

    # Hey this worked.
    stream.send(make_muc_presence('none', 'participant', room, 'test_'))
    ret, _ = q.expect_many(
        EventPattern('dbus-return', method='ProvidePassword'),
        EventPattern('dbus-signal', signal='PasswordFlagsChanged',
            args=[0, cs.PASSWORD_FLAG_PROVIDE]))
    assert ret.value[0]
开发者ID:jku,项目名称:telepathy-gabble,代码行数:69,代码来源:password.py
示例19: test
def test(q, bus, conn, stream):
    """Check receiving and sending of chat-state notifications.

    Opens a 1-1 text channel, lets the contact's client advertise chat-state
    support via a disco#info reply, checks that incoming 'composing' and
    'active' notifications produce ChatStateChanged, and that outgoing
    SetChatState and Send produce the expected stanzas.

    NOTE(review): the listing this was taken from omits the end of the
    function; only the visible part is reproduced here.
    """
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
        args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    self_handle = conn.GetSelfHandle()

    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    jid = '[email protected]'
    foo_handle = conn.RequestHandles(cs.HT_CONTACT, [jid])[0]

    path = conn.Requests.CreateChannel(
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
          cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
          cs.TARGET_HANDLE: foo_handle,
        })[0]
    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text',
        ['ChatState', 'Destroyable'])

    presence = make_presence('[email protected]/Foo', status='hello',
        caps={
            'node': 'http://telepathy.freedesktop.org/homeopathy',
            'ver' : '0.1',
        })
    stream.send(presence)

    version_event = q.expect('stream-iq', to='[email protected]/Foo',
        query_ns='http://jabber.org/protocol/disco#info',
        query_node='http://telepathy.freedesktop.org/homeopathy#0.1')

    # Answer the capability query, advertising chat-state support.
    result = make_result_iq(stream, version_event.stanza)
    query = result.firstChildElement()
    feature = query.addElement('feature')
    feature['var'] = 'http://jabber.org/protocol/chatstates'
    stream.send(result)

    sync_stream(q, stream)

    # Receiving chat states:

    # Composing...
    m = domish.Element((None, 'message'))
    m['from'] = '[email protected]/Foo'
    m['type'] = 'chat'
    m.addElement((ns.CHAT_STATES, 'composing'))
    stream.send(m)

    changed = q.expect('dbus-signal', signal='ChatStateChanged')
    handle, state = changed.args
    assertEquals(foo_handle, handle)
    assertEquals(cs.CHAT_STATE_COMPOSING, state)

    # Message!

    m = domish.Element((None, 'message'))
    m['from'] = '[email protected]/Foo'
    m['type'] = 'chat'
    m.addElement((ns.CHAT_STATES, 'active'))
    m.addElement('body', content='hello')
    stream.send(m)

    changed = q.expect('dbus-signal', signal='ChatStateChanged')
    handle, state = changed.args
    assertEquals(foo_handle, handle)
    assertEquals(cs.CHAT_STATE_ACTIVE, state)

    # Sending chat states:

    # Composing...
    call_async(q, chan.ChatState, 'SetChatState', cs.CHAT_STATE_COMPOSING)

    stream_message = q.expect('stream-message')
    check_state_notification(stream_message.stanza, 'composing')

    # XEP 0085:
    #   every content message SHOULD contain an <active/> notification.
    call_async(q, chan.Text, 'Send', 0, 'hi.')

    stream_message = q.expect('stream-message')
    elem = stream_message.stanza
    assert elem.name == 'message'
    assert elem['type'] == 'chat', elem['type']

    def is_body(e):
        # True for the <body/> child, which must carry the text just sent.
        if e.name == 'body':
            assert e.children[0] == u'hi.', e.toXml()
            return True
        return False

    def is_active(e):
        # True for a chat-state child, which must be <active/>.
        if e.uri == ns.CHAT_STATES:
            assert e.name == 'active', e.toXml()
            return True
        return False

    children = list(elem.elements())

    # Exactly one body and one chat-state element. A list comprehension is
    # used because len() of filter() only works where filter returns a list.
    assert len([c for c in children if is_body(c)]) == 1, elem.toXml()
    assert len([c for c in children if is_active(c)]) == 1, elem.toXml()

    # Close the channel without acking the received message. The peer should
    # ......... part of the code is omitted here .........
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:101,代码来源:test-chat-state.py
示例20: test
def test(q, bus, conn, stream, send_early_description_info=False):
    """Check codec updates and description-info handling on an incoming call.

    Accepts an incoming Jingle call, reports local codecs (then updated
    codec parameters) on the stream handler, and checks the first outgoing
    IQ (transport-info) and the remote codecs passed to SetRemoteCodecs.

    If ``send_early_description_info`` is true, a description-info stanza is
    sent to Gabble before Ready() is called on the session handler.

    NOTE(review): the listing this was taken from omits the end of the
    function; only the visible part is reproduced here.
    """
    jp = JingleProtocol031()
    # NOTE(review): the JID literals look like scraper-redacted addresses
    # ('[email protected]'); confirm them against the upstream test.
    jt2 = JingleTest2(jp, conn, q, stream, '[email protected]', '[email protected]/Foo')
    jt2.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, ["[email protected]/Foo"])[0]

    # Remote end calls us
    jt2.incoming_call()

    # FIXME: these signals are not observable by real clients, since they
    #        happen before NewChannels.
    # The caller is in members
    e = q.expect('dbus-signal', signal='MembersChanged',
        args=[u'', [remote_handle], [], [], [], 0, 0])

    # We're pending because of remote_handle
    e = q.expect('dbus-signal', signal='MembersChanged',
        args=[u'', [], [], [self_handle], [], remote_handle,
              cs.GC_REASON_INVITED])

    chan = wrap_channel(bus.get_object(conn.bus_name, e.path),
        'StreamedMedia')

    # S-E gets notified about new session handler, and calls Ready on it
    e = q.expect('dbus-signal', signal='NewSessionHandler')
    assert e.args[1] == 'rtp'

    if send_early_description_info:
        # Regression test for a bug where Gabble would crash if you sent it
        # description-info before calling Ready() on the relevant
        # StreamHandler, and then for a bug where Gabble would never accept
        # the call if a description-info was received before all
        # StreamHandlers were Ready().
        node = jp.SetIq(jt2.peer, jt2.jid, [
            jp.Jingle(jt2.sid, jt2.peer, 'description-info', [
                jp.Content('stream1', 'initiator', 'both',
                    jp.Description('audio', [ ])) ]) ])
        stream.send(jp.xml(node))

        # NOTE(review): the original indentation was lost; this sync is
        # assumed to belong to the early-description-info branch.
        sync_stream(q, stream)

    session_handler = make_channel_proxy(conn, e.args[0], 'Media.SessionHandler')
    session_handler.Ready()

    chan.Group.AddMembers([self_handle], 'accepted')

    # S-E gets notified about a newly-created stream
    e = q.expect('dbus-signal', signal='NewStreamHandler')
    id1 = e.args[1]

    stream_handler = make_channel_proxy(conn, e.args[0], 'Media.StreamHandler')

    # We are now in members too
    e = q.expect('dbus-signal', signal='MembersChanged',
        args=[u'', [self_handle], [], [], [], self_handle,
              cs.GC_REASON_NONE])

    # we are now both in members
    members = chan.Group.GetMembers()
    assert set(members) == set([self_handle, remote_handle]), members

    local_codecs = [('GSM', 3, 8000, {}),
                    ('PCMA', 8, 8000, {'helix':'woo yay'}),
                    ('PCMU', 0, 8000, {}) ]
    local_codecs_dbus = jt2.dbusify_codecs_with_params(local_codecs)

    stream_handler.NewNativeCandidate("fake", jt2.get_remote_transports_dbus())
    stream_handler.Ready(local_codecs_dbus)
    stream_handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)

    stream_handler.CodecsUpdated(local_codecs_dbus)

    # Same codecs again, but with a different parameter on PCMA.
    local_codecs = [('GSM', 3, 8000, {}),
                    ('PCMA', 8, 8000, {'gstreamer':'rock on'}),
                    ('PCMU', 0, 8000, {}) ]
    local_codecs_dbus = jt2.dbusify_codecs_with_params(local_codecs)
    stream_handler.CodecsUpdated(local_codecs_dbus)

    # First IQ is transport-info; also, we expect to be told what codecs the
    # other end wants.
    e, src = q.expect_many(
        EventPattern('stream-iq',
            predicate=jp.action_predicate('transport-info')),
        EventPattern('dbus-signal', signal='SetRemoteCodecs')
        )
    assertEquals('[email protected]/Foo', e.query['initiator'])

    # Compare as (name, id, rate, parameters), dropping type and channels.
    assert jt2.audio_codecs == [ (name, id, rate, parameters)
        for id, name, type, rate, channels, parameters in unwrap(src.args[0]) ], \
        (jt2.audio_codecs, unwrap(src.args[0]))

    stream.send(jp.xml(jp.ResultIq('[email protected]', e.stanza, [])))

    # S-E reports codec intersection, after which gabble can send acceptance
    stream_handler.SupportedCodecs(local_codecs_dbus)

    # ......... part of the code is omitted here .........
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:101,代码来源:test-description-info.py
注：本文中的servicetest.wrap_channel函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论