• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python servicetest.make_channel_proxy函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 servicetest.make_channel_proxy 函数的典型用法代码示例。如果您正想了解以下问题：Python make_channel_proxy 函数的具体用法？Python make_channel_proxy 怎么用？有哪些 Python make_channel_proxy 的使用例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



下文一共展示了 make_channel_proxy 函数的 20 个代码示例，这些例子默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞，您的评价将有助于我们的系统推荐出更好的 Python 代码示例。

示例1: _test

def _test(jp, q, bus, conn, stream,
          jingle_reason, group_change_reason, stream_error):
    """Drive an incoming call, report a stream error, and watch the teardown.

    The stream handler is told about ``stream_error``; the test then expects
    a "set" IQ accepted by ``_session_terminate_predicate`` (called with
    ``jingle_reason``), a ``MembersChanged`` signal removing both handles
    with ``group_change_reason``, a ``StreamError`` signal, and finally the
    close signals for the stream and for the channel.
    """
    # NOTE(review): the JID literals below look like an e-mail obfuscation
    # placeholder rather than real JIDs -- confirm against the upstream test.
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', '[email protected]/Foo')
    jt.prepare()
    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, ["[email protected]/Foo"])[0]

    def not_contact_list(event):
        # Skip NewChannel signals that mention the contact-list channel type.
        return cs.CHANNEL_TYPE_CONTACT_LIST not in event.args

    # The remote side calls us.
    jt.incoming_call()
    channel_event, session_event = q.expect_many(
        EventPattern('dbus-signal', signal='NewChannel',
                     predicate=not_contact_list),
        EventPattern('dbus-signal', signal='NewSessionHandler'))
    assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, channel_event.args[1])
    assertEquals(cs.HT_CONTACT, channel_event.args[2])
    assertEquals(remote_handle, channel_event.args[3])
    assertEquals('rtp', session_event.args[1])

    channel_path = channel_event.args[0]

    # The client calls Ready on the new session handler.
    session_path = session_event.args[0]
    session_handler = make_channel_proxy(
        conn, session_path, 'Media.SessionHandler')
    session_handler.Ready()

    # The client is told about a newly created stream and sets it up.
    stream_event = q.expect('dbus-signal', signal='NewStreamHandler')
    stream_id = stream_event.args[1]
    stream_handler = make_channel_proxy(
        conn, stream_event.args[0], 'Media.StreamHandler')
    stream_handler.NewNativeCandidate("fake", jt.get_remote_transports_dbus())
    codecs = jt.dbusify_codecs([("FOO", 5, 8000, {})])
    stream_handler.Ready(codecs)

    q.expect('dbus-signal', signal='SetRemoteCodecs')

    error_text = u"o noes"

    # The client now reports a failure on the stream.
    stream_handler.Error(stream_error, error_text)

    def is_session_terminate(iq_event):
        return _session_terminate_predicate(iq_event, jingle_reason,
                                            error_text, jp)

    q.expect("stream-iq", iq_type="set", predicate=is_session_terminate)

    # Both members leave the channel's group.
    members_changed_args = [error_text, [], [self_handle, remote_handle], [],
                            [], self_handle, group_change_reason]
    q.expect('dbus-signal', signal='MembersChanged',
             interface=cs.CHANNEL_IFACE_GROUP, path=channel_path,
             args=members_changed_args)

    q.expect('dbus-signal', signal='StreamError',
             interface=cs.CHANNEL_TYPE_STREAMED_MEDIA,
             args=[stream_id, stream_error, error_text])

    # The stream goes away...
    q.expect('dbus-signal', signal='Close')
    q.expect('dbus-signal', signal='StreamRemoved')

    # ...and then the channel.
    q.expect('dbus-signal', signal='Closed')
    q.expect('dbus-signal', signal='ChannelClosed')
开发者ID:jku,项目名称:telepathy-gabble,代码行数:60,代码来源:incoming-call-stream-error.py


示例2: make_call

    def make_call(expected_recipient):
        """Place an audio call to ``cat`` and check who the initiate IQ goes to.

        Relies on ``conn``, ``q``, ``stream`` and ``cat`` from the enclosing
        scope. After the session and stream handlers have been readied, the
        session-initiate IQ must be addressed to ``expected_recipient``.
        """
        protocol = JingleProtocol031()
        jingle = JingleTest2(protocol, conn, q, stream, '[email protected]', 'dummy')

        channel_request = {
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
            cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
            cs.TARGET_ID: cat,
            cs.INITIAL_AUDIO: True,
        }
        conn.Requests.CreateChannel(channel_request)

        # Ready the session handler as soon as it is announced.
        session_event = q.expect('dbus-signal', signal='NewSessionHandler')
        session_handler = make_channel_proxy(
            conn, session_event.args[0], 'Media.SessionHandler')
        session_handler.Ready()

        stream_event = q.expect('dbus-signal', signal='NewStreamHandler')

        # Feed the stream handler a candidate and codecs, then mark it connected.
        handler = make_channel_proxy(
            conn, stream_event.args[0], 'Media.StreamHandler')
        handler.NewNativeCandidate("fake", jingle.get_remote_transports_dbus())
        handler.Ready(jingle.get_audio_codecs_dbus())
        handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)

        initiate = q.expect(
            'stream-iq',
            predicate=protocol.action_predicate('session-initiate'))
        assertEquals(expected_recipient, initiate.to)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:27,代码来源:jingle-caps.py


示例3: test

def test(jp, q, bus, conn, stream):
    """Check DTMF behaviour on a call that only has a video stream.

    Skipped when ``jp.can_do_video_only()`` is false. With only a video
    stream requested, ``StartTone`` and ``MultipleTones`` must fail with
    ``cs.NOT_AVAILABLE`` while ``StopTone`` still returns normally.
    """
    if not jp.can_do_video_only():
        return

    remote_jid = '[email protected]/Foo'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)
    jt.prepare()

    self_handle = conn.GetSelfHandle()
    handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]

    chan_path = conn.RequestChannel(
        cs.CHANNEL_TYPE_STREAMED_MEDIA, cs.HT_CONTACT, handle, True)
    chan_object = bus.get_object(conn.bus_name, chan_path)
    chan = wrap_channel(
        chan_object, 'StreamedMedia',
        ['MediaSignalling', 'Group', 'CallState', 'DTMF'])

    # The channel must advertise the DTMF interface.
    interfaces = chan.Properties.GetAll(cs.CHANNEL)['Interfaces']
    assert cs.CHANNEL_IFACE_DTMF in interfaces, interfaces

    chan.StreamedMedia.RequestStreams(handle, [cs.MEDIA_STREAM_TYPE_VIDEO])

    # The streaming client learns about the session handler and readies it.
    session_event = q.expect('dbus-signal', signal='NewSessionHandler')
    assert session_event.args[1] == 'rtp'

    session_handler = make_channel_proxy(
        conn, session_event.args[0], 'Media.SessionHandler')
    session_handler.Ready()

    stream_event = q.expect('dbus-signal', signal='NewStreamHandler')
    video_path = stream_event.args[0]
    video_handler = make_channel_proxy(conn, video_path, 'Media.StreamHandler')

    video_handler.NewNativeCandidate("fake", jt.get_remote_transports_dbus())
    video_handler.Ready(jt.get_video_codecs_dbus())
    video_handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)

    initiate = q.expect(
        'stream-iq', predicate=jp.action_predicate('session-initiate'))
    stream.send(make_result_iq(stream, initiate.stanza))

    jt.parse_session_initiate(initiate.query)

    jt.accept()

    # The connection manager tells the streaming client to start sending.
    q.expect('dbus-signal', signal='SetStreamSending', args=[True],
             path=video_path)

    # There is no audio stream, so starting tones cannot work.
    call_async(q, chan.DTMF, 'StartTone', 666, 3)
    q.expect('dbus-error', method='StartTone', name=cs.NOT_AVAILABLE)
    call_async(q, chan.DTMF, 'MultipleTones', '**666##')
    q.expect('dbus-error', method='MultipleTones', name=cs.NOT_AVAILABLE)

    # Stopping tones is still allowed (nothing is playing, so it is a no-op).
    call_async(q, chan.DTMF, 'StopTone', 666)
    q.expect('dbus-return', method='StopTone')

    chan.Group.RemoveMembers([self_handle], 'closed')
    q.expect('dbus-signal', signal='Closed', path=chan_path)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:59,代码来源:dtmf-no-audio.py


示例4: _test

def _test(jp, q, bus, conn, stream,
          jingle_reason, group_change_reason, stream_error):
    """Place an outgoing call and check what happens when the peer ends it.

    After the session-initiate IQ is acknowledged, the peer terminates with
    ``jingle_reason``. The resulting ``MembersChanged`` must remove both
    handles with the remote contact as actor; when ``jp.is_modern_jingle()``
    is true, the message and ``group_change_reason`` are checked too, and a
    ``StreamError`` is expected if ``stream_error`` is truthy.
    """
    remote_jid = '[email protected]/Foo'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)

    jt.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]

    path = conn.RequestChannel(
        cs.CHANNEL_TYPE_STREAMED_MEDIA, cs.HT_CONTACT, remote_handle, True)

    # The signalling proxy is not used afterwards; it is still created, as in
    # the original test, in case making the proxy itself matters.
    signalling_iface = make_channel_proxy(
        conn, path, 'Channel.Interface.MediaSignalling')
    media_iface = make_channel_proxy(conn, path, 'Channel.Type.StreamedMedia')

    media_iface.RequestStreams(remote_handle, [cs.MEDIA_STREAM_TYPE_AUDIO])

    # The streaming client learns about the session handler and readies it.
    session_event = q.expect('dbus-signal', signal='NewSessionHandler')
    assert session_event.args[1] == 'rtp'

    session_handler = make_channel_proxy(
        conn, session_event.args[0], 'Media.SessionHandler')
    session_handler.Ready()

    stream_event = q.expect('dbus-signal', signal='NewStreamHandler')

    stream_handler = make_channel_proxy(
        conn, stream_event.args[0], 'Media.StreamHandler')

    stream_handler.NewNativeCandidate("fake", jt.get_remote_transports_dbus())
    stream_handler.Ready(jt.get_audio_codecs_dbus())
    stream_handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)

    initiate = q.expect(
        'stream-iq', predicate=jp.action_predicate('session-initiate'))
    stream.send(make_result_iq(stream, initiate.stanza))

    text = u"begone!"

    jt.parse_session_initiate(initiate.query)
    jt.terminate(reason=jingle_reason, text=text)

    members_changed = q.expect('dbus-signal', signal='MembersChanged')
    (message, added, removed, local_pending, remote_pending,
     actor, reason) = members_changed.args
    assert added == [], added
    assert set(removed) == set([self_handle, remote_handle]), \
        (removed, self_handle, remote_handle)
    assert local_pending == [], local_pending
    assert remote_pending == [], remote_pending
    assert actor == remote_handle, (actor, remote_handle)
    if jp.is_modern_jingle():
        assertEquals(text, message)
        assertEquals(group_change_reason, reason)

    if jp.is_modern_jingle() and stream_error:
        error_event = q.expect('dbus-signal', signal='StreamError')
        assertEquals(stream_error, error_event.args[1])

    q.expect('dbus-signal', signal='Close') #XXX - match against the path
开发者ID:jku,项目名称:telepathy-gabble,代码行数:59,代码来源:test-outgoing-call-rejected.py


示例5: test

def test(q, bus, conn, stream):
    """Call SessionHandler.Ready() on a StreamedMedia channel path.

    A channel is created without any streams, then a ``Media.SessionHandler``
    proxy is built for the channel's own object path and ``Ready()`` is
    called on it. If the call raises a ``DBusException``, its D-Bus error
    name must be ``cs.NOT_AVAILABLE``.
    """
    jp = JingleProtocol031()
    remote_jid = '[email protected]/misc'
    jt = JingleTest2(jp, conn, q, stream, '[email protected]', remote_jid)

    jt.prepare()

    self_handle = conn.GetSelfHandle()
    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [remote_jid])[0]
    path, _ = conn.Requests.CreateChannel({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
        cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
        cs.TARGET_HANDLE: remote_handle})

    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamedMedia')

    # In Gabble, the StreamedMedia channel is secretly also the SessionHandler.
    # Let's make up a proxy and call some methods on it. They should fail
    # gracefully, rather than crashing Gabble.
    session_handler = make_channel_proxy(conn, path, 'Media.SessionHandler')

    # NOTE(review): if Ready() returns without raising, nothing is asserted
    # and the test passes -- confirm that this is intended.
    try:
        session_handler.Ready()
    except DBusException as e:
        # "except X as e" is accepted by Python 2.6+ and Python 3; the old
        # "except X, e" spelling is a syntax error on Python 3.
        assertEquals(cs.NOT_AVAILABLE, e.get_dbus_name())
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:25,代码来源:misuse.py


示例6: _local_content_add

def _local_content_add(jp, q, bus, conn, stream, initiate_call_func):
    """Add a video stream locally and have the peer reject the new content.

    ``initiate_call_func(jp, q, bus, conn, stream)`` must return a
    ``(jt, chan)`` pair for an already-initiated call. After the content-add
    IQ is acknowledged, a content-reject carrying a ``failed-application``
    reason is sent back, and a ``StreamError`` signal with
    ``cs.MEDIA_STREAM_ERROR_CODEC_NEGOTIATION_FAILED`` and an empty message
    is expected for the new stream.
    """
    jt, chan = initiate_call_func(jp, q, bus, conn, stream)

    remote_handle = conn.RequestHandles(cs.HT_CONTACT, [jt.peer])[0]

    chan.RequestStreams(remote_handle, [cs.MEDIA_STREAM_TYPE_VIDEO])

    nsh = q.expect('dbus-signal', signal='NewStreamHandler')
    # The signal must carry exactly four arguments; the last two are unused.
    stream_handler_path, stream_id, _media_type, _direction = nsh.args
    video_handler = make_channel_proxy(conn, stream_handler_path,
                                       'Media.StreamHandler')

    video_handler.NewNativeCandidate("fake", jt.get_remote_transports_dbus())
    # NOTE(review): audio codecs are offered on a video stream here, exactly
    # as in the original test -- confirm whether that is deliberate.
    video_handler.Ready(jt.get_audio_codecs_dbus())
    video_handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)

    e = q.expect('stream-iq', predicate=jp.action_predicate('content-add'))
    c = e.query.firstChildElement()
    stream.send(make_result_iq(stream, e.stanza))

    # Build the peer's content-reject for the content that was just added.
    node = jp.SetIq(jt.peer, jt.jid, [
            jp.Jingle(jt.sid, jt.peer, 'content-reject', [
                    ('reason', None, {}, [
                            ('failed-application', None, {}, [])]),
                    jp.Content(c['name'], c['creator'], c['senders']) ]) ])
    stream.send(jp.xml(node))

    # The original statement ended with a stray trailing comma, which wrapped
    # the call in a throwaway 1-tuple; it has been removed.
    q.expect('dbus-signal', signal='StreamError',
             args=[stream_id,
                   cs.MEDIA_STREAM_ERROR_CODEC_NEGOTIATION_FAILED,
                   ""])
开发者ID:jku,项目名称:telepathy-gabble,代码行数:31,代码来源:stream-errors-on-content-reject.py


示例7: test

def test(q, bus, conn, stream):
    """Leave a channel the bouncer joined for us, then get joined again.

    After connecting, ``test_join_bouncer`` is run for ``#idletest``, the
    test removes itself from the resulting channel's group, and
    ``test_join_bouncer`` is run a second time before disconnecting.

    Returns True once the whole sequence has completed.
    """
    conn.Connect()
    q.expect_many(
            EventPattern('dbus-signal', signal='StatusChanged', args=[1, 1]),
            EventPattern('irc-connected'))
    # A plain int is used instead of the Python-2-only long literal "1L";
    # the two compare equal, and "1L" is a syntax error on Python 3.
    q.expect('dbus-signal', signal='SelfHandleChanged',
        args=[1])
    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])

    CHANNEL_NAME = "#idletest"

    self_handle = conn.Get(CONN, 'SelfHandle', dbus_interface=PROPERTIES_IFACE)

    # The bouncer initiates a JOIN.
    path = test_join_bouncer(q, conn, stream, CHANNEL_NAME)

    # We PART.
    chan = make_channel_proxy(conn, path, 'Channel')
    chan.RemoveMembers([self_handle], "bye bye cruel world",
        dbus_interface=CHANNEL_IFACE_GROUP)
    q.expect('dbus-signal', signal='MembersChanged')

    # The bouncer initiates a JOIN to force the issue.
    test_join_bouncer(q, conn, stream, CHANNEL_NAME)

    call_async(q, conn, 'Disconnect')
    q.expect_many(
            EventPattern('dbus-return', method='Disconnect'),
            EventPattern('dbus-signal', signal='StatusChanged', args=[2, 1]))
    return True
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-idle,代码行数:30,代码来源:join-muc-channel-bouncer.py


示例8: accept_file

    def accept_file(self):
        """Decline the offered transfer, trigger a new offer, and decline again.

        Each time a channel is closed, a ``FileTransferStateChanged`` signal
        reporting a locally stopped cancellation is expected, followed by
        ``Closed``. Returns True to stop the test.
        """
        def expect_cancelled_then_closed():
            # Expectations shared by both declines.
            event = self.q.expect('dbus-signal',
                                  signal='FileTransferStateChanged')
            state, reason = event.args
            assert state == cs.FT_STATE_CANCELLED
            assert reason == cs.FT_STATE_CHANGE_REASON_LOCAL_STOPPED
            self.q.expect('dbus-signal', signal='Closed')

        # Decline the first transfer by closing its channel.
        self.channel.Close()
        expect_cancelled_then_closed()

        # Send the offer again (regression test: Salut used to crash at this
        # point).
        self.send_ft_offer_iq()

        new_channels = self.q.expect('dbus-signal', signal='NewChannels')
        channels = new_channels.args[0]
        assert len(channels) == 1
        path, _props = channels[0]

        second_channel = make_channel_proxy(self.conn, path, 'Channel')

        # Decline the second transfer too.
        second_channel.Close()
        expect_cancelled_then_closed()

        # Stop the test.
        return True
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:32,代码来源:receive-file-decline.py


示例9: invite_to_muc

def invite_to_muc(q, group1, conn2, invited_handle, inviter_handle):
    """Invite a contact through ``group1`` and accept the invite on ``conn2``.

    Waits for the channel announced on ``conn2``, checks that the second
    connection's self handle goes from local-pending to remote-pending to
    member, and returns the ``Channel.Interface.Group`` proxy for that
    channel.
    """
    invite_message = "Let's tube!"

    # First connection: invite the contact.
    group1.AddMembers([invited_handle], invite_message)

    # The channel shows up on the second connection.
    new_channel = q.expect('dbus-signal', signal='NewChannel',
                           path=conn2.object_path)
    path = new_channel.args[0]
    group2 = make_channel_proxy(conn2, path, "Channel.Interface.Group")

    # We are invited to the MUC, i.e. added as local pending.
    # NOTE(review): the trailing 4 is presumably the "invited" group change
    # reason -- confirm against the constants module.
    conn2_self_handle = conn2.Properties.Get(cs.CONN, "SelfHandle")
    invited_args = [invite_message, [], [], [conn2_self_handle], [],
                    inviter_handle, 4]
    q.expect('dbus-signal', signal='MembersChanged', path=path,
             args=invited_args)

    # Second connection: accept the invite.
    group2.AddMembers([conn2_self_handle], "")

    # Added as remote pending...
    remote_pending_args = ['', [], [], [], [conn2_self_handle],
                           conn2_self_handle, 0]
    q.expect('dbus-signal', signal='MembersChanged', path=path,
             args=remote_pending_args)

    # ...and then as a full member.
    member_args = ['', [conn2_self_handle], [], [], [],
                   conn2_self_handle, 0]
    q.expect('dbus-signal', signal='MembersChanged', path=path,
             args=member_args)

    return group2
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:28,代码来源:tubetestutil.py


示例10: test

def test(q, bus, conn, stream, call_error_on):
    """Report an error on the session or stream handler of an incoming call.

    When ``call_error_on`` is ``'session'`` the error is raised on the
    session handler; otherwise the session handler is readied and the error
    is raised on the first stream handler. Either way the media channel is
    expected to emit ``Closed``.
    """
    jt = jingletest.JingleTest(stream, '[email protected]', '[email protected]/Foo')

    conn.Connect()

    q.expect('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    # NOTE(review): the literal 1 is presumably the contact handle type
    # (spelled cs.HT_CONTACT in sibling tests) -- confirm before replacing.
    remote_handle = conn.RequestHandles(1, ["[email protected]/Foo"])[0]

    # Remote end calls us
    jt.incoming_call()

    # FIXME: these signals are not observable by real clients, since they
    #        happen before NewChannels.
    # The caller is in members
    e = q.expect('dbus-signal', signal='MembersChanged',
             args=[u'', [remote_handle], [], [], [], 0, 0])

    # We're pending because of remote_handle.
    # A plain int replaces the Python-2-only long literal "1L"; the two
    # compare equal, and "1L" is a syntax error on Python 3.
    e = q.expect('dbus-signal', signal='MembersChanged',
             args=[u'', [], [], [1], [], remote_handle, cs.GC_REASON_INVITED])

    # Remember which object emitted the signal so Closed can be matched on it.
    media_chan_suffix = e.path

    e = q.expect('dbus-signal', signal='NewSessionHandler')
    session_handler = make_channel_proxy(conn, e.args[0], 'Media.SessionHandler')

    if call_error_on == 'session':
        session_handler.Error(0, "this has been deprecated for years")
    else:
        session_handler.Ready()

        e = q.expect('dbus-signal', signal='NewStreamHandler')

        # S-E gets notified about a newly-created stream
        stream_handler = make_channel_proxy(conn, e.args[0], 'Media.StreamHandler')

        # Something goes wrong immediately!
        stream_handler.Error(0, "i'll have the eggs tostada please")

    # Gabble doesn't fall over, and the channel closes nicely.
    e = q.expect('dbus-signal', signal='Closed', path=media_chan_suffix)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:43,代码来源:stream-handler-error.py


示例11: make_stream_request

    def make_stream_request(stream_type):
        """Request one stream of ``stream_type`` and ready its handler.

        Relies on ``media_iface``, ``remote_handle``, ``q``, ``conn`` and
        ``jt2`` from the enclosing scope. Returns a
        ``(stream_handler, stream_id)`` tuple.
        """
        media_iface.RequestStreams(remote_handle, [stream_type])

        announced = q.expect("dbus-signal", signal="NewStreamHandler")
        handler_path = announced.args[0]
        stream_id = announced.args[1]

        handler = make_channel_proxy(conn, handler_path, "Media.StreamHandler")

        # Candidate, codecs, then mark the stream as connected.
        handler.NewNativeCandidate("fake", jt2.get_remote_transports_dbus())
        handler.Ready(jt2.get_audio_codecs_dbus())
        handler.StreamState(cs.MEDIA_STREAM_STATE_CONNECTED)
        return (handler, stream_id)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:12,代码来源:test-content-complex.py


示例12: run_test

def run_test(q, bus, conn, stream, jt, request_before_presence):
    """
    Request streams on a media channel to jt.remote_jid while capabilities
    are still unknown.

    If request_before_presence is True, RequestStreams is called before the
    contact's presence is sent; otherwise it is called after the presence but
    before the disco reply describing their capabilities. In both cases
    RequestStreams is expected to return once the disco reply has been sent.
    """

    # Remote presence is deliberately NOT set yet. Gabble does not know
    # whether the contact is offline, so it tentatively allows the channel
    # and the contact handle to be created.

    channel_request = dbus.Dictionary(
        {
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAMED_MEDIA,
            cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
            cs.TARGET_ID: jt.remote_jid,
        },
        signature="sv",
    )
    path, props = conn.CreateChannel(
        channel_request, dbus_interface=cs.CONN_IFACE_REQUESTS)
    media_iface = make_channel_proxy(conn, path, "Channel.Type.StreamedMedia")
    handle = props[cs.TARGET_HANDLE]

    sync_dbus(bus, q, conn)

    def request_streams_async():
        call_async(q, media_iface, "RequestStreams", handle,
                   [cs.MEDIA_STREAM_TYPE_AUDIO])

    def send_presence_and_await_disco():
        jt.send_remote_presence()
        return q.expect("stream-iq", query_ns=ns.DISCO_INFO,
                        to=jt.remote_jid)

    if not request_before_presence:
        disco_query = send_presence_and_await_disco()

        # RequestStreams is called now; it should wait for the disco reply.
        request_streams_async()
    else:
        # Streams are requested before either <presence> or caps have
        # arrived. Gabble should wait for both before returning from
        # RequestStreams.
        request_streams_async()

        # Make sure Gabble has received the method call.
        sync_dbus(bus, q, conn)

        # Only now is the presence sent.
        disco_query = send_presence_and_await_disco()

    jt.send_remote_disco_reply(disco_query.stanza)

    # RequestStreams should now complete successfully.
    q.expect("dbus-return", method="RequestStreams")
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:53,代码来源:test-wait-for-caps.py


示例13: join_muc

def join_muc(q, conn, muc_name):
    """Request a text channel for room ``muc_name`` and wait until we are in.

    Expects the self handle to be reported first as remote pending and then
    as a member. Returns ``(muc_handle, group)`` where ``group`` is the
    ``Channel.Interface.Group`` proxy of the channel.
    """
    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    muc_handle = conn.RequestHandles(cs.HT_ROOM, [muc_name])[0]
    path = conn.RequestChannel(
        cs.CHANNEL_TYPE_TEXT, cs.HT_ROOM, muc_handle, True)

    # First we show up as remote pending...
    remote_pending_args = ['', [], [], [], [self_handle], self_handle, 0]
    q.expect('dbus-signal', signal='MembersChanged', path=path,
             args=remote_pending_args)

    # ...then as a full member.
    member_args = ['', [self_handle], [], [], [], self_handle, 0]
    q.expect('dbus-signal', signal='MembersChanged', path=path,
             args=member_args)

    group = make_channel_proxy(conn, path, "Channel.Interface.Group")
    return muc_handle, group
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-salut,代码行数:13,代码来源:tubetestutil.py


示例14: do_one_search

def do_one_search(q, bus, conn, stream, fields, expected_search_keys,
                  terms, results):
    """Create a search channel, run one search and send back ``results``.

    Uses the module-level ``server``. Checks the channel's immutable
    properties against ``expected_search_keys``, verifies that the submitted
    form begins with a hidden FORM_TYPE field, and collects the remaining
    submitted fields.

    Returns ``(search_fields, c, c_search, c_props)`` where ``search_fields``
    is a list of ``(var, value)`` pairs taken from the submitted form.
    """

    call_create(q, conn, server)

    ret, nc_sig = answer_extended_field_query(q, stream, server, fields)

    path, props = ret.value
    props = unwrap(props)

    assert props[cs.CONTACT_SEARCH_SERVER] == server, pformat(props)
    available_keys = sorted(props[cs.CONTACT_SEARCH_ASK])
    assert available_keys == expected_search_keys, available_keys
    assert cs.CONTACT_SEARCH_STATE not in props, pformat(props)

    c = make_channel_proxy(conn, path, 'Channel')
    c_props = dbus.Interface(c, cs.PROPERTIES_IFACE)
    c_search = dbus.Interface(c, cs.CHANNEL_TYPE_CONTACT_SEARCH)

    state = c_props.Get(cs.CHANNEL_TYPE_CONTACT_SEARCH, 'SearchState')
    assert state == cs.SEARCH_NOT_STARTED, state

    # Run the search itself.
    iq = make_search(q, c_search, c_props, server, terms)
    # Not used below; kept exactly as in the original test.
    query = iq.firstChildElement()
    submitted_field_xpath = (
        '/iq/query[@xmlns="%s"]/x[@xmlns="%s"][@type="submit"]/field'
        % (ns.SEARCH, ns.X_DATA))
    fields_sent = xpath.queryForNodes(submitted_field_xpath, iq)
    assert fields_sent is not None

    # The first submitted field has to be the hidden FORM_TYPE.
    form_type = fields_sent[0]
    assert form_type['type'] == 'hidden'
    assert form_type['var'] == 'FORM_TYPE'
    form_type_value = form_type.firstChildElement()
    assert form_type_value.name == 'value'
    assert form_type_value.children[0] == ns.SEARCH

    # Every other field is an actual search term.
    search_fields = []
    for field in fields_sent[1:]:
        field_value = field.firstChildElement()
        assert field_value.name == 'value'
        search_fields.append((field['var'], field_value.children[0]))

    # The server answers with the search results.
    send_results_extended(stream, iq, results, fields)

    return search_fields, c, c_search, c_props
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:49,代码来源:extended.py


示例15: test

def test(q, bus, conn, stream, channel_type):
    """Disconnect while a media request is still waiting for capabilities.

    For ``cs.CHANNEL_TYPE_STREAMED_MEDIA`` a channel is requested up front
    and the pending ``RequestStreams`` must fail with ``cs.NOT_AVAILABLE``;
    for any other ``channel_type`` the pending ``CreateChannel`` must fail
    with ``cs.DISCONNECTED``.
    """
    jt = jingletest.JingleTest(stream, '[email protected]', '[email protected]/Foo')

    # Remote presence is deliberately NOT set yet. Gabble does not know
    # whether the contact is offline, so it tentatively allows the channel
    # and the contact handle to be created.

    handle = conn.RequestHandles(cs.HT_CONTACT, [jt.remote_jid])[0]

    if channel_type == cs.CHANNEL_TYPE_STREAMED_MEDIA:
        path = conn.RequestChannel(
            cs.CHANNEL_TYPE_STREAMED_MEDIA, cs.HT_CONTACT, handle, True)
        media_iface = make_channel_proxy(
            conn, path, 'Channel.Type.StreamedMedia')

    # Creating the proxy kicks off introspection of the channel object, and
    # RequestStreams waits for that to finish, whereas the connection proxy
    # was introspected long ago. Without a sync here RequestStreams and
    # Disconnect could therefore be reordered; syncing forces the
    # introspection to complete so their order can be relied on.
    sync_dbus(bus, q, conn)

    # The request is made before either <presence> or caps have arrived.
    if channel_type == cs.CHANNEL_TYPE_STREAMED_MEDIA:
        call_async(q, media_iface, 'RequestStreams', handle,
                   [cs.MEDIA_STREAM_TYPE_AUDIO])

        request_failed = EventPattern('dbus-error', method='RequestStreams')
        before_events, _ = disconnect_conn(q, conn, stream, [request_failed])

        # RequestStreams should now have returned NotAvailable.
        failure = before_events[0]
        assert failure.error.get_dbus_name() == cs.NOT_AVAILABLE, \
            failure.error
    else:
        channel_request = {
            cs.CHANNEL_TYPE: channel_type,
            cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
            cs.TARGET_ID: jt.remote_jid,
            cs.CALL_INITIAL_AUDIO: True,
        }
        call_async(q, conn.Requests, 'CreateChannel', channel_request)

        create_failed = EventPattern('dbus-error', method='CreateChannel')
        before_events, _ = disconnect_conn(q, conn, stream, [create_failed])

        # CreateChannel should now have returned Disconnected.
        failure = before_events[0]
        assert failure.error.get_dbus_name() == cs.DISCONNECTED, \
            failure.error
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:48,代码来源:test-wait-for-caps-incomplete.py


示例16: cancelled_while_in_progress

def cancelled_while_in_progress(q, bus, conn, stream, server):
    """Stop a search before the server replies, then deliver the late reply.

    After ``Stop`` the search state must be ``cs.SEARCH_FAILED`` with reason
    ``cs.CANCELLED``. Results sent afterwards (taken from the module-level
    ``results``) must not trigger ``SearchResultReceived``, and a second
    ``Stop`` must return without any ``SearchStateChanged``.
    """
    call_create(q, conn, server)

    ret, _ = answer_field_query(q, stream, server)
    path, props = ret.value

    c = make_channel_proxy(conn, path, 'Channel')
    c_props = dbus.Interface(c, cs.PROPERTIES_IFACE)
    c_search = dbus.Interface(c, cs.CHANNEL_TYPE_CONTACT_SEARCH)

    terms = { 'x-n-family': 'Threepwood' }
    iq = make_search(q, c_search, c_props, server, terms)

    # The client cancels before the server has sent any results.
    call_async(q, c_search, 'Stop')
    ret, state_changed = q.expect_many(
        EventPattern('dbus-return', method='Stop'),
        EventPattern('dbus-signal', signal='SearchStateChanged'),
        )

    assert state_changed.args[0] == cs.SEARCH_FAILED, state_changed.args
    assert state_changed.args[1] == cs.CANCELLED, state_changed.args

    state = c_props.Get(cs.CHANNEL_TYPE_CONTACT_SEARCH, 'SearchState')
    assert state == cs.SEARCH_FAILED, (state, cs.SEARCH_FAILED)

    # The server's results arrive late; SearchResultReceived must not fire.
    no_result_received = EventPattern('dbus-signal',
                                      signal='SearchResultReceived')
    q.forbid_events([no_result_received])

    send_results(stream, iq, results.values())

    # Make sure Gabble has actually received the results.
    sync_stream(q, stream)

    # Still alive. Calling Stop again should succeed but change nothing.
    no_state_change = EventPattern('dbus-signal',
                                   signal='SearchStateChanged')
    q.forbid_events([no_state_change])

    call_async(q, c_search, 'Stop')
    q.expect('dbus-return', method='Stop')

    c.Close()
    q.unforbid_events([no_result_received, no_state_change])
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:44,代码来源:unextended.py


示例17: test

def test(q, bus, conn, stream):
    """Disconnect while RequestStreams is still waiting for capabilities.

    A StreamedMedia channel is requested before any remote presence is set,
    ``RequestStreams`` is called asynchronously, and the connection is then
    disconnected. The pending call must fail with ``cs.NOT_AVAILABLE``.
    """
    jt = jingletest.JingleTest(stream, '[email protected]', '[email protected]/Foo')

    conn.Connect()

    q.expect('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED])

    q.expect('stream-authenticated')
    # Plain ints replace the Python-2-only long literals "1L" and "0L"; they
    # compare and hash equal, and the "L" suffix is a syntax error on
    # Python 3.
    q.expect('dbus-signal', signal='PresenceUpdate',
        args=[{1: (0, {u'available': {}})}])
    q.expect('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    # We intentionally DON'T set remote presence yet. Since Gabble is still
    # unsure whether to treat contact as offline for this purpose, it
    # will tentatively allow channel creation and contact handle addition

    handle = conn.RequestHandles(cs.HT_CONTACT, [jt.remote_jid])[0]

    path = conn.RequestChannel(cs.CHANNEL_TYPE_STREAMED_MEDIA, cs.HT_CONTACT,
        handle, True)
    media_iface = make_channel_proxy(conn, path, 'Channel.Type.StreamedMedia')

    # So it turns out that the calls to RequestStreams and Disconnect could be
    # reordered while the first waits on the result of introspecting the
    # channel's object which is kicked off by making a proxy object for it,
    # whereas the connection proxy is long ago introspected. Isn't dbus-python
    # great? Syncing here forces that introspection to finish so we can rely on
    # the ordering of RequestStreams and Disconnect. Yay.
    sync_dbus(bus, q, conn)

    # Now we request streams before either <presence> or caps have arrived
    call_async(q, media_iface, 'RequestStreams', handle,
        [cs.MEDIA_STREAM_TYPE_AUDIO])

    # Only the first element of disconnect_conn's result is used here.
    event = disconnect_conn(q, conn, stream,
        [EventPattern('dbus-error', method='RequestStreams')])[0]

    # RequestStreams should now return NotAvailable
    assert event.error.get_dbus_name() == cs.NOT_AVAILABLE, event.error
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:41,代码来源:test-wait-for-caps-incomplete.py


示例18: no_x_in_reply

def no_x_in_reply(q, bus, conn, stream):
    """Check that a bare acknowledgement of the search IQ fails the search.

    Uses the module-level ``server``. After the search IQ is acknowledged
    with ``acknowledge_iq``, the first ``SearchStateChanged`` signal must
    report ``cs.SEARCH_FAILED``.
    """
    fields = [
        ('nickname', 'text-single', 'NickName', []),
        ('nick', 'text-single', 'Nick', []),
    ]
    terms = { 'nickname': 'Badger' }

    call_create(q, conn, server)
    ret, nc_sig = answer_extended_field_query(q, stream, server, fields)

    path, _ = ret.value
    channel = make_channel_proxy(conn, path, 'Channel')
    props_iface = dbus.Interface(channel, cs.PROPERTIES_IFACE)
    search_iface = dbus.Interface(channel, cs.CHANNEL_TYPE_CONTACT_SEARCH)

    iq = make_search(q, search_iface, props_iface, server, terms)

    # The server replies with an IQ that has a <query/> but no <x/> in it.
    acknowledge_iq(stream, iq)

    # Gabble should report that the query failed, and must not crash.
    state_changed = q.expect('dbus-signal', signal='SearchStateChanged')
    new_state = state_changed.args[0]
    assertEquals(cs.SEARCH_FAILED, new_state)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:22,代码来源:extended.py


示例19: _remote_content_add

def _remote_content_add(jp, q, bus, conn, stream, initiate_call_func):
    """Have the peer add video content, then fail codec negotiation locally.

    ``initiate_call_func(jp, q, bus, conn, stream)`` must return a
    ``(jt, chan)`` pair for an already-initiated call. After the local
    stream handler reports ``cs.MEDIA_STREAM_ERROR_CODEC_NEGOTIATION_FAILED``
    the test expects a matching ``StreamError`` signal and an IQ accepted by
    ``_content_reject_predicate``.
    """
    jt, chan = initiate_call_func(jp, q, bus, conn, stream)

    # "codec_id" avoids shadowing the builtin id(); on Python 2 the
    # comprehension variables also leak into the function scope.
    video_codecs = [
        jp.PayloadType(name, str(rate), str(codec_id), parameters)
        for (name, codec_id, rate, parameters) in jt.video_codecs]

    node = jp.SetIq(jt.peer, jt.jid, [
            jp.Jingle(jt.sid, jt.peer, 'content-add', [
                    jp.Content(
                        'videostream', 'initiator', 'both',
                        jp.Description('video', video_codecs),
                        jp.TransportGoogleP2P()) ]) ])
    stream.send(jp.xml(node))

    _, nsh = q.expect_many(
        EventPattern('dbus-signal', signal='StreamAdded'),
        EventPattern('dbus-signal', signal='NewStreamHandler'))

    # The signal must carry exactly four arguments; the last two are unused.
    stream_handler_path, stream_id, _media_type, _direction = nsh.args

    video_handler = make_channel_proxy(conn, stream_handler_path,
                                       'Media.StreamHandler')

    video_handler.NewNativeCandidate("fake",
                                     jt.get_remote_transports_dbus())
    video_handler.Ready(jt.dbusify_codecs([("FOO", 5, 8000, {})]))

    msg = u"None of the codecs are good for us, damn!"

    video_handler.Error(cs.MEDIA_STREAM_ERROR_CODEC_NEGOTIATION_FAILED, msg)

    q.expect_many(
        EventPattern('dbus-signal', signal='StreamError',
                     args=[stream_id,
                           cs.MEDIA_STREAM_ERROR_CODEC_NEGOTIATION_FAILED,
                           msg]),
        EventPattern('stream-iq', predicate=_content_reject_predicate))
开发者ID:jku,项目名称:telepathy-gabble,代码行数:38,代码来源:stream-errors-on-content-reject.py


示例20: returns_error_from_search

def returns_error_from_search(q, stream, conn):
    """Check how a search channel reacts to an error reply from the server.

    The server answers the search IQ with a ``not-acceptable`` error of type
    ``modify``. The next ``SearchStateChanged`` must then report
    ``cs.SEARCH_FAILED`` with reason ``cs.PERMISSION_DENIED``, and a
    subsequent ``Stop`` must still return normally.
    """
    server = 'nofunforyou.localhost'
    create_iq = call_create(q, conn, server)

    # Answer the field query with a single "first" element.
    result = make_result_iq(stream, create_iq)
    result_query = result.firstChildElement()
    result_query.addElement("first")
    stream.send(result)

    created = q.expect('dbus-return', method='CreateChannel')
    channel = make_channel_proxy(conn, created.value[0], 'Channel')
    search_iface = dbus.Interface(channel, cs.CHANNEL_TYPE_CONTACT_SEARCH)

    call_async(q, search_iface, 'Search', {'x-n-given': 'World of Goo'})
    search_iq_event, _ = q.expect_many(
        EventPattern('stream-iq', to=server, query_ns=ns.SEARCH),
        EventPattern('dbus-signal', signal='SearchStateChanged'),
        )

    # Build the error the server sends back for the search.
    search_iq = search_iq_event.stanza
    error = domish.Element((None, 'error'))
    error['type'] = 'modify'
    error.addElement((ns.STANZA, 'not-acceptable'))
    error.addElement((ns.STANZA, 'text'),
                     content="We don't believe in games here.")
    send_error_reply(stream, search_iq, error)

    state_changed = q.expect('dbus-signal', signal='SearchStateChanged')
    new_state, reason, _details = state_changed.args

    assert new_state == cs.SEARCH_FAILED, new_state
    assert reason == cs.PERMISSION_DENIED, reason

    # Stop is called after the search has already failed; it should succeed
    # and do nothing.
    call_async(q, search_iface, 'Stop')
    q.expect('dbus-return', method='Stop')

    channel.Close()
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:37,代码来源:ceci-nest-pas-un-serveur.py



注:本文中的servicetest.make_channel_proxy函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python servicetest.sync_dbus函数代码示例发布时间:2022-05-27
下一篇:
Python servicetest.call_async函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap