• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python servicetest.assertContains函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 servicetest.assertContains 函数的典型用法代码示例。如果您正苦于以下问题:Python assertContains 函数的具体用法?Python assertContains 怎么用?Python assertContains 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



下文共展示了 assertContains 函数的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更好的 Python 代码示例。

示例1: test_create_invisible_list

def test_create_invisible_list(q, bus, conn, stream):
    """Check that an 'invisible' privacy list is created when the fetch fails.

    The connection is started with the "away" presence. The test answers the
    request for the 'invisible' privacy list with an item-not-found error,
    then expects a 'set' IQ defining that list with at least one
    presence-out item. Once connected, "hidden" must be among the statuses
    of the SimplePresence interface.
    """
    conn.SimplePresence.SetPresence("away", "")
    conn.Connect()

    stream.handle_get_all_privacy_lists(q, bus, conn)

    # The list that gets fetched must be the one named 'invisible'.
    fetch_iq = q.expect('stream-iq', query_ns=ns.PRIVACY, iq_type='get')
    fetched_list = xpath.queryForNodes('//list', fetch_iq.query)[0]
    assertEquals('invisible', fetched_list['name'])

    # Reply that no such list exists.
    not_found = domish.Element((None, 'error'))
    not_found['type'] = 'cancel'
    not_found.addElement((ns.STANZA, 'item-not-found'))
    send_error_reply(stream, fetch_iq.stanza, not_found)

    # A 'set' for the same list name must follow, carrying presence-out items.
    store_iq = q.expect('stream-iq', query_ns=ns.PRIVACY, iq_type='set')
    stored_list = xpath.queryForNodes('//list', store_iq.query)[0]
    assertEquals('invisible', stored_list['name'])
    presence_out_items = xpath.queryForNodes(
        '/query/list/item/presence-out', store_iq.query)
    assertNotEquals([], presence_out_items)
    acknowledge_iq(stream, store_iq.stanza)

    q.expect('dbus-signal', signal='StatusChanged',
        args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])

    statuses = conn.Properties.Get(cs.CONN_IFACE_SIMPLE_PRESENCE, "Statuses")
    assertContains("hidden", statuses)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:28,代码来源:invisible_xep_0126.py


示例2: test_with_xep0186

def test_with_xep0186(q, bus, conn, stream):
    """Check that a plugin-provided status activates its linked privacy list.

    Verifies that the 'testbusy' and 'testaway' statuses are advertised with
    the expected presence types, sets 'testbusy', connects, and then expects
    the 'test-busy-list' privacy list to be activated followed by a
    PresencesChanged signal.
    """
    statuses = conn.Properties.Get(cs.CONN_IFACE_SIMPLE_PRESENCE,
        'Statuses')

    # testbusy and testaway are provided by test plugin
    assertContains('testbusy', statuses)
    assertContains('testaway', statuses)

    assertEquals(statuses['testbusy'][0], cs.PRESENCE_BUSY)
    assertEquals(statuses['testaway'][0], cs.PRESENCE_AWAY)

    conn.SimplePresence.SetPresence('testbusy', '')

    conn.Connect()

    # ... gabble asks for all the available lists on the server ...
    stream.handle_get_all_privacy_lists(q, bus, conn,
        lists=["foo-list", "test-busy-list", "bar-list"])

    get_list = q.expect('stream-iq', query_ns=ns.PRIVACY, iq_type='set')
    list_node = xpath.queryForNodes('//active', get_list.query)[0]

    # ... and then activates the one linked with the requested status
    # Note: testbusy status is linked to test-busy-list by test plugin
    assertEquals('test-busy-list', list_node['name'])
    acknowledge_iq(stream, get_list.stanza)

    # The key is a plain int rather than the Python 2 long literal `1L`,
    # which is a syntax error on Python 3; on Python 2 both spellings compare
    # and hash equal, so the expected dictionary is unchanged.
    # NOTE(review): handle 1 is presumably the self handle -- confirm.
    q.expect('dbus-signal', signal='PresencesChanged',
        args=[{1: (cs.PRESENCE_BUSY, u'testbusy', '')}])
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:29,代码来源:plugins.py


示例3: build_request

def build_request(conn, channel_name, use_room):
    """Build the channel request properties for the '#idletest' text channel.

    Asserts that the connection's RequestableChannelClasses contain the class
    matching the chosen flavour, then returns the request as a
    ``dbus.Dictionary`` with signature 'sv'.

    conn: connection whose RequestableChannelClasses property is read.
    channel_name: not used by this function.
    use_room: when true, build a Room-flavoured request (cs.ROOM_NAME);
        otherwise a handle-type/target-id request.
    """
    requestable_classes = conn.Properties.Get(cs.CONN_IFACE_REQUESTS,
        'RequestableChannelClasses')

    if use_room:
        # We allow TargetHandleType in Room-flavoured requests, but it has to
        # be None if specified.
        room_class = (
            {cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT},
            [cs.TARGET_HANDLE_TYPE, cs.ROOM_NAME],
        )
        assertContains(room_class, requestable_classes)

        return dbus.Dictionary({
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
            cs.ROOM_NAME: '#idletest'
        }, signature='sv')

    handle_class = (
        {
            cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
            cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        },
        [cs.TARGET_HANDLE, cs.TARGET_ID],
    )
    assertContains(handle_class, requestable_classes)

    return dbus.Dictionary({
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_ID: '#idletest',
    }, signature='sv')
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-idle,代码行数:30,代码来源:requests-muc.py


示例4: stop_sending

    def stop_sending(self, content):
        """Stop sending on ``content``'s stream and answer the 'sip-invite'.

        Clears ``self.sending``, asks the stream to stop sending, completes
        the sending state change, and checks that the outgoing 'sip-invite'
        carries 'a=recvonly'. The invite is accepted with a body whose
        direction is 'sendonly' when ``self.receiving`` is true and
        'inactive' otherwise, and the matching 'sip-ack' is awaited.

        content: object whose ``stream`` attribute is the D-Bus stream proxy.
        """
        self.sending = False

        content.stream.SetSending(False)
        self.q.expect('dbus-signal', signal='SendingStateChanged',
                      args=[cs.CALL_STREAM_FLOW_STATE_PENDING_STOP],
                      path=content.stream.__dbus_object_path__)

        content.stream.Media.CompleteSendingStateChange(
            cs.CALL_STREAM_FLOW_STATE_STOPPED)

        events = self.q.expect_many(
            EventPattern('dbus-signal', signal='SendingStateChanged',
                         args=[cs.CALL_STREAM_FLOW_STATE_STOPPED],
                         path=content.stream.__dbus_object_path__),
            EventPattern('dbus-signal', signal='LocalSendingStateChanged',
                         path=content.stream.__dbus_object_path__),
            EventPattern('sip-invite'))

        # events[1] is the LocalSendingStateChanged signal.
        assertEquals(cs.CALL_SENDING_STATE_NONE, events[1].args[0])
        assertEquals(self.self_handle, events[1].args[1][0])
        reinvite_event = events[2]

        assertContains('a=recvonly', reinvite_event.sip_message.body)
        self.context.check_call_sdp(reinvite_event.sip_message.body)

        # Conditional expression instead of the fragile `x and a or b` idiom.
        answer_direction = 'sendonly' if self.receiving else 'inactive'
        body = reinvite_event.sip_message.body.replace(
            'recvonly', answer_direction)

        self.context.accept(reinvite_event.sip_message, body)

        ack_cseq = "%s ACK" % reinvite_event.cseq.split()[0]
        self.q.expect('sip-ack', cseq=ack_cseq)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-rakia,代码行数:33,代码来源:direction-change.py


示例5: test

def test(q, bus, conn, stream):
    """Check the MembersChangedDetailed signal emitted when we are kicked.

    Joins the MUC, sends an 'unavailable' presence for our own MUC JID
    carrying status code 307 with Bob as actor and "bye" as reason, and
    checks the details of the resulting signal before expecting
    ChannelClosed.
    """
    # The user happily joins a MUC
    _, chan, _, _ = join_muc(q, bus, conn, stream, MUC)
    own_handle = chan.Group.GetSelfHandle()
    [own_jid] = conn.InspectHandles(cs.HT_CONTACT, [own_handle])

    # But then Bob kicks us.
    kicker_jid = "%s/bob" % MUC
    [kicker_handle] = conn.RequestHandles(cs.HT_CONTACT, [kicker_jid])
    stream.send(
        elem("presence", from_=own_jid, type="unavailable")(
            elem(ns.MUC_USER, "x")(
                elem("item", affiliation="none", role="none")(
                    elem("actor", jid=kicker_jid),
                    elem("reason")(u"bye"),
                ),
                elem("status", code="307"),
            )
        )
    )

    signal = q.expect("dbus-signal", signal="MembersChangedDetailed")
    added, removed, local_pending, remote_pending, details = signal.args

    # Only our own handle is removed; nothing is added or left pending.
    assertEquals([], added)
    assertEquals([own_handle], removed)
    assertEquals([], local_pending)
    assertEquals([], remote_pending)

    # The details name Bob as actor, the kick reason and his message.
    assertContains("actor", details)
    assertEquals(kicker_handle, details["actor"])
    assertEquals(cs.GC_REASON_KICKED, details["change-reason"])
    assertEquals("bye", details["message"])

    q.expect("dbus-signal", signal="ChannelClosed")
开发者ID:jku,项目名称:telepathy-gabble,代码行数:30,代码来源:kicked.py


示例6: test_deny_unblock_remove

def test_deny_unblock_remove(q, bus, conn, stream, stored, deny):
    """
    Test unblocking a contact, and, while that request is pending, deleting
    them.

    The removal from ``stored`` must not reach the stream until the unblock
    request has been answered; once it has, the contact must be reported as
    removed from ``deny`` and then from ``stored``.

    q: event queue used to expect and forbid events.
    bus: not used by this function.
    conn: connection used to look up handles.
    stream: test stream on which roster pushes and IQ replies are sent.
    stored: channel whose Group members include the contact.
    deny: channel whose Group members include the contact.
    """
    self_handle = conn.GetSelfHandle()

    # This contact was on our roster, blocked and subscribed, when we started.
    # NOTE(review): '[email protected]' here and in the roster pushes
    # below looks like an e-mail obfuscation placeholder left by the page this
    # code was copied from -- restore the real JIDs before running.
    contact = '[email protected]'
    handle = conn.RequestHandles(cs.HT_CONTACT, [contact])[0]

    # They're blocked, and we have a bidi subscription, so they should be on
    # deny and stored. (We already checked this earlier, but we've been messing
    # with the roster so let's be sure the preconditions are okay...)
    assertContains(handle,
        deny.Properties.Get(cs.CHANNEL_IFACE_GROUP, "Members"))
    assertContains(handle,
        stored.Properties.Get(cs.CHANNEL_IFACE_GROUP, "Members"))

    # Unblock them.
    call_async(q, deny.Group, 'RemoveMembers', [handle], "")

    # The roster edit for the unblock must name the contact and must not
    # carry the (GOOGLE_ROSTER, 't') attribute.
    roster_event = q.expect('stream-iq', query_ns=ns.ROSTER)
    item = roster_event.query.firstChildElement()
    assertEquals(contact, item['jid'])
    assertDoesNotContain((ns.GOOGLE_ROSTER, 't'), item.attributes)

    # If we now remove them from stored, the edit shouldn't be sent until the
    # unblock event has had a reply.
    q.forbid_events(remove_events)
    call_async(q, stored.Group, 'RemoveMembers', [handle], "")

    # Make sure if the remove is sent prematurely, we catch it.
    sync_stream(q, stream)
    q.unforbid_events(remove_events)

    # So now we send a roster push and reply for the unblock request.
    stream.send(make_set_roster_iq(stream, '[email protected]/Resource', contact,
        'both', False, attrs={}))
    acknowledge_iq(stream, roster_event.stanza)

    # And on receiving the push and reply, Gabble should show them being
    # removed from deny, and send a remove.

    # roster_event is rebound to the event matching remove_events[0].
    _, roster_event = q.expect_many(
        EventPattern('dbus-signal', signal='MembersChanged',
            args=['', [], [handle], [], [], self_handle, cs.GC_REASON_NONE],
            predicate=is_deny),
        remove_events[0],
        )
    item = roster_event.query.firstChildElement()
    assertEquals(contact, item['jid'])

    # Push the removal and acknowledge the remove request.
    stream.send(make_set_roster_iq(stream, '[email protected]/Resource', contact,
        'remove', False, attrs={}))
    acknowledge_iq(stream, roster_event.stanza)

    # The removal from stored is signalled with actor 0 rather than
    # self_handle.
    q.expect('dbus-signal', signal='MembersChanged',
        args=['', [], [handle], [], [], 0, cs.GC_REASON_NONE],
        predicate=is_stored)
开发者ID:jku,项目名称:telepathy-gabble,代码行数:60,代码来源:test-google-roster.py


示例7: receiving_failed

    def receiving_failed(self, content):
        """Report a receiving failure on ``content``'s stream, then recover.

        Clears ``self.receiving``, reports a media error on the stream, and
        checks that the outgoing 'sip-invite' carries 'a=sendonly'. The
        invite is accepted with a body whose direction is 'recvonly' when
        ``self.sending`` is true and 'inactive' otherwise; after the matching
        'sip-ack', receiving is started again via ``self.start_receiving``.

        content: object whose ``stream`` attribute is the D-Bus stream proxy.
        """
        self.receiving = False

        # Precondition: the first content's stream is currently sending.
        assert self.contents[0].stream.Properties.Get(
            cs.CALL_STREAM_IFACE_MEDIA,
            'SendingState') == cs.CALL_STREAM_FLOW_STATE_STARTED

        content.stream.Media.ReportReceivingFailure(
            cs.CALL_STATE_CHANGE_REASON_MEDIA_ERROR, "", "receiving error")

        events = self.q.expect_many(
            EventPattern('dbus-signal', signal='ReceivingStateChanged',
                         args=[cs.CALL_STREAM_FLOW_STATE_STOPPED],
                         path=content.stream.__dbus_object_path__),
            EventPattern('dbus-signal', signal='RemoteMembersChanged',
                         path=content.stream.__dbus_object_path__,
                         predicate=lambda e: e.args[0] == {self.remote_handle: cs.CALL_SENDING_STATE_PENDING_STOP_SENDING}),
            EventPattern('sip-invite'))

        reinvite_event = events[2]

        assertContains('a=sendonly', reinvite_event.sip_message.body)
        self.context.check_call_sdp(reinvite_event.sip_message.body)

        # Conditional expression instead of the fragile `x and a or b` idiom.
        answer_direction = 'recvonly' if self.sending else 'inactive'
        body = reinvite_event.sip_message.body.replace(
            'sendonly', answer_direction)

        self.context.accept(reinvite_event.sip_message, body)

        ack_cseq = "%s ACK" % reinvite_event.cseq.split()[0]
        self.q.expect('sip-ack', cseq=ack_cseq)

        self.start_receiving(content)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-rakia,代码行数:32,代码来源:direction-change.py


示例8: advertise_caps

def advertise_caps(q, conn, stream, filters, expected_features, unexpected_features,
                   expected_caps):
    """Update the client capabilities and verify what is advertised.

    Calls UpdateCapabilities for the '.Foo' client with ``filters``, then
    checks the signalled capabilities, the advertised namespaces, and the
    capabilities reported by both ContactCapabilities and Contacts.

    filters: channel class filters passed to UpdateCapabilities.
    expected_features: namespaces that must be advertised.
    unexpected_features: namespaces that must not be advertised.
    expected_caps: capabilities compared against the signalled and queried
        ones.
    """
    self_handle = conn.GetSelfHandle()
    conn.ContactCapabilities.UpdateCapabilities(
            [(cs.CLIENT + '.Foo', filters, [])])

    # Expect Gabble to reply with the correct caps
    _, namespaces, _, signaled_caps = receive_presence_and_ask_caps(q, stream)

    assertSameElements(expected_caps, signaled_caps)

    # ns.TUBES must always be advertised, whatever the filters.
    assertContains(ns.TUBES, namespaces)

    for feature in expected_features:
        assertContains(feature, namespaces)

    for feature in unexpected_features:
        assertDoesNotContain(feature, namespaces)

    # Check our own caps
    caps = conn.ContactCapabilities.GetContactCapabilities([self_handle])
    assertSameElements(expected_caps, caps)

    # check the Contacts interface give the same caps
    contact_attributes = conn.Contacts.GetContactAttributes(
            [self_handle], [cs.CONN_IFACE_CONTACT_CAPS], False)
    caps_via_contacts_iface = (
        contact_attributes[self_handle][cs.ATTR_CONTACT_CAPABILITIES])
    assertSameElements(caps[self_handle], caps_via_contacts_iface)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:28,代码来源:tube-caps.py


示例9: test

def test(q, bus, conn):
    """Check the 'tel' protocol as exposed by the ring connection manager.

    Verifies that 'tel' is the only protocol, that the parameters reported by
    GetParameters match the Protocols property, and that the properties of the
    protocol object agree with those listed on the connection manager.
    """
    manager = bus.get_object(cs.CM + '.ring',
        tp_path_prefix + '/ConnectionManager/ring')
    manager_iface = dbus.Interface(manager, cs.CM)
    manager_props = dbus.Interface(manager, cs.PROPERTIES_IFACE)

    # 'tel' must be the one and only protocol, by property and by method.
    protocols = unwrap(manager_props.Get(cs.CM, 'Protocols'))
    assertEquals(set(['tel']), set(protocols.keys()))

    protocol_names = unwrap(manager_iface.ListProtocols())
    assertEquals(set(['tel']), set(protocol_names))

    # Parameters via GetParameters must match those in the Protocols property.
    cm_params = manager_iface.GetParameters('tel')
    tel_props = protocols['tel']
    assertEquals(cm_params, tel_props[cs.PROTOCOL + '.Parameters'])

    # The protocol object lives directly below the connection manager.
    proto = bus.get_object(manager.bus_name, manager.object_path + '/tel')
    proto_props_iface = dbus.Interface(proto, cs.PROPERTIES_IFACE)
    proto_props = unwrap(proto_props_iface.GetAll(cs.PROTOCOL))

    shared_keys = ['Parameters', 'Interfaces', 'ConnectionInterfaces',
                   'RequestableChannelClasses', u'VCardField',
                   u'EnglishName', u'Icon']
    for key in shared_keys:
        assertEquals(tel_props[cs.PROTOCOL + '.' + key], proto_props[key])

    assertEquals('tel', proto_props['VCardField'])
    assertEquals('Mobile Telephony', proto_props['EnglishName'])
    assertEquals('im-tel', proto_props['Icon'])

    assertContains(cs.CONN_IFACE_REQUESTS, proto_props['ConnectionInterfaces'])
开发者ID:jusa,项目名称:telepathy-ring,代码行数:32,代码来源:protocol.py


示例10: sending_failed

    def sending_failed(self, content):
        """Report a sending failure on ``content``'s stream, then recover.

        Clears ``self.sending``, reports a media error on the stream, and
        checks that the outgoing 'sip-invite' carries 'a=recvonly'. The
        invite is accepted with a body whose direction is 'sendonly' when
        ``self.receiving`` is true and 'inactive' otherwise; after the
        matching 'sip-ack', sending is started again via
        ``self.start_sending``.

        content: object whose ``stream`` attribute is the D-Bus stream proxy.
        """
        self.sending = False

        content.stream.Media.ReportSendingFailure(
            cs.CALL_STATE_CHANGE_REASON_MEDIA_ERROR, "", "sending error")

        events = self.q.expect_many(
            EventPattern('dbus-signal', signal='SendingStateChanged',
                         args=[cs.CALL_STREAM_FLOW_STATE_STOPPED],
                         path=content.stream.__dbus_object_path__),
            EventPattern('dbus-signal', signal='LocalSendingStateChanged',
                         path=content.stream.__dbus_object_path__,
                         predicate=lambda e: e.args[0] == 0),
            EventPattern('sip-invite'))

        # events[1] is the LocalSendingStateChanged signal.
        assertEquals(cs.CALL_SENDING_STATE_NONE, events[1].args[0])
        assertEquals(self.self_handle, events[1].args[1][0])
        reinvite_event = events[2]

        assertContains('a=recvonly', reinvite_event.sip_message.body)
        self.context.check_call_sdp(reinvite_event.sip_message.body)

        # Conditional expression instead of the fragile `x and a or b` idiom.
        answer_direction = 'sendonly' if self.receiving else 'inactive'
        body = reinvite_event.sip_message.body.replace(
            'recvonly', answer_direction)

        self.context.accept(reinvite_event.sip_message, body)

        ack_cseq = "%s ACK" % reinvite_event.cseq.split()[0]
        self.q.expect('sip-ack', cseq=ack_cseq)

        self.start_sending(content)
开发者ID:freedesktop-unofficial-mirror,项目名称:telepathy__telepathy-rakia,代码行数:31,代码来源:direction-change.py


示例11: test_content_addition

    def test_content_addition(self):
        """Add a video content named "Webcam" to the call, then remove it.

        Checks the new content's Disposition and Name, accepts the media
        description offer, adds remote candidates to its first stream, and
        expects a 'content-add' IQ. Removing the content must produce a
        'content-remove' IQ.
        """
        content_path = self.chan.AddContent(
            "Webcam", cs.CALL_MEDIA_TYPE_VIDEO,
            cs.MEDIA_STREAM_DIRECTION_BIDIRECTIONAL,
            dbus_interface=cs.CHANNEL_TYPE_CALL)
        content = self.bus.get_object(self.conn.bus_name, content_path)
        props = content.GetAll(cs.CALL_CONTENT,
                               dbus_interface=dbus.PROPERTIES_IFACE)

        assertEquals(cs.CALL_DISPOSITION_NONE, props["Disposition"])
        # NOTE: the "Creator" property is deliberately not checked here.
        assertContains("Webcam", props["Name"])

        video_md = self.jt2.get_call_video_md_dbus()
        self.check_and_accept_offer(content, video_md)

        first_stream = self.bus.get_object(self.conn.bus_name,
                                           props["Streams"][0])
        remote_candidates = self.jt2.get_call_remote_transports_dbus()
        first_stream.AddCandidates(remote_candidates,
                                   dbus_interface=cs.CALL_STREAM_IFACE_MEDIA)

        self.q.expect('stream-iq',
                      predicate=self.jp.action_predicate('content-add'))

        content.Remove(dbus_interface=cs.CALL_CONTENT)
        self.q.expect('stream-iq',
                      predicate=self.jp.action_predicate('content-remove'))
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:28,代码来源:call-basics.py


示例12: test

def test(q, bus, conn, stream):
    """
    Check that the initial roster produces one AliasesChanged signal covering
    every contact in it.
    """
    conn.Connect()

    # Gabble asks the server for the initial roster
    roster_request = q.expect('stream-iq', iq_type='get', query_ns=ns.ROSTER)
    roster_reply = make_result_iq(stream, roster_request.stanza)

    # We reply with a roster with two contacts
    # NOTE(review): both JIDs read '[email protected]', which looks like
    # an e-mail obfuscation placeholder left by the page this code was copied
    # from -- restore two distinct real JIDs before running.
    bob_jid = '[email protected]'
    bob_alias = 'Bob Smith'
    add_contact(roster_reply, bob_jid, bob_alias)

    alice_jid = '[email protected]'
    alice_alias = 'Alice'
    add_contact(roster_reply, alice_jid, alice_alias)

    stream.send(roster_reply)

    # Check we get a single AliasesChanged for both contacts
    aliases_signal = q.expect('dbus-signal', signal='AliasesChanged')
    changed_aliases = aliases_signal.args[0]

    handles = conn.get_contact_handles_sync([bob_jid, alice_jid])
    bob_handle, alice_handle = handles

    assertLength(2, changed_aliases)
    assertContains((bob_handle, bob_alias), changed_aliases)
    assertContains((alice_handle, alice_alias), changed_aliases)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:33,代码来源:initial-aliases.py


示例13: test_invisible

def test_invisible(q, bus, conn, stream):
    """Check switching to the "hidden" presence and back to a visible one.

    Setting "hidden" must send an 'invisible' IQ and, once acknowledged,
    signal PresencesChanged. Setting "away" afterwards must send a 'visible'
    IQ, followed by an undirected presence stanza and another
    PresencesChanged signal.
    """
    statuses = conn.Properties.Get(cs.CONN_IFACE_SIMPLE_PRESENCE, "Statuses")
    assertContains("hidden", statuses)

    conn.SimplePresence.SetPresence("hidden", "")

    # First we send an <invisible/> command.
    invisible_iq = q.expect('stream-iq', query_name='invisible')
    acknowledge_iq(stream, invisible_iq.stanza)

    # When that's returned successfully, we can signal the change on D-Bus.
    q.expect('dbus-signal', signal='PresencesChanged',
             interface=cs.CONN_IFACE_SIMPLE_PRESENCE,
             args=[{1: (5, 'hidden', '')}])

    conn.SimplePresence.SetPresence("away", "gone")

    # First Gabble sends a <visible/> command.
    visible_iq = q.expect('stream-iq', query_name='visible')
    acknowledge_iq(stream, visible_iq.stanza)

    # Then: "It is the responsibility of the client to send an undirected
    # presence notification to the server". Plus, we should signal the change
    # on D-Bus.
    undirected_presence = EventPattern('stream-presence', to=None)
    away_signal = EventPattern('dbus-signal', signal='PresencesChanged',
                               interface=cs.CONN_IFACE_SIMPLE_PRESENCE,
                               args=[{1: (3, 'away', 'gone')}])
    q.expect_many(undirected_presence, away_signal)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:29,代码来源:invisible_xep_0186.py


示例14: test

def test(q, bus, conn, stream):
    """Check the MembersChangedDetailed signal emitted when we are kicked.

    Joins the MUC, sends an 'unavailable' presence for our own MUC JID
    carrying status code 307 with Bob as actor and 'bye' as reason, and
    checks the details of the resulting signal before expecting
    ChannelClosed.
    """
    # The user happily joins a MUC
    chan, _, _ = join_muc(q, bus, conn, stream, MUC)
    own_handle = chan.Properties.Get(cs.CHANNEL_IFACE_GROUP, "SelfHandle")
    own_jid = conn.inspect_contact_sync(own_handle)

    # But then Bob kicks us.
    kicker_jid = '%s/bob' % MUC
    kicker_handle = conn.get_contact_handle_sync(kicker_jid)
    stream.send(
        elem('presence', from_=own_jid, type='unavailable')(
            elem(ns.MUC_USER, 'x')(
                elem('item', affiliation='none', role='none')(
                    elem('actor', jid=kicker_jid),
                    elem('reason')(u'bye'),
                ),
                elem('status', code='307'),
            )
        )
    )

    signal = q.expect('dbus-signal', signal='MembersChangedDetailed')
    added, removed, local_pending, remote_pending, details = signal.args

    # Only our own handle is removed; nothing is added or left pending.
    assertEquals([], added)
    assertEquals([own_handle], removed)
    assertEquals([], local_pending)
    assertEquals([], remote_pending)

    # The details name Bob as actor, the kick reason and his message.
    assertContains('actor', details)
    assertEquals(kicker_handle, details['actor'])
    assertEquals(cs.GC_REASON_KICKED, details['change-reason'])
    assertEquals('bye', details['message'])

    q.expect('dbus-signal', signal='ChannelClosed')
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:35,代码来源:kicked.py


示例15: test_error

def test_error(q, bus, conn, stream):
    """Check that a failed 'enable' command leaves power saving inactive.

    SetPowerSaving(True) is answered with a service-unavailable error; the
    D-Bus call must fail with cs.NOT_AVAILABLE and PowerSavingActive must
    still read False.
    """
    def power_saving_active():
        # Read the PowerSavingActive property afresh on each call.
        return conn.Get(cs.CONN_IFACE_POWER_SAVING,
                        "PowerSavingActive",
                        dbus_interface=cs.PROPERTIES_IFACE)

    interfaces = conn.Get(cs.CONN, "Interfaces",
                          dbus_interface=cs.PROPERTIES_IFACE)
    assertContains(cs.CONN_IFACE_POWER_SAVING, interfaces)

    assertEquals(False, power_saving_active())

    call_async(q, conn.PowerSaving, 'SetPowerSaving', True)

    enable_stanza = expect_command(q, 'enable')

    # Reject the command with <service-unavailable/>.
    unavailable = domish.Element((None, 'error'))
    unavailable.addElement((ns.STANZA, 'service-unavailable'))
    send_error_reply(stream, enable_stanza, unavailable)

    q.expect('dbus-error', method='SetPowerSaving', name=cs.NOT_AVAILABLE)

    # Power saving state should remain false
    assertEquals(False, power_saving_active())
开发者ID:jku,项目名称:telepathy-gabble,代码行数:25,代码来源:power-save.py


示例16: advertise_caps

def advertise_caps(q, bus, conn, stream, filters, expected_features, unexpected_features,
                   expected_caps):
    """Update the client capabilities and verify what is advertised.

    Calls UpdateCapabilities for the '.Foo' client with ``filters``, then
    checks the capabilities signalled for our own handle, the advertised
    namespaces, and the capabilities reported by both the capabilities
    helper and the Contacts interface.

    filters: channel class filters passed to UpdateCapabilities.
    expected_features: namespaces that must be advertised.
    unexpected_features: namespaces that must not be advertised.
    expected_caps: capabilities expected for our own handle.
    """
    # make sure nothing from a previous update is still running
    sync_dbus(bus, q, conn)

    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    conn.ContactCapabilities.UpdateCapabilities(
            [(cs.CLIENT + '.Foo', filters, [])])

    # Expect Gabble to reply with the correct caps
    _, namespaces, _, signaled_caps = receive_presence_and_ask_caps(q, stream)

    assertSameElements(expected_caps, signaled_caps[self_handle])

    # ns.TP_FT_METADATA must always be advertised, whatever the filters.
    assertContains(ns.TP_FT_METADATA, namespaces)

    for feature in expected_features:
        assertContains(feature, namespaces)

    for feature in unexpected_features:
        assertDoesNotContain(feature, namespaces)

    # Check our own caps
    caps = get_contacts_capabilities_sync(conn, [self_handle])
    assertSameElements(expected_caps, caps[self_handle])

    # check the Contacts interface give the same caps
    contact_attributes = conn.Contacts.GetContactAttributes(
            [self_handle], [cs.CONN_IFACE_CONTACT_CAPS], False)
    caps_via_contacts_iface = (
        contact_attributes[self_handle][cs.ATTR_CONTACT_CAPABILITIES])
    assertSameElements(caps[self_handle], caps_via_contacts_iface)
开发者ID:Thaodan,项目名称:telepathy-gabble,代码行数:31,代码来源:ft-client-caps.py


示例17: check_NewChannels_signal

def check_NewChannels_signal(conn, args, channel_type, chan_path, contact_handle,
                             contact_id, initiator_handle):
    """
    Checks the first argument, a one-tuple of arguments from NewChannels,
    matches the other arguments.

    conn: connection whose 'Channels' property must list the new channel.
    args: arguments of the NewChannels signal; must hold exactly one list
        containing exactly one (path, properties) pair.
    channel_type: expected value of the channel type property.
    chan_path: expected object path of the channel.
    contact_handle: expected target handle.
    contact_id: expected target ID.
    initiator_handle: expected initiator handle.

    Raises AssertionError when any check fails.
    """
    assert len(args) == 1, args
    assert len(args[0]) == 1        # one channel
    path, props = args[0][0]

    # The failure message used to reference an undefined name, so a path
    # mismatch surfaced as NameError instead of a readable AssertionError.
    assert path == chan_path, (path, chan_path)

    assert props[cs.CHANNEL_TYPE] == channel_type, (props, channel_type)
    assert props[cs.TARGET_HANDLE_TYPE] == cs.HT_CONTACT, props
    assert props[cs.TARGET_HANDLE] == contact_handle, (props, contact_handle)
    assert props[cs.TARGET_ID] == contact_id, (props, contact_id)
    assert props[cs.REQUESTED] == True, props
    assert props[cs.INITIATOR_HANDLE] == initiator_handle, \
        (props, initiator_handle)
    # NOTE(review): '[email protected]' looks like an e-mail obfuscation
    # placeholder left by the page this code was copied from -- restore the
    # real initiator ID before running.
    assert props[cs.INITIATOR_ID] == '[email protected]', props

    # check that the newly announced channel is in the channels list
    all_channels = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
        dbus_interface=cs.PROPERTIES_IFACE, byte_arrays=True)
    assertContains((path, props), all_channels)
开发者ID:dfghj44444,项目名称:realxtend-naali-deps,代码行数:25,代码来源:tubetestutil.py


示例18: test

def test(q, bus, conn, stream):
    """Check CreateChannel for a text channel to a contact.

    Verifies the initial Requests properties, calls CreateChannel with a
    target handle, and checks that the returned properties, the NewChannel
    signal and the NewChannels signal all agree, and that the new channel
    then appears in the 'Channels' property.
    """
    self_handle = conn.GetSelfHandle()

    # NOTE(review): '[email protected]' here and in the INITIATOR_ID
    # check below looks like an e-mail obfuscation placeholder left by the
    # page this code was copied from -- restore the real JIDs before running.
    jid = '[email protected]'
    foo_handle = conn.RequestHandles(cs.HT_CONTACT, [jid])[0]

    # Before the request: no channels, and contact text channels must be
    # requestable by handle or by ID.
    properties = conn.GetAll(
        cs.CONN_IFACE_REQUESTS, dbus_interface=dbus.PROPERTIES_IFACE)
    assert properties.get('Channels') == [], properties['Channels']
    assert ({cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
             cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
             },
             [cs.TARGET_HANDLE, cs.TARGET_ID],
             ) in properties.get('RequestableChannelClasses'),\
                     properties['RequestableChannelClasses']

    call_async(q, conn.Requests, 'CreateChannel',
            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
              cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
              cs.TARGET_HANDLE: foo_handle,
              })

    # The method return and both signals are awaited together.
    ret, old_sig, new_sig = q.expect_many(
        EventPattern('dbus-return', method='CreateChannel'),
        EventPattern('dbus-signal', signal='NewChannel'),
        EventPattern('dbus-signal', signal='NewChannels'),
        )

    # ret.value is (object path, properties).
    assert len(ret.value) == 2
    emitted_props = ret.value[1]
    assert emitted_props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TEXT
    assert emitted_props[cs.TARGET_HANDLE_TYPE] == cs.HT_CONTACT
    assert emitted_props[cs.TARGET_HANDLE] == foo_handle
    assert emitted_props[cs.TARGET_ID] == jid
    assert emitted_props[cs.REQUESTED] == True
    assert emitted_props[cs.INITIATOR_HANDLE] == self_handle
    assert emitted_props[cs.INITIATOR_ID] == '[email protected]'
    assertContains('text/plain', emitted_props[cs.SUPPORTED_CONTENT_TYPES])
    assertEquals(0, emitted_props[cs.MESSAGE_PART_SUPPORT_FLAGS])
    assertEquals(cs.DELIVERY_REPORTING_SUPPORT_FLAGS_RECEIVE_FAILURES,
            emitted_props[cs.DELIVERY_REPORTING_SUPPORT])

    # The NewChannel signal must describe the same channel.
    assert old_sig.args[0] == ret.value[0]
    assert old_sig.args[1] == cs.CHANNEL_TYPE_TEXT
    assert old_sig.args[2] == cs.HT_CONTACT
    assert old_sig.args[3] == foo_handle
    assert old_sig.args[4] == True      # suppress handler

    # NewChannels must carry exactly the returned (path, properties) pair.
    assert len(new_sig.args) == 1
    assert len(new_sig.args[0]) == 1        # one channel
    assert len(new_sig.args[0][0]) == 2     # two struct members
    assert new_sig.args[0][0][0] == ret.value[0]
    assert new_sig.args[0][0][1] == ret.value[1]

    # Re-read the properties: the new channel must now be listed.
    properties = conn.GetAll(
        cs.CONN_IFACE_REQUESTS, dbus_interface=dbus.PROPERTIES_IFACE)

    assert new_sig.args[0][0] in properties['Channels'], \
            (new_sig.args[0][0], properties['Channels'])
开发者ID:jku,项目名称:telepathy-gabble,代码行数:59,代码来源:initiate-requestotron.py


示例19: test_contact_blocking

def test_contact_blocking(q, bus, conn, stream, stored, deny):
    """Check the ContactBlocking API: interface present, three contacts blocked."""
    interfaces = conn.Properties.Get(cs.CONN, "Interfaces")
    assertContains(cs.CONN_IFACE_CONTACT_BLOCKING, interfaces)

    # 3 contacts are blocked
    blocked_contacts = conn.RequestBlockedContacts(
        dbus_interface=cs.CONN_IFACE_CONTACT_BLOCKING)
    assertLength(3, blocked_contacts)
开发者ID:mlundblad,项目名称:telepathy-gabble,代码行数:8,代码来源:test-google-roster.py


示例20: test

def test(q, bus, conn, stream):
    """Check enabling, re-enabling and disabling power saving.

    Enabling sends an 'enable' command and signals PowerSavingChanged(True).
    Enabling again must neither send a queue query nor signal again.
    Disabling sends a 'disable' command, then a queue query whose first child
    is 'flush', and signals PowerSavingChanged(False).
    """
    def power_saving_active():
        # Read the PowerSavingActive property afresh on each call.
        return conn.Get(cs.CONN_IFACE_POWER_SAVING,
                        "PowerSavingActive",
                        dbus_interface=cs.PROPERTIES_IFACE)

    interfaces = conn.Get(cs.CONN, "Interfaces",
                          dbus_interface=cs.PROPERTIES_IFACE)
    assertContains(cs.CONN_IFACE_POWER_SAVING, interfaces)

    assertEquals(False, power_saving_active())

    call_async(q, conn.PowerSaving, 'SetPowerSaving', True)

    enable_stanza = expect_command(q, 'enable')
    stream.send(make_result_iq(stream, enable_stanza, False))

    q.expect_many(EventPattern('dbus-return', method='SetPowerSaving'),
                  EventPattern('dbus-signal', signal='PowerSavingChanged',
                               args=[True]))

    assertEquals(True, power_saving_active())

    # Neither of these may occur while power saving is already enabled.
    forbidden = [EventPattern('stream-iq', query_name='query',
                              query_ns=ns.GOOGLE_QUEUE),
                 EventPattern('dbus-signal', signal='PowerSavingChanged',
                              args=[True])]
    q.forbid_events(forbidden)

    # Enabling power saving again should be a no-op.
    call_async(q, conn.PowerSaving, 'SetPowerSaving', True)
    q.expect('dbus-return', method='SetPowerSaving')

    q.unforbid_events(forbidden)

    call_async(q, conn.PowerSaving, 'SetPowerSaving', False)

    disable_stanza = expect_command(q, 'disable')
    stream.send(make_result_iq(stream, disable_stanza, False))

    queue_iq, _, _ = q.expect_many(
        EventPattern('stream-iq', query_name='query', query_ns=ns.GOOGLE_QUEUE),
        EventPattern('dbus-return', method='SetPowerSaving'),
        EventPattern('dbus-signal', signal='PowerSavingChanged',
                     args=[False]))

    # The queue query sent after disabling must carry a 'flush' child.
    flush_command = queue_iq.query.firstChildElement()
    assertEquals("flush", flush_command.name)

    assertEquals(False, power_saving_active())
开发者ID:jku,项目名称:telepathy-gabble,代码行数:55,代码来源:power-save.py



注:本文中的servicetest.assertContains函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python servicetest.assertEquals函数代码示例发布时间:2022-05-27
下一篇:
Python verify.Verify类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap