• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python servicetest.wrap_channel函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中servicetest.wrap_channel函数的典型用法代码示例。如果您正苦于以下问题:Python wrap_channel函数的具体用法?Python wrap_channel怎么用?Python wrap_channel使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了wrap_channel函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: incoming

def incoming(jp, q, bus, conn, stream):
    remote_jid = '[email protected]/antennas'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)
    jt.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]

    for a, v in [(True, False), (False, True), (True, True)]:
        if v and not jp.can_do_video():
            continue
        if not a and v and not jp.can_do_video_only():
            continue

        jt.incoming_call(audio=a, video=v)
        e = q.expect('dbus-signal', signal='NewChannels')
        chans = e.args[0]
        assertLength(1, chans)

        path, props = chans[0]

        assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, props[cs.CHANNEL_TYPE])
        assertEquals(a, props[cs.INITIAL_AUDIO])
        assertEquals(v, props[cs.INITIAL_VIDEO])

        chan = wrap_channel(bus.get_object(conn.bus_name, path),
            cs.CHANNEL_TYPE_STREAMED_MEDIA)
        chan.Close()
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:28,代码来源:initial-audio-video.py


示例2: check_neither

def check_neither(q, conn, bus, stream, remote_handle):
    """
    Make a channel without specifying InitialAudio or InitialVideo; check
    that it's announced with both False, and that they're both present and
    false in GetAll().
    """

    path, props = conn.Requests.CreateChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_HANDLE: remote_handle})

    assertContains((cs.INITIAL_AUDIO, False), props.items())
    assertContains((cs.INITIAL_VIDEO, False), props.items())

    chan = wrap_channel(bus.get_object(conn.bus_name, path),
        cs.CHANNEL_TYPE_STREAMED_MEDIA, ['MediaSignalling'])
    props = chan.Properties.GetAll(cs.CHANNEL_TYPE_STREAMED_MEDIA)
    assertContains(('InitialAudio', False), props.items())
    assertContains(('InitialVideo', False), props.items())

    # We shouldn't have started a session yet, so there shouldn't be any
    # session handlers. Strictly speaking, there could be a session handler
    # with no stream handlers, but...
    session_handlers = chan.MediaSignalling.GetSessionHandlers()
    assertLength(0, session_handlers)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:26,代码来源:initial-audio-video.py


示例3: test

def test(q, bus, conn, stream):
    jp = JingleProtocol031()
    remote_jid = '[email protected]/misc'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)

    jt.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]
    path, _ = conn.Requests.CreateChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_HANDLE: remote_handle})

    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamedMedia')

    # In Gabble, the StreamedMedia channel is secretly also the SessionHandler.
    # Let's make up a proxy and call some methods on it. They should fail
    # gracefully, rather than crashing Gabble.
    session_handler = make_channel_proxy(conn, path, 'Media.SessionHandler')

    try:
        session_handler.Ready()
    except DBusException, e:
        assertEquals(cs.NOT_AVAILABLE, e.get_dbus_name())
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:25,代码来源:misuse.py


示例4: join_muc

def join_muc(q, bus, conn, stream, muc, request=None):
    """
    Joins 'muc', returning the muc's handle, a proxy object for the channel,
    its path and its immutable properties just after the CreateChannel event
    has fired. The room contains one other member.
    """
    if request is None:
        request = {
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
            cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
            cs.TARGET_ID: muc,
        }

    muc_handle = request_muc_handle(q, conn, stream, muc)

    requests = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
    call_async(q, requests, 'CreateChannel',
        dbus.Dictionary(request, signature='sv'))

    q.expect('stream-presence', to='%s/test' % muc)

    # Send presence for other member of room.
    stream.send(make_muc_presence('owner', 'moderator', muc, 'bob'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence('none', 'participant', muc, 'test'))

    event = q.expect('dbus-return', method='CreateChannel')
    path, props = event.value
    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text',
        ['Messages'])

    return (muc_handle, chan, path, props)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:33,代码来源:mucutil.py


示例5: test

def test(jp, q, bus, conn, stream):
    if not jp.can_do_video_only():
        return

    remote_jid = '[email protected]/Foo'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)
    jt.prepare()

    self_handle = conn.GetSelfHandle()
    handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]

    chan_path = conn.RequestChannel(cs.CHANNEL_TYPE_STREAMED_MEDIA,
            cs.HT_CONTACT, handle, True)
    chan = wrap_channel(bus.get_object(conn.bus_name, chan_path),
            'StreamedMedia', ['MediaSignalling', 'Group', 'CallState', 'DTMF'])
    chan_props = chan.Properties.GetAll(cs.CHANNEL)
    assert cs.CHANNEL_IFACE_DTMF in chan_props['Interfaces'], \
        chan_props['Interfaces']

    chan.StreamedMedia.RequestStreams(handle, [cs.MEDIA_STREAM_TYPE_VIDEO])

    # S-E gets notified about new session handler, and calls Ready on it
    e = q.expect('dbus-signal', signal='NewSessionHandler')
    assert e.args[1] == 'rtp'

    session_handler = make_channel_proxy(conn, e.args[0], 'Media.SessionHandler')
    session_handler.Ready()

    e = q.expect('dbus-signal', signal='NewStreamHandler')
    video_path = e.args[0]
    stream_handler = make_channel_proxy(conn, video_path, 'Media.StreamHandler')

    stream_handler.NewNativeCandidate("fake", jt.get_remote_transports_dbus())
    stream_handler.Ready(jt.get_video_codecs_dbus())
    stream_handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)

    e = q.expect('stream-iq', predicate=jp.action_predicate('session-initiate'))
    stream.send(make_result_iq(stream, e.stanza))

    jt.parse_session_initiate(e.query)

    jt.accept()

    # Gabble tells s-e to start sending
    q.expect('dbus-signal', signal='SetStreamSending', args=[True],
            path=video_path)

    # We don't actually have an audio stream, so this is a non-starter.
    call_async(q, chan.DTMF, 'StartTone', 666, 3)
    q.expect('dbus-error', method='StartTone', name=cs.NOT_AVAILABLE)
    call_async(q, chan.DTMF, 'MultipleTones', '**666##')
    q.expect('dbus-error', method='MultipleTones', name=cs.NOT_AVAILABLE)

    # We can still stop all the tones that are playing (a no-op).
    call_async(q, chan.DTMF, 'StopTone', 666)
    q.expect('dbus-return', method='StopTone')

    chan.Group.RemoveMembers([self_handle], 'closed')
    e = q.expect('dbus-signal', signal='Closed', path=chan_path)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:59,代码来源:dtmf-no-audio.py


示例6: test

def test(q, bus, conn, stream):
    muc_handle = request_muc_handle(q, conn, stream, '[email protected]')

    call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TEXT, cs.HT_ROOM,
        muc_handle, True)

    q.expect('stream-presence', to='[email protected]/test')

    # Send presence for own membership of room.
    stream.send(
        make_muc_presence('owner', 'moderator', '[email protected]', 'test'))

    iq, ret = q.expect_many(
        EventPattern('stream-iq', to='[email protected]', iq_type='get',
            query_ns=ns.MUC_OWNER),
        EventPattern('dbus-return', method='RequestChannel'))
    handle_muc_get_iq(stream, iq.stanza)

    text_chan = wrap_channel(
        bus.get_object(conn.bus_name, ret.value[0]), 'Text')

    props = dict([(name, id)
        for id, name, sig, flags in text_chan.TpProperties.ListProperties()])
    call_async(q, text_chan.TpProperties, 'SetProperties',
        [(props['password'], 'foo'), (props['password-required'], True)])

    event = q.expect('stream-iq', to='[email protected].localhost', iq_type='get',
        query_ns=ns.MUC_OWNER)
    handle_muc_get_iq(stream, event.stanza)

    event = q.expect('stream-iq', to='[email protected]', iq_type='set',
        query_ns=ns.MUC_OWNER)
    fields = xpath.queryForNodes('/iq/query/x/field', event.stanza)
    form = {}
    for field in fields:
        values = xpath.queryForNodes('/field/value', field)
        form[field['var']] = [str(v) for v in values]
    assert form == {'password': ['foo'], 'password_protected': ['1'],
            'muc#roomconfig_presencebroadcast' :
            ['moderator', 'participant', 'visitor']}
    acknowledge_iq(stream, event.stanza)

    event = q.expect('dbus-signal', signal='PropertiesChanged')
    assert event.args == [[(props['password'], 'foo'),
        (props['password-required'], True)]]

    q.expect('dbus-return', method='SetProperties', value=())

    call_async(q, text_chan.TpProperties, 'SetProperties',
        [(31337, 'foo'), (props['password-required'], True)])
    q.expect('dbus-error', name=cs.INVALID_ARGUMENT)

    call_async(q, text_chan.TpProperties, 'SetProperties',
        [(props['password'], True), (props['password-required'], 'foo')])
    q.expect('dbus-error', name=cs.NOT_AVAILABLE)

    call_async(q, text_chan.TpProperties, 'SetProperties',
        [(props['subject-contact'], 42)])
    q.expect('dbus-error', name=cs.PERMISSION_DENIED)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:59,代码来源:test-muc-properties.py


示例7: join

def join(q, bus, conn):
    call_async(q, conn.Requests, "CreateChannel", {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.ROOM_NAME: CHANNEL,
    })
    q.expect('stream-JOIN')
    event = q.expect('dbus-return', method='CreateChannel')
    path, props = event.value
    return wrap_channel(bus.get_object(conn.bus_name, path), 'Text',
        ['Destroyable', 'Messages'])
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-idle,代码行数:10,代码来源:muc-destroy.py


示例8: test

def test(q, bus, conn, stream):
    def send_own_message(to, text):
        iq = elem_iq(stream, 'set', from_='chat.facebook.com')(
              elem(NS_FACEBOOK_MESSAGES, 'own-message', to=to, self='false')(
                elem('body')(text)
              )
            )
        stream.send(iq)
        q.expect('stream-iq', iq_type='result', iq_id=iq['id'])

    # First, test receiving an own-message stanza for a message sent to a
    # contact we have an open channel for.
    jid = '[email protected]'
    _, path, props = conn.Requests.EnsureChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_ID: jid,
    })
    channel = wrap_channel(bus.get_object(conn.bus_name, path),
        'Text', ['Messages'])
    handle = props[cs.TARGET_HANDLE]

    text = u'omg omg its ur birthdayy <3 <3 xoxoxoxo'
    send_own_message(to=jid, text=text)
    e = q.expect('dbus-signal', signal='MessageReceived')
    message, = e.args
    assertLength(1, message)
    header = message[0]

    assertEquals(handle, header['message-sender'])
    assertEquals(cs.MT_DELIVERY_REPORT, header['message-type'])
    assertEquals(cs.DELIVERY_STATUS_ACCEPTED, header['delivery-status'])

    assertContains('delivery-echo', header)
    echo = header['delivery-echo']
    echo_header, echo_body = echo

    assertEquals(conn.GetSelfHandle(), echo_header['message-sender'])
    assertEquals('text/plain', echo_body['content-type'])
    assertEquals(text, echo_body['content'])

    channel.Text.AcknowledgePendingMessages([header['pending-message-id']])
    channel.Close()

    # Now test receiving an own-message stanza for a message sent to a contact
    # we don't have a channel open for. It should be ignored (but acked). This
    # is consistent with delivery failure reports.
    q.forbid_events([EventPattern('dbus-signal', signal='MessageReceived')])
    send_own_message(to='[email protected]',
        text=u'please ignore this message')
    sync_dbus(bus, q, conn)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:51,代码来源:facebook-own-message.py


示例9: test

def test(q, bus, conn, stream, access_control):
    iq_event = q.expect('stream-iq', to=None, query_ns='vcard-temp',
            query_name='vCard')

    acknowledge_iq(stream, iq_event.stanza)

    # check if we can request muc D-Bus tube
    t.check_conn_properties(q, conn)

    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    self_name = conn.inspect_contact_sync(self_handle)

    # offer a D-Bus tube to another room using new API
    muc = '[email protected]'
    request = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_DBUS_TUBE,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_ID: '[email protected]',
        cs.DBUS_TUBE_SERVICE_NAME: 'com.example.TestCase',
    }
    join_muc(q, bus, conn, stream, muc, request=request)

    e = q.expect('dbus-signal', signal='NewChannels')

    channels = e.args[0]
    assert len(channels) == 1
    path, prop = channels[0]
    assert prop[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
    assert prop[cs.INITIATOR_ID] == '[email protected]/test'
    assert prop[cs.REQUESTED] == True
    assert prop[cs.TARGET_HANDLE_TYPE] == cs.HT_ROOM
    assert prop[cs.TARGET_ID] == '[email protected]'
    assert prop[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase'
    assert prop[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [cs.SOCKET_ACCESS_CONTROL_CREDENTIALS,
            cs.SOCKET_ACCESS_CONTROL_LOCALHOST]

    # check that the tube channel is in the channels list
    all_channels = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
        dbus_interface=cs.PROPERTIES_IFACE, byte_arrays=True)
    assertContains((path, prop), all_channels)

    tube_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'DBusTube')
    tube_props = tube_chan.Properties.GetAll(cs.CHANNEL_IFACE_TUBE, byte_arrays=True)

    assert tube_props['State'] == cs.TUBE_CHANNEL_STATE_NOT_OFFERED

    # try to offer using a wrong access control
    try:
        tube_chan.DBusTube.Offer(sample_parameters, cs.SOCKET_ACCESS_CONTROL_PORT)
    except dbus.DBusException, e:
        assertEquals(e.get_dbus_name(), cs.INVALID_ARGUMENT)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:51,代码来源:offer-muc-dbus-tube.py


示例10: stream_tube

def stream_tube(q, bus, conn, method, jid):
    request_stream_tube(q, bus, conn, method, jid)
    e, _ = q.expect_many(EventPattern('dbus-return', method=method),
                         EventPattern('dbus-signal', signal='NewChannels'))

    # sigh
    if method == 'EnsureChannel':
        path = e.value[1]
    else:
        path = e.value[0]

    tube_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamTube')

    return (tube_chan,) + e.value
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:14,代码来源:only-text-muc-when-needed.py


示例11: text_channel

def text_channel(q, bus, conn, method, jid, presence=True):
    request_text_channel(q, bus, conn, method, jid)
    e, _ = q.expect_many(EventPattern('dbus-return', method=method),
                         EventPattern('dbus-signal', signal='NewChannels'))

    # sigh
    if method == 'EnsureChannel':
        path = e.value[1]
    else:
        path = e.value[0]

    text_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text')

    return (text_chan,) + e.value
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:14,代码来源:only-text-muc-when-needed.py


示例12: test

def test(q, bus, conn, stream):
    conn.Connect()
    q.expect_many(
            EventPattern('dbus-signal', signal='StatusChanged', args=[1, 1]),
            EventPattern('irc-connected'))
    e = q.expect('dbus-signal', signal='NewChannels')
    channels = e.args[0]
    path, props = channels[0]

    channel = wrap_channel(bus.get_object(conn.bus_name, path),
        cs.CHANNEL_TYPE_SERVER_TLS_CONNECTION)
    channel.Close()

    q.expect('dbus-signal', signal='StatusChanged', args=[2, 2])
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-idle,代码行数:14,代码来源:connect-close-ssl.py


示例13: test

def test(q, bus, conn, stream):
    conn.Connect()

    event = q.expect('stream-iq', query_ns=ns.ROSTER)
    # send back empty roster
    event.stanza['type'] = 'result'
    stream.send(event.stanza)

    while True:
        event = q.expect('dbus-signal', signal='NewChannel')
        path, type, handle_type, handle, suppress_handler = event.args

        if type != cs.CHANNEL_TYPE_CONTACT_LIST:
            continue

        chan_name = conn.InspectHandles(handle_type, [handle])[0]

        if chan_name == 'subscribe':
            break

    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'ContactList')
    assertLength(0, chan.Group.GetMembers())

    # request subscription
    handle = conn.RequestHandles(cs.HT_CONTACT, ['[email protected]'])[0]
    chan.Group.AddMembers([handle], '')

    event = q.expect('stream-iq', iq_type='set', query_ns=ns.ROSTER)
    item = event.query.firstChildElement()
    assertEquals('[email protected]com', item["jid"])

    acknowledge_iq(stream, event.stanza)

    event = q.expect('stream-presence', presence_type='subscribe')

    presence = domish.Element(('jabber:client', 'presence'))
    presence['from'] = '[email protected]'
    presence['type'] = 'subscribed'
    stream.send(presence)

    q.expect_many(
            EventPattern('dbus-signal', signal='MembersChanged',
                args=['', [handle], [], [], [], 0, 0]),
            EventPattern('stream-presence'),
            )
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:45,代码来源:test-roster-subscribe.py


示例14: join_muc

def join_muc(q, bus, conn, stream, muc, request=None,
        also_capture=[], role='participant', affiliation='none'):
    """
    Joins 'muc', returning a proxy object for the channel,
    its path and its immutable properties just after the CreateChannel event
    has fired. The room contains one other member.
    """
    try_to_join_muc(q, bus, conn, stream, muc, request=request)

    # Send presence for other member of room.
    stream.send(make_muc_presence('owner', 'moderator', muc, 'bob'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence(affiliation, role, muc, 'test'))

    captured = q.expect_many(
            EventPattern('dbus-return', method='CreateChannel'),
            *also_capture)
    path, props = captured[0].value
    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text',
        ['Subject.DRAFT'])


    return (chan, path, props) + tuple(captured[1:])
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:24,代码来源:mucutil.py


示例15: test

def test(q, bus, conn, stream):
    path = conn.Requests.CreateChannel(
            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
              cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
              cs.TARGET_ID: GUYBRUSH,
              })[0]
    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text')

    # Let's start out with an empty roster, eh?
    e = q.expect('stream-iq', iq_type='get', query_ns=ns.ROSTER)
    e.stanza['type'] = 'result'
    stream.send(e.stanza)

    report_received_on_open_channel(q, bus, conn, stream, chan)
    report_ignored_without_channel(q, bus, conn, stream)
    not_sending_request_to_contact(q, bus, conn, stream, chan)

    # FIXME: This test is disabled because of stupidity in the presence cache.
    # See the comment in receipts_conceivably_supported().
    #sending_request_to_presenceless_contact(q, bus, conn, stream, chan)

    sending_request_to_cappy_contact(q, bus, conn, stream, chan)

    replying_to_requests(q, bus, conn, stream)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:24,代码来源:receipts.py


示例16: incoming

def incoming(jp, q, bus, conn, stream):
    remote_jid = '[email protected]/antennas'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)
    jt.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]

    for a, v in [("audio1", None), (None, "video1"), ("audio1", "video1")]:
        if v!= None and not jp.can_do_video():
            continue
        if a == None and v != None and not jp.can_do_video_only():
            continue

        jt.incoming_call(audio=a, video=v)
        e = q.expect('dbus-signal', signal='NewChannels',
            predicate=lambda e:
                cs.CHANNEL_TYPE_CONTACT_LIST not in e.args[0][0][1].values())
        chans = e.args[0]
        assertLength(1, chans)

        path, props = chans[0]

        assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, props[cs.CHANNEL_TYPE])
        assertEquals(a != None, props[cs.INITIAL_AUDIO])
        assertEquals(v != None, props[cs.INITIAL_VIDEO])

        # FIXME: This doesn't check non-Google contacts that can only do one
        # media type, as such contacts as simulated by JingleTest2 can always
        # do both.
        assertEquals(not jp.can_do_video() or not jp.can_do_video_only(),
            props[cs.IMMUTABLE_STREAMS])

        chan = wrap_channel(bus.get_object(conn.bus_name, path),
            cs.CHANNEL_TYPE_STREAMED_MEDIA)
        chan.Close()
开发者ID:jku,项目名称:telepathy-gabble,代码行数:36,代码来源:initial-audio-video.py


示例17: test

def test(q, bus, conn, stream):
    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")

    call_async(q, conn.Requests, 'CreateChannel',
            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
              cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
              cs.TARGET_ID: '[email protected]'
            })

    gfc, _, _, _ = q.expect_many(
        # Initial group flags
        EventPattern('dbus-signal', signal='GroupFlagsChanged',
            predicate=lambda e: e.args[0] != 0),
        EventPattern('dbus-signal', signal='MembersChangedDetailed',
            predicate=lambda e: e.args[3] == [2]),
        # Removing CAN_ADD
        EventPattern('dbus-signal', signal='GroupFlagsChanged',
          args = [0, cs.GF_CAN_ADD], predicate=lambda e: e.args[0] == 0),
        EventPattern('stream-presence', to='[email protected]/test'))
    assert gfc.args[1] == 0

    # Send presence for anonymous other member of room.
    stream.send(make_muc_presence('owner', 'moderator', '[email protected]', 'bob'))

    # Send presence for anonymous other member of room (2)
    stream.send(make_muc_presence('owner', 'moderator', '[email protected]', 'brian'))

    # Send presence for nonymous other member of room.
    stream.send(make_muc_presence('none', 'participant', '[email protected]',
        'che', '[email protected]'))

    # Send presence for nonymous other member of room (2)
    stream.send(make_muc_presence('none', 'participant', '[email protected]',
        'chris', '[email protected]'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence('none', 'participant', '[email protected]', 'test'))

    # Since we received MUC presence that contains an owner JID, the
    # OWNERS_NOT_AVAILABLE flag should be removed.
    event = q.expect('dbus-signal', signal='GroupFlagsChanged',
        args = [0, cs.GF_HANDLE_OWNERS_NOT_AVAILABLE ])

    event = q.expect('dbus-signal', signal='HandleOwnersChanged')
    owners = event.args[0]

    event = q.expect('dbus-signal', signal='MembersChangedDetailed')
    added = event.args[0]

    [test, bob, brian, che, che_owner, chris, chris_owner] = \
        conn.get_contact_handles_sync(
            [ '[email protected]/test', '[email protected]/bob',
              '[email protected]/brian', '[email protected]/che',
              '[email protected]', '[email protected]/chris', '[email protected]',
            ])
    expected_members = sorted([test, bob, brian, che, chris])
    expected_owners = { test: self_handle,
                        bob: 0,
                        brian: 0,
                        che: che_owner,
                        chris: chris_owner
                      }
    assertEquals(expected_members, sorted(added))
    assertEquals(expected_owners, owners)

    event = q.expect('dbus-return', method='CreateChannel')

    chan = wrap_channel(bus.get_object(conn.bus_name, event.value[0]), 'Text')

    # Exercise HandleOwners
    owners = chan.Properties.Get(cs.CHANNEL_IFACE_GROUP, 'HandleOwners')
    assertEquals(che_owner, owners[che])
    assertEquals(chris_owner, owners[chris])

    # Exercise D-Bus properties
    all = chan.Properties.GetAll(cs.CHANNEL_IFACE_GROUP)

    assert all[u'LocalPendingMembers'] == [], all
    assert sorted(all[u'Members']) == expected_members, all
    assert all[u'RemotePendingMembers'] == [], all
    assert all[u'SelfHandle'] == test, all
    assert all[u'HandleOwners'] == expected_owners, all

    flags = all[u'GroupFlags']
    assertFlagsSet(cs.GF_CHANNEL_SPECIFIC_HANDLES, flags)
    assertFlagsUnset(cs.GF_HANDLE_OWNERS_NOT_AVAILABLE, flags)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:86,代码来源:test-muc-ownership.py


示例18: test

def test(q, bus, conn, stream):
    room = '[email protected]'
    handle = request_muc_handle(q, conn, stream, room)

    call_async(q, conn.Requests, 'CreateChannel', {
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
            cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
            cs.TARGET_HANDLE: handle})

    expected_muc_jid = '%s/%s' % (room, 'test')
    q.expect('stream-presence', to=expected_muc_jid)

    # tell gabble the room needs a password
    denied = \
        elem('jabber:client', 'presence', from_=expected_muc_jid,
            type='error')(
          elem(ns.MUC, 'x'),
          elem('error', type='auth')(
            elem(ns.STANZA, 'not-authorized'),
          ),
        )
    stream.send(denied)

    cc, _, _ = q.expect_many(
        EventPattern('dbus-return', method='CreateChannel'),
        EventPattern('dbus-signal', signal='NewChannels'),
        EventPattern('dbus-signal', signal='PasswordFlagsChanged',
            args=[cs.PASSWORD_FLAG_PROVIDE, 0]))

    chan = wrap_channel(bus.get_object(conn.bus_name, cc.value[0]), 'Text',
        ['Password'])

    flags = chan.Password.GetPasswordFlags()
    assertEquals(cs.PASSWORD_FLAG_PROVIDE, flags)

    call_async(q, chan.Password, 'ProvidePassword', 'brand new benz')
    expect_attempt(q, expected_muc_jid, 'brand new benz')

    # Try again while the first attempt is outstanding. Gabble should say no.
    call_async(q, chan.Password, 'ProvidePassword', 'faster faster')
    q.expect('dbus-error', method='ProvidePassword')

    # Sorry, wrong password.
    stream.send(denied)
    ret = q.expect('dbus-return', method='ProvidePassword')
    assert not ret.value[0]

    call_async(q, chan.Password, 'ProvidePassword', 'bougie friends')
    expect_attempt(q, expected_muc_jid, 'bougie friends')

    # Well, this may be the right password, but actually that nick is in use.
    presence = elem('presence', from_=expected_muc_jid, type='error')(
        elem(ns.MUC, 'x'),
        elem('error', type='cancel')(
          elem(ns.STANZA, 'conflict'),
        ))
    stream.send(presence)

    # Okay, so Gabble tries again, with a new JID *and the same password*.
    expected_muc_jid = expected_muc_jid + '_'
    expect_attempt(q, expected_muc_jid, 'bougie friends')

    # Hey this worked.
    stream.send(make_muc_presence('none', 'participant', room, 'test_'))
    ret, _ = q.expect_many(
        EventPattern('dbus-return', method='ProvidePassword'),
        EventPattern('dbus-signal', signal='PasswordFlagsChanged',
            args=[0, cs.PASSWORD_FLAG_PROVIDE]))
    assert ret.value[0]
开发者ID:jku,项目名称:telepathy-gabble,代码行数:69,代码来源:password.py


示例19: test

def test(q, bus, conn, stream):
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    self_handle = conn.GetSelfHandle()

    jid = '[email protected]'
    foo_handle = conn.RequestHandles(cs.HT_CONTACT, [jid])[0]

    path = conn.Requests.CreateChannel(
            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
              cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
              cs.TARGET_HANDLE: foo_handle,
              })[0]
    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'Text',
        ['ChatState', 'Destroyable'])

    presence = make_presence('[email protected]/Foo', status='hello',
        caps={
            'node': 'http://telepathy.freedesktop.org/homeopathy',
            'ver' : '0.1',
        })
    stream.send(presence)

    version_event = q.expect('stream-iq', to='[email protected]/Foo',
        query_ns='http://jabber.org/protocol/disco#info',
        query_node='http://telepathy.freedesktop.org/homeopathy#0.1')

    result = make_result_iq(stream, version_event.stanza)
    query = result.firstChildElement()
    feature = query.addElement('feature')
    feature['var'] = 'http://jabber.org/protocol/chatstates'
    stream.send(result)

    sync_stream(q, stream)

    # Receiving chat states:

    # Composing...
    m = domish.Element((None, 'message'))
    m['from'] = '[email protected]/Foo'
    m['type'] = 'chat'
    m.addElement((ns.CHAT_STATES, 'composing'))
    stream.send(m)

    changed = q.expect('dbus-signal', signal='ChatStateChanged')
    handle, state = changed.args
    assertEquals(foo_handle, handle)
    assertEquals(cs.CHAT_STATE_COMPOSING, state)

    # Message!

    m = domish.Element((None, 'message'))
    m['from'] = '[email protected]/Foo'
    m['type'] = 'chat'
    m.addElement((ns.CHAT_STATES, 'active'))
    m.addElement('body', content='hello')
    stream.send(m)

    changed = q.expect('dbus-signal', signal='ChatStateChanged')
    handle, state = changed.args
    assertEquals(foo_handle, handle)
    assertEquals(cs.CHAT_STATE_ACTIVE, state)

    # Sending chat states:

    # Composing...
    call_async(q, chan.ChatState, 'SetChatState', cs.CHAT_STATE_COMPOSING)

    stream_message = q.expect('stream-message')
    check_state_notification(stream_message.stanza, 'composing')

    # XEP 0085:
    #   every content message SHOULD contain an <active/> notification.
    call_async(q, chan.Text, 'Send', 0, 'hi.')

    stream_message = q.expect('stream-message')
    elem = stream_message.stanza
    assert elem.name == 'message'
    assert elem['type'] == 'chat', elem['type']

    def is_body(e):
        if e.name == 'body':
            assert e.children[0] == u'hi.', e.toXml()
            return True
        return False

    def is_active(e):
        if e.uri == ns.CHAT_STATES:
            assert e.name == 'active', e.toXml()
            return True
        return False

    children = list(elem.elements())

    assert len(filter(is_body,   children)) == 1, elem.toXml()
    assert len(filter(is_active, children)) == 1, elem.toXml()

    # Close the channel without acking the received message. The peer should
#.........这里部分代码省略.........
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:101,代码来源:test-chat-state.py


示例20: test

def test(q, bus, conn, stream, send_early_description_info=False):
    jp = JingleProtocol031()
    jt2 = JingleTest2(jp, conn, q, stream, '[email protected]', '[email protected]/Foo')
    jt2.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, ["[email protected]/Foo"])[0]

    # Remote end calls us
    jt2.incoming_call()

    # FIXME: these signals are not observable by real clients, since they
    #        happen before NewChannels.
    # The caller is in members
    e = q.expect('dbus-signal', signal='MembersChanged',
             args=[u'', [remote_handle], [], [], [], 0, 0])

    # We're pending because of remote_handle
    e = q.expect('dbus-signal', signal='MembersChanged',
             args=[u'', [], [], [self_handle], [], remote_handle,
                   cs.GC_REASON_INVITED])

    chan = wrap_channel(bus.get_object(conn.bus_name,  e.path),
        'StreamedMedia')

    # S-E gets notified about new session handler, and calls Ready on it
    e = q.expect('dbus-signal', signal='NewSessionHandler')
    assert e.args[1] == 'rtp'

    if send_early_description_info:
        """
        Regression test for a bug where Gabble would crash if you sent it
        description-info before calling Ready() on the relevant StreamHandler,
        and then for a bug where Gabble would never accept the call if a
        description-info was received before all StreamHandlers were Ready().
        """
        node = jp.SetIq(jt2.peer, jt2.jid, [
            jp.Jingle(jt2.sid, jt2.peer, 'description-info', [
                jp.Content('stream1', 'initiator', 'both',
                    jp.Description('audio', [ ])) ]) ])
        stream.send(jp.xml(node))

        sync_stream(q, stream)

    session_handler = make_channel_proxy(conn, e.args[0], 'Media.SessionHandler')
    session_handler.Ready()

    chan.Group.AddMembers([self_handle], 'accepted')

    # S-E gets notified about a newly-created stream
    e = q.expect('dbus-signal', signal='NewStreamHandler')
    id1 = e.args[1]

    stream_handler = make_channel_proxy(conn, e.args[0], 'Media.StreamHandler')

    # We are now in members too
    e = q.expect('dbus-signal', signal='MembersChanged',
             args=[u'', [self_handle], [], [], [], self_handle,
                   cs.GC_REASON_NONE])

    # we are now both in members
    members = chan.Group.GetMembers()
    assert set(members) == set([self_handle, remote_handle]), members

    local_codecs = [('GSM', 3, 8000, {}),
                    ('PCMA', 8, 8000, {'helix':'woo yay'}),
                    ('PCMU', 0, 8000, {}) ]
    local_codecs_dbus = jt2.dbusify_codecs_with_params(local_codecs)

    stream_handler.NewNativeCandidate("fake", jt2.get_remote_transports_dbus())
    stream_handler.Ready(local_codecs_dbus)
    stream_handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)

    stream_handler.CodecsUpdated(local_codecs_dbus)

    local_codecs = [('GSM', 3, 8000, {}),
                    ('PCMA', 8, 8000, {'gstreamer':'rock on'}),
                    ('PCMU', 0, 8000, {}) ]
    local_codecs_dbus = jt2.dbusify_codecs_with_params(local_codecs)
    stream_handler.CodecsUpdated(local_codecs_dbus)


    # First IQ is transport-info; also, we expect to be told what codecs the
    # other end wants.
    e, src = q.expect_many(
        EventPattern('stream-iq',
            predicate=jp.action_predicate('transport-info')),
        EventPattern('dbus-signal', signal='SetRemoteCodecs')
        )
    assertEquals('[email protected]/Foo', e.query['initiator'])

    assert jt2.audio_codecs == [ (name, id, rate, parameters)
        for id, name, type, rate, channels, parameters in unwrap(src.args[0]) ], \
        (jt2.audio_codecs, unwrap(src.args[0]))

    stream.send(jp.xml(jp.ResultIq('[email protected]', e.stanza, [])))

    # S-E reports codec intersection, after which gabble can send acceptance
    stream_handler.SupportedCodecs(local_codecs_dbus)

#.........这里部分代码省略.........
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:101,代码来源:test-description-info.py



注:本文中的servicetest.wrap_channel函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python command_base.call函数代码示例发布时间:2022-05-27
下一篇:
Python servicetest.sync_dbus函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap