• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python servicetest.call_async函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中servicetest.call_async函数的典型用法代码示例。如果您正苦于以下问题:Python call_async函数的具体用法?Python call_async怎么用?Python call_async使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了call_async函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_deny_unblock_remove

def test_deny_unblock_remove(q, bus, conn, stream, stored, deny):
    """
    Test unblocking a contact, and, while that request is pending, deleting
    them.
    """
    self_handle = conn.GetSelfHandle()

    # This contact was on our roster, blocked and subscribed, when we started.
    contact = '[email protected]'
    handle = conn.RequestHandles(cs.HT_CONTACT, [contact])[0]

    # They're blocked, and we have a bidi subscription, so they should be on
    # deny and stored. (We already checked this earlier, but we've been messing
    # with the roster so let's be sure the preconditions are okay...)
    assertContains(handle,
        deny.Properties.Get(cs.CHANNEL_IFACE_GROUP, "Members"))
    assertContains(handle,
        stored.Properties.Get(cs.CHANNEL_IFACE_GROUP, "Members"))

    # Unblock them.
    call_async(q, deny.Group, 'RemoveMembers', [handle], "")

    roster_event = q.expect('stream-iq', query_ns=ns.ROSTER)
    item = roster_event.query.firstChildElement()
    assertEquals(contact, item['jid'])
    assertDoesNotContain((ns.GOOGLE_ROSTER, 't'), item.attributes)

    # If we now remove them from stored, the edit shouldn't be sent until the
    # unblock event has had a reply.
    q.forbid_events(remove_events)
    call_async(q, stored.Group, 'RemoveMembers', [handle], "")

    # Make sure if the remove is sent prematurely, we catch it.
    sync_stream(q, stream)
    q.unforbid_events(remove_events)

    # So now we send a roster push and reply for the unblock request.
    stream.send(make_set_roster_iq(stream, '[email protected]/Resource', contact,
        'both', False, attrs={}))
    acknowledge_iq(stream, roster_event.stanza)

    # And on receiving the push and reply, Gabble should show them being
    # removed from deny, and send a remove.

    _, roster_event = q.expect_many(
        EventPattern('dbus-signal', signal='MembersChanged',
            args=['', [], [handle], [], [], self_handle, cs.GC_REASON_NONE],
            predicate=is_deny),
        remove_events[0],
        )
    item = roster_event.query.firstChildElement()
    assertEquals(contact, item['jid'])

    stream.send(make_set_roster_iq(stream, '[email protected]/Resource', contact,
        'remove', False, attrs={}))
    acknowledge_iq(stream, roster_event.stanza)

    q.expect('dbus-signal', signal='MembersChanged',
        args=['', [], [handle], [], [], 0, cs.GC_REASON_NONE],
        predicate=is_stored)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:60,代码来源:test-google-roster.py


示例2: test_ensure_ensure

def test_ensure_ensure(q, conn, bus, stream, room_jid):
    # Call Ensure twice for the same channel.
    call_async(q, conn.Requests, 'EnsureChannel',
           { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
             cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
             cs.TARGET_ID: room_jid,
           })
    call_async(q, conn.Requests, 'EnsureChannel',
           { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
             cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
             cs.TARGET_ID: room_jid,
           })

    mc, _ = q.expect_many(
        EventPattern('dbus-signal', signal='MembersChangedDetailed'),
        EventPattern('stream-presence', to=('%s/test' % room_jid)))
    added, removed, local_pending, remote_pending, details = mc.args

    assert added == [], mc.args
    assert removed == [], mc.args
    assert local_pending == [], mc.args
    assert len(remote_pending) == 1, mc.args

    # Send presence for other member of room.
    stream.send(make_muc_presence('owner', 'moderator', room_jid, 'bob'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence('none', 'participant', room_jid, 'test'))

    mc = q.expect('dbus-signal', signal='MembersChangedDetailed')
    added, removed, local_pending, remote_pending, details = mc.args

    assert len(added) == 2, mc.args
    assert removed == [], mc.args
    assert local_pending == [], mc.args
    assert remote_pending == [], mc.args

    members = conn.inspect_contacts_sync(added)
    members.sort()
    assert members == ['%s/bob' % room_jid, '%s/test' % room_jid], members

    # We should get two EnsureChannel returns
    es = []
    while len(es) < 2:
        e = q.expect('dbus-return', method='EnsureChannel')
        es.append(e)

    e1, e2 = es

    assert len(e1.value) == 3
    yours1, path1, props1 = e1.value

    assert len(e2.value) == 3
    yours2, path2, props2 = e2.value

    # Exactly one Ensure should get Yours=True.
    assert (yours1 == (not yours2))

    assert path1 == path2, (path1, path2)
    assert props1 == props2, (props1, props2)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:60,代码来源:test-ensure.py


示例3: test

def test(q, bus, conn, stream):
    conn_props = dbus.Interface(conn, cs.PROPERTIES_IFACE)

    # Try to CreateChannel with unknown properties
    # Gabble must return an error
    call_async(q, conn.Requests, 'CreateChannel',
            {cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
             cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
             cs.TARGET_ID: "[email protected]",
             'this.property.does.not.exist': 'this.value.should.not.exist'
            })
    ret = q.expect('dbus-error', method='CreateChannel')

    check_no_tubes(conn_props)

    # Try to CreateChannel with missing properties ("Service")
    # Gabble must return an error
    call_async(q, conn.Requests, 'CreateChannel',
            {cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
             cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
             cs.TARGET_ID: "[email protected]",
            })
    ret = q.expect('dbus-error', method='CreateChannel')

    check_no_tubes(conn_props)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:25,代码来源:create-invalid-tube-channels.py


示例4: request_stream_tube

def request_stream_tube(q, bus, conn, method, jid):
    call_async(q, conn.Requests, method,
            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
              cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
              cs.TARGET_ID: jid,
              cs.STREAM_TUBE_SERVICE: 'the.service',
              })
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:7,代码来源:only-text-when-needed.py


示例5: test

def test(q, bus, conn, stream):
    event = q.expect('stream-iq', to=None, query_ns='vcard-temp',
            query_name='vCard')

    acknowledge_iq(stream, event.stanza)
    # Force Gabble to process the vCard before calling any methods.
    sync_stream(q, stream)

    # Request our alias and avatar, expect them to be resolved from cache.

    handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    call_async(q, conn.Avatars, 'RequestAvatars', [handle])
    call_async(q, conn.Aliasing, 'RequestAliases', [handle])

    # FIXME - find out why RequestAliases returns before RequestAvatar even
    # though everything's cached. Probably due to queueing, which means
    # conn-avatars don't look into the cache before making a request. Prolly
    # should make vcard_request look into the cache itself, and return
    # immediately. Or not, if it's g_idle()'d. So it's better if conn-aliasing
    # look into the cache itself.

    r1, r2 = q.expect_many(
        EventPattern('dbus-return', method='RequestAliases'),
        EventPattern('dbus-return', method='RequestAvatars'))

    # Default alias is our jid
    assert r1.value[0] == ['[email protected]']
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:27,代码来源:test-vcard-cache.py


示例6: test_too_slow

 def test_too_slow(self, req1, req2, too_slow):
     """
     Regression test for a bug where if the channel was closed before the
     HTTP responses arrived, the responses finally arriving crashed Gabble.
     """
 
     # User gets bored, and ends the call.
     e = EventPattern('dbus-signal', signal='Closed',
         path=chan.object_path)
 
     if too_slow == TOO_SLOW_CLOSE:
         call_async(self.q, self.chan, 'Close', dbus_interface=cs.CHANNEL)
     elif too_slow == TOO_SLOW_REMOVE_SELF:
         self.chan.Hangup (0, "", "",
             dbus_interface=cs.CHANNEL_TYPE_CALL)
     elif too_slow == TOO_SLOW_DISCONNECT:
         disconnect_conn(q, conn, stream, [e])
 
         try:
             chan.GetMembers()
         except dbus.DBusException, e:
             # This should fail because the object's gone away, not because
             # Gabble's crashed.
             assert cs.DBUS_ERROR_UNKNOWN_METHOD == e.get_dbus_name(), \
                 "maybe Gabble crashed? %s" % e
         else:
             # Gabble will probably also crash in a moment, because the http
             # request callbacks will be called after the channel's meant to
             # have died, which will cause the channel to try to call
             # methods on the (finalized) connection.
             assert False, "the channel should be dead by now"
 
         return
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:33,代码来源:call-google-relay.py


示例7: test

def test(q, bus, conn, stream):
    # Request a sidecar thate we support before we're connected; it should just
    # wait around until we're connected.
    call_async(q, conn.Future, 'EnsureSidecar', PLUGIN_IFACE)

    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    # Now we're connected, the call we made earlier should return.
    path, props = q.expect('dbus-return', method='EnsureSidecar').value
    # This sidecar doesn't even implement get_immutable_properties; it
    # should just get the empty dict filled in for it.
    assertEquals({}, props)

    gateways_iface = dbus.Interface(bus.get_object(conn.bus_name, path),
            PLUGIN_IFACE)

    test_success(q, gateways_iface, stream)
    test_conflict(q, gateways_iface, stream)
    test_not_acceptable(q, gateways_iface, stream)

    call_async(q, conn, 'Disconnect')

    q.expect_many(
        EventPattern('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED]),
        EventPattern('stream-closed'),
        )

    stream.sendFooter()
    q.expect('dbus-return', method='Disconnect')
开发者ID:jku,项目名称:telepathy-gabble,代码行数:32,代码来源:gateways.py


示例8: test

def test(q, bus, conn, stream):
    roster_event = q.expect('stream-iq', query_ns=ns.ROSTER)
    roster_event.stanza['type'] = 'result'

    call_async(q, conn, "RequestHandles", cs.HT_GROUP, ['test'])

    event = q.expect('dbus-return', method='RequestHandles')
    test_handle = event.value[0][0]

    call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_CONTACT_LIST,
        cs.HT_GROUP, test_handle, True)

    # A previous incarnation of this test --- written with the intention that
    # RequestChannel would be called before the roster was received, to expose
    # a bug in Gabble triggered by that ordering --- was racy: if the D-Bus
    # daemon happened to be particularly busy, the call to RequestChannel
    # reached Gabble after the roster stanza. (The race was discovered when
    # that reversed order triggered a newly-introduced instance of the
    # opposite bug to the one the test was targetting!) So we sync the XMPP
    # stream and D-Bus queue here.
    sync_stream(q, stream)
    sync_dbus(bus, q, conn)

    # send an empty roster
    stream.send(roster_event.stanza)

    event = q.expect('dbus-return', method='RequestChannel')
    path = event.value[0]

    while True:
        event = q.expect('dbus-signal', signal='NewChannel')
        assert event.args[0] == path, (event.args, path)
        _, type, handle_type, handle, suppress_handler = event.args
        if handle_type == cs.HT_GROUP and handle == test_handle:
            break
开发者ID:jku,项目名称:telepathy-gabble,代码行数:35,代码来源:request-group-before-roster.py


示例9: test

def test(q, bus, conn, stream):
    conn.Connect()

    q.expect('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    # Need to call this asynchronously as it involves Gabble sending us a
    # query.
    jids = ['[email protected]', '[email protected]']
    call_async(q, conn, 'RequestHandles', 2, jids)

    # Gabble is stupid and discos the alleged conf server twice.
    for i in [0,1]:
        event = q.expect('stream-iq', to='conf.localhost',
            query_ns='http://jabber.org/protocol/disco#info')
        result = make_result_iq(stream, event.stanza)
        feature = result.firstChildElement().addElement('feature')
        feature['var'] = 'http://jabber.org/protocol/muc'
        stream.send(result)

    event = q.expect('dbus-return', method='RequestHandles')
    room_handles = event.value[0]

    test_create_ensure(q, conn, bus, stream, jids[0], room_handles[0])
    test_ensure_ensure(q, conn, bus, stream, jids[1], room_handles[1])
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:25,代码来源:test-ensure.py


示例10: test_error

def test_error(q, bus, conn, stream):
    assertContains(cs.CONN_IFACE_POWER_SAVING,
                  conn.Get(cs.CONN, "Interfaces",
                           dbus_interface=cs.PROPERTIES_IFACE))

    assertEquals (False, conn.Get(cs.CONN_IFACE_POWER_SAVING,
                                  "PowerSavingActive",
                                  dbus_interface=cs.PROPERTIES_IFACE))

    call_async(q, conn.PowerSaving, 'SetPowerSaving', True)

    stanza = expect_command(q, 'enable')

    error = domish.Element((None, 'error'))

    error.addElement((ns.STANZA, 'service-unavailable'))

    send_error_reply(stream, stanza, error)

    q.expect('dbus-error', method='SetPowerSaving', name=cs.NOT_AVAILABLE)

    # Power saving state should remain false
    assertEquals (False, conn.Get(cs.CONN_IFACE_POWER_SAVING,
                                  "PowerSavingActive",
                                  dbus_interface=cs.PROPERTIES_IFACE))
开发者ID:jku,项目名称:telepathy-gabble,代码行数:25,代码来源:power-save.py


示例11: test

def test(q, bus, conn, stream):
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
            args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    self_handle = conn.GetSelfHandle()

    jids = ['[email protected]', '[email protected]']
    call_async(q, conn, 'RequestHandles', 1, jids)

    event = q.expect('dbus-return', method='RequestHandles')
    handles = event.value[0]

    properties = conn.GetAll(
        cs.CONN_IFACE_REQUESTS, dbus_interface=cs.PROPERTIES_IFACE)
    assert properties.get('Channels') == [], properties['Channels']
    assert ({cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
             cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
             },
             [cs.TARGET_HANDLE, cs.TARGET_ID],
             ) in properties.get('RequestableChannelClasses'),\
                     properties['RequestableChannelClasses']

    test_ensure_ensure(q, conn, self_handle, jids[0], handles[0])
    test_request_ensure(q, conn, self_handle, jids[1], handles[1])
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:25,代码来源:ensure.py


示例12: test

def test(q, bus, conn, stream):
    conn.Connect()

    roster_event = q.expect('stream-iq', query_ns=ns.ROSTER)
    roster_event.stanza['type'] = 'result'

    call_async(q, conn, "RequestHandles", cs.HT_GROUP, ['test'])

    event = q.expect('dbus-return', method='RequestHandles')
    test_handle = event.value[0][0]

    # send an empty roster
    stream.send(roster_event.stanza)

    sync_stream(q, stream)
    sync_dbus(bus, q, conn)

    call_async(q, conn.Requests, 'CreateChannel',
            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_CONTACT_LIST,
              cs.TARGET_HANDLE_TYPE: cs.HT_GROUP,
              cs.TARGET_HANDLE: test_handle,
              })

    event = q.expect('dbus-return', method='CreateChannel')
    ret_path, ret_props = event.value

    event = q.expect('dbus-signal', signal='NewChannels')
    path, props = event.args[0][0]
    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_CONTACT_LIST, props
    assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_GROUP, props
    assert props[cs.TARGET_HANDLE] == test_handle, props
    assert props[cs.TARGET_ID] == 'test', props

    assert ret_path == path, (ret_path, path)
    assert ret_props == props, (ret_props, props)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:35,代码来源:request-group-after-roster.py


示例13: test

def test(q, bus, conn, stream):
    conn.Connect()

    expect_and_handle_get_vcard(q, stream)
    sync_stream(q, stream)

    call_async(q, conn.Avatars, 'SetAvatar', 'william shatner',
        'image/x-actor-name')
    # Gabble request the last version of the vCard before changing it
    expect_and_handle_get_vcard(q, stream)

    set_vcard_event = q.expect('stream-iq', query_ns=ns.VCARD_TEMP,
        query_name='vCard', iq_type='set')
    iq = set_vcard_event.stanza

    error = domish.Element((None, 'error'))
    error['code'] = '400'
    error['type'] = 'modify'
    error.addElement((ns.STANZA, 'bad-request'))

    send_error_reply(stream, iq, error)

    event = q.expect('dbus-error', method='SetAvatar')

    assert event.error.get_dbus_name() == cs.INVALID_ARGUMENT, \
        event.error.get_dbus_name()
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:26,代码来源:update-rejected.py


示例14: test

def test(q, bus, conn, stream):
    # This sidecar sends a stanza, and waits for a reply, before being
    # created.
    pattern = EventPattern('stream-iq', to='sidecar.example.com',
        query_ns='http://example.com/sidecar')
    call_async(q, conn.Sidecars1, 'EnsureSidecar', TEST_PLUGIN_IFACE + ".IQ")
    e = q.expect_many(pattern)[0]

    # The server said yes, so we should get a sidecar back!
    acknowledge_iq(stream, e.stanza)
    q.expect('dbus-return', method='EnsureSidecar')

    identities = ["test/app-list//Test"]
    features = ["com.example.test1", "com.example.test2"]
    ver = compute_caps_hash(identities, features, {})

    iq = IQ(stream, "get")
    query = iq.addElement((ns.DISCO_INFO, 'query'))
    query['node'] = ns.GABBLE_CAPS + '#' + ver
    stream.send(iq)
    e = q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info')

    returned_features = [feature['var']
        for feature in xpath.queryForNodes('/iq/query/feature', e.stanza)]
    assertEquals(features, returned_features)

    returned_identities = [identity['category'] + "/" + identity['type']+"//" + identity['name']
        for identity in xpath.queryForNodes('/iq/query/identity', e.stanza)]
    assertEquals(identities, returned_identities)

    new_ver = compute_caps_hash(returned_identities, returned_features, {})
    assertEquals(new_ver, ver)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:32,代码来源:sidecar-own-caps.py


示例15: test

def test(q, bus, conn, stream):
    conn.Connect()

    # Initial vCard request. Respond only after we call SetAliases().
    vcard_get_event = q.expect('stream-iq', iq_type='get', to=None,
        query_ns=ns.VCARD_TEMP, query_name='vCard')
    sync_stream(q, stream)

    handle = conn.GetSelfHandle()
    call_async(q, conn.Aliasing, 'SetAliases', {handle: 'Some Guy'})
    sync_dbus(bus, q, conn)
    acknowledge_iq(stream, vcard_get_event.stanza)

    # Gabble sets a new vCard with our nickname.
    vcard_set_event = q.expect('stream-iq', iq_type='set',
        query_ns=ns.VCARD_TEMP, query_name='vCard')

    # Before the server replies, the user sets their avatar.
    call_async(q, conn.Avatars, 'SetAvatar', 'hello', 'image/png')
    sync_dbus(bus, q, conn)
    acknowledge_iq(stream, vcard_set_event.stanza)

    vcard_set_event = q.expect('stream-iq', iq_type='set',
        query_ns=ns.VCARD_TEMP, query_name='vCard')
    acknowledge_iq(stream, vcard_set_event.stanza)
    q.expect('dbus-return', method='SetAvatar')

    # And then crashes.
    sync_stream(q, stream)

    conn.Disconnect()
    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:32,代码来源:overlapping-sets.py


示例16: run_test

def run_test(q, bus, conn, stream, jt, decloak_allowed):
    """
    Requests streams on a media channel to jt.remote_jid without having their
    presence at all.
    """

    call_async(q, conn.Requests, 'CreateChannel',
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_CALL,
          cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
          cs.TARGET_ID: jt.peer,
          cs.CALL_INITIAL_AUDIO: True,
          cs.CALL_INITIAL_VIDEO: False,
        })

    e = q.expect('stream-presence',
            to=jt.peer_bare_jid, presence_type=None)
    nodes = [ node for node in e.stanza.elements(uri=ns.TEMPPRES, name='temppres') ]
    assertLength(1, nodes)
    assertEquals('media', nodes[0].getAttribute('reason'))

    if decloak_allowed:
        jt.send_presence_and_caps()

        # RequestStreams should now happily complete
        q.expect('dbus-return', method='CreateChannel')
    else:
        q.expect('dbus-error', method='CreateChannel',
                name=cs.OFFLINE)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:28,代码来源:decloak-peer.py


示例17: test

def test(q, bus, conn, stream):
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged',
        args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    chan = join(q, bus, conn)

    stream.sendMessage('PRIVMSG', CHANNEL, ":who's underlined you?", prefix='marnie')
    q.expect('dbus-signal', signal='MessageReceived')

    # Without acking the message, destroy the channel.
    call_async(q, chan.Destroyable, "Destroy")
    q.expect_many(
        EventPattern('stream-PART'),
        EventPattern('dbus-signal', signal='Closed', path=chan.object_path),
        EventPattern('dbus-signal', signal='ChannelClosed', args=[chan.object_path]),
        EventPattern('dbus-return', method='Destroy'),
    )

    # Now Create it again. If we haven't actually left the channel, this will
    # fail.
    chan = join(q, bus, conn)

    # The message should be gone.
    messages = chan.Properties.Get(cs.CHANNEL_IFACE_MESSAGES, 'PendingMessages')
    assertLength(0, messages)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-idle,代码行数:26,代码来源:muc-destroy.py


示例18: request_random_activity_view

def request_random_activity_view(q, stream, conn, max, id, activities):
    requests_iface = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)

    call_async(q, requests_iface, 'CreateChannel',
        { cs.CHANNEL_TYPE:
            'org.laptop.Telepathy.Channel.Type.ActivityView',
            'org.laptop.Telepathy.Channel.Interface.View.MaxSize': max
          })

    iq_event, return_event = q.expect_many(
    EventPattern('stream-iq', to='gadget.localhost',
        query_ns=ns.OLPC_ACTIVITY),
    EventPattern('dbus-return', method='CreateChannel'))

    view = iq_event.stanza.firstChildElement()
    assert view.name == 'view'
    assert view['id'] == id
    assert view['size'] == str(max)
    random = xpath.queryForNodes('/iq/view/random', iq_event.stanza)

    send_reply_to_activity_view_request(stream, iq_event.stanza, activities)

    props = return_event.value[1]
    assert props['org.laptop.Telepathy.Channel.Type.ActivityView.Properties'] == {}
    assert props['org.laptop.Telepathy.Channel.Type.ActivityView.Participants'] == []

    return return_event.value[0]
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:27,代码来源:util.py


示例19: test

def test(q, bus, conn, stream, image_data, mime_type):
    call_async(q, conn.Avatars, 'SetAvatar', image_data, mime_type)

    expect_and_handle_get_vcard(q, stream)

    def check(vcard):
        assertEquals(mime_type,
            xpath.queryForString('/vCard/PHOTO/TYPE', vcard))

        binval = xpath.queryForString('/vCard/PHOTO/BINVAL', vcard)

        # <http://xmpp.org/extensions/xep-0153.html#bizrules-image> says:
        #
        #  5. The image data MUST conform to the base64Binary datatype and thus
        #     be encoded in accordance with Section 6.8 of RFC 2045, which
        #     recommends that base64 data should have lines limited to at most
        #     76 characters in length.
        lines = binval.split('\n')
        for line in lines:
            assert len(line) <= 76, line

        assertEquals(image_data, base64.decodestring(binval))

    expect_and_handle_set_vcard(q, stream, check=check)

    q.expect('dbus-return', method='SetAvatar')
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:26,代码来源:set-avatar.py


示例20: gadget_publish

def gadget_publish(q, stream, conn, publish):
    gadget_iface = dbus.Interface(conn, 'org.laptop.Telepathy.Gadget')

    call_async(q, gadget_iface, 'Publish', publish)
    if publish:
        q.expect_many(
                EventPattern('stream-presence', presence_type='subscribe'),
                EventPattern('dbus-return', method='Publish'))

        # accept the request
        presence = elem('presence', to='[email protected]', from_='gadget.localhost',
            type='subscribed')
        stream.send(presence)

        # send a subscribe request
        presence = elem('presence', to='[email protected]', from_='gadget.localhost',
            type='subscribe')
        stream.send(presence)

        q.expect('stream-presence', presence_type='subscribed'),
    else:
        q.expect_many(
                EventPattern('stream-presence', presence_type='unsubscribe'),
                EventPattern('stream-presence', presence_type='unsubscribed'),
                EventPattern('dbus-return', method='Publish'))

        # Gadget tries to subscribe but is refused now
        presence = elem('presence', to='[email protected]', from_='gadget.localhost',
            type='subscribe')
        stream.send(presence)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:30,代码来源:util.py



注:本文中的servicetest.call_async函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python servicetest.make_channel_proxy函数代码示例发布时间:2022-05-27
下一篇:
Python servicetest.assertLength函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap