• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python api_client.put_data函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.util.base.api_client.put_data函数的典型用法代码示例。如果您正苦于以下问题：Python put_data函数的具体用法？Python put_data怎么用？Python put_data使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了put_data函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_delete_tag

def test_delete_tag(api_client, default_namespace, thread):
    """Check tag deletion and the error responses for deletes that must fail.

    Creates a tag named 'foo', applies it to the first listed thread and
    deletes the tag.  The tag lookup must then report 'No tag found' and the
    thread must no longer list 'foo'.  Afterwards three deletes are expected
    to fail: an id prefixed with '!' (400, 'Invalid id...'), an all-zero id
    (404) and the 'inbox' tag (400, message mentions 'user-created').

    ``default_namespace`` and ``thread`` are not read by the body; presumably
    they are fixtures requested for their setup side effects.
    """
    created = json.loads(api_client.post_data('/tags/', {'name': 'foo'}).data)
    tag_id = created['id']
    tag_path = '/tags/' + tag_id

    first_thread_id = api_client.get_data('/threads')[0]['id']
    thread_path = '/threads/{}'.format(first_thread_id)
    api_client.put_data(thread_path, {'add_tags': ['foo']})

    response = api_client.delete(tag_path)
    assert response.status_code == 200
    lookup = api_client.get_data(tag_path)
    assert lookup['message'] == 'No tag found'

    refreshed_thread = api_client.get_data(thread_path)
    remaining_names = [tag['name'] for tag in refreshed_thread['tags']]
    assert 'foo' not in remaining_names

    # An id with a leading '!' is rejected as invalid.
    response = api_client.delete('/tags/!' + tag_id)
    assert response.status_code == 400
    error = json.loads(response.data)
    assert error['message'].startswith('Invalid id')

    response = api_client.delete('/tags/0000000000000000000000000')
    assert response.status_code == 404

    response = api_client.delete('/tags/inbox')
    assert response.status_code == 400
    error = json.loads(response.data)
    assert 'user-created' in error['message']
开发者ID:raghuveerkancherla,项目名称:sync-engine,代码行数:26,代码来源:test_tags.py


示例2: test_conflicting_updates

def test_conflicting_updates(api_client):
    """An update that reuses an already-consumed draft version gets a 409.

    Posts a draft, updates it once with the version returned at creation
    (expects 200 and a different version in the reply), then sends a second
    update carrying that same original version and expects 409.  Finally
    checks that exactly one draft is listed and that its id is the one
    returned by the first update.
    """
    creation = api_client.post_data('/drafts', {
        'subject': 'parent draft',
        'body': 'parent draft'
    })
    created = json.loads(creation.data)
    draft_path = '/drafts/{}'.format(created['id'])
    stale_version = created['version']

    first_update = api_client.put_data(draft_path, {
        'subject': 'updated draft',
        'body': 'updated draft',
        'version': stale_version
    })
    assert first_update.status_code == 200
    accepted = json.loads(first_update.data)
    accepted_id = accepted['id']
    assert accepted['version'] != stale_version

    # Same version number again: the server must refuse it.
    second_update = api_client.put_data(draft_path, {
        'subject': 'conflicting draft',
        'body': 'conflicting draft',
        'version': stale_version
    })
    assert second_update.status_code == 409

    listed = api_client.get_data('/drafts')
    assert len(listed) == 1
    assert listed[0]['id'] == accepted_id
开发者ID:apolmig,项目名称:inbox,代码行数:33,代码来源:test_drafts.py


示例3: test_unread_cascades_to_messages

def test_unread_cascades_to_messages(log, api_client, default_account, thread,
                                     message):
    """Adding/removing the 'unread' tag on a thread is reflected per message.

    After tagging the first listed thread 'unread', every message returned
    for that thread must have a truthy 'unread' field; after removing the
    tag, none may.

    The parameters other than ``api_client`` are not read by the body;
    presumably they are fixtures requested for their setup side effects.
    """
    first_thread_id = api_client.get_data('/threads/')[0]['id']
    thread_path = '/threads/{}'.format(first_thread_id)
    messages_path = '/messages?thread_id={}'.format(first_thread_id)

    api_client.put_data(thread_path, {'add_tags': ['unread']})
    unread_flags = [m['unread'] for m in api_client.get_data(messages_path)]
    assert all(unread_flags)

    api_client.put_data(thread_path, {'remove_tags': ['unread']})
    unread_flags = [m['unread'] for m in api_client.get_data(messages_path)]
    assert not any(unread_flags)
开发者ID:raghuveerkancherla,项目名称:sync-engine,代码行数:11,代码来源:test_tags.py


示例4: test_events_are_condensed

def test_events_are_condensed(api_client):
    """Test that multiple revisions of the same object are rolled up in the
    delta response.

    Creates a tag, takes a cursor for the timestamp captured at the start,
    adds and then removes the tag on the first listed thread, and expects the
    delta feed for that cursor to contain exactly two entries.
    """
    started_at = int(time.time())
    api_client.post_data('/tags/', {'name': 'foo'})
    cursor = get_cursor(api_client, started_at)

    first_thread_id = api_client.get_data('/threads/')[0]['id']
    thread_path = '/threads/{}'.format(first_thread_id)
    for change in ({'add_tags': ['foo']}, {'remove_tags': ['foo']}):
        api_client.put_data(thread_path, change)

    delta_feed = api_client.get_data('/delta?cursor={}'.format(cursor))
    assert len(delta_feed['deltas']) == 2
开发者ID:Analect,项目名称:sync-engine,代码行数:12,代码来源:test_delta_sync.py


示例5: test_unread_cascades_to_messages

def test_unread_cascades_to_messages(patch_network_functions, log, api_client,
                                     syncback_service, default_account):
    """Adding/removing the 'unread' tag on a thread is reflected per message.

    Same check as a plain tag round trip, but with a three second
    ``gevent.sleep`` after each tag change before the messages are read back
    (presumably to let the syncback service process the change -- confirm).

    The parameters other than ``api_client`` are not read by the body;
    presumably they are fixtures requested for their setup side effects.
    """
    first_thread_id = api_client.get_data('/threads/')[0]['id']
    thread_path = '/threads/{}'.format(first_thread_id)
    messages_path = '/messages?thread_id={}'.format(first_thread_id)

    api_client.put_data(thread_path, {'add_tags': ['unread']})
    gevent.sleep(3)
    unread_flags = [m['unread'] for m in api_client.get_data(messages_path)]
    assert all(unread_flags)

    api_client.put_data(thread_path, {'remove_tags': ['unread']})
    gevent.sleep(3)
    unread_flags = [m['unread'] for m in api_client.get_data(messages_path)]
    assert not any(unread_flags)
开发者ID:Analect,项目名称:sync-engine,代码行数:13,代码来源:test_tags.py


示例6: test_add_remove_tags

def test_add_remove_tags(api_client):
    """Tags can be added to and removed from one thread without leaking.

    Verifies that 'foo' and 'bar' do not exist yet, creates both, applies
    them to the first listed thread and checks they show up there.  The
    second listed thread must not carry 'foo'.  After removing both tags the
    first thread must list neither.
    """
    for name in ('foo', 'bar'):
        existing = [tag['name'] for tag in api_client.get_data('/tags/')]
        assert name not in existing

    for name in ('foo', 'bar'):
        api_client.post_data('/tags/', {'name': name})

    first_thread_id = api_client.get_data('/threads/')[0]['id']
    thread_path = '/threads/{}'.format(first_thread_id)
    for name in ('foo', 'bar'):
        api_client.put_data(thread_path, {'add_tags': [name]})

    tagged_thread = api_client.get_data(thread_path)
    applied = [tag['name'] for tag in tagged_thread['tags']]
    assert 'foo' in applied
    assert 'bar' in applied

    # Check that tag was only applied to this thread
    second_thread_id = api_client.get_data('/threads/')[1]['id']
    other_thread = api_client.get_data('/threads/{}'.format(second_thread_id))
    assert 'foo' not in get_tag_names(other_thread)

    for name in ('foo', 'bar'):
        api_client.put_data(thread_path, {'remove_tags': [name]})
    remaining = get_tag_names(api_client.get_data(thread_path))
    assert 'foo' not in remaining
    assert 'bar' not in remaining
开发者ID:dlitz,项目名称:inbox,代码行数:28,代码来源:test_tags.py


示例7: test_events_are_condensed

def test_events_are_condensed(api_client):
    """Test that multiple revisions of the same object are rolled up in the
    delta response.

    Expects exactly three deltas, in order: the tag's creation, one tag
    'modify' carrying the final name 'baz', and one thread 'modify'.
    """
    cursor = get_cursor(api_client, int(time.time()))

    # Create, then modify a tag; then modify it again
    creation = api_client.post_data('/tags/', {'name': 'foo'})
    tag_id = json.loads(creation.data)['id']
    tag_path = '/tags/{}'.format(tag_id)
    for new_name in ('bar', 'baz'):
        api_client.put_data(tag_path, {'name': new_name})

    # Modify a thread, then modify it again
    first_thread_id = api_client.get_data('/threads/')[0]['id']
    thread_path = '/threads/{}'.format(first_thread_id)
    api_client.put_data(thread_path, {'add_tags': [tag_id]})
    api_client.put_data(thread_path, {'remove_tags': [tag_id]})

    delta_feed = api_client.get_data('/delta?cursor={}'.format(cursor))
    deltas = delta_feed['deltas']
    assert len(deltas) == 3

    tag_created = deltas[0]
    assert tag_created['object'] == 'tag' and tag_created['event'] == 'create'

    # Check that successive modifies are condensed.
    tag_modified = deltas[1]
    assert (tag_modified['object'] == 'tag' and
            tag_modified['event'] == 'modify')
    assert tag_modified['attributes']['name'] == 'baz'

    thread_modified = deltas[2]
    assert (thread_modified['object'] == 'thread' and
            thread_modified['event'] == 'modify')
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:33,代码来源:test_delta_sync.py


示例8: test_move_to_read_only_calendar

def test_move_to_read_only_calendar(db, api_client):
    """Moving a writeable event into a read-only calendar must not return 200.

    Scans the namespace's calendars to find a read-only calendar, a writeable
    calendar, and a non-read-only event inside a writeable calendar, then
    tries to set that event's ``calendar_id`` to the read-only calendar.
    """
    account = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
    ns_id = account.namespace.public_id

    read_only_calendar = writeable_calendar = writeable_event = None
    for calendar in api_client.get_data('/calendars', ns_id):
        if calendar['read_only']:
            read_only_calendar = calendar
            continue

        writeable_calendar = calendar
        # Stop at the first writeable event of this calendar; later
        # calendars are still scanned by the outer loop.
        for event_id in calendar['event_ids']:
            event = api_client.get_data('/events/' + event_id, ns_id)
            if not event['read_only']:
                writeable_event = event
                break

    assert read_only_calendar
    assert writeable_event
    assert writeable_calendar

    move_request = {'calendar_id': read_only_calendar['id']}
    response = api_client.put_data('/events/' + writeable_event['id'],
                                   move_request, ns_id)
    assert response.status_code != 200
开发者ID:apolmig,项目名称:inbox,代码行数:27,代码来源:test_calendars.py


示例9: test_update_calendar

def test_update_calendar(db, api_client):
    """Renaming a calendar via PUT is reflected in the reply and a later GET.

    Creates a calendar named 'Vacation', renames it to 'OOO' and checks that
    both the PUT response and a subsequent GET describe a writeable, empty
    calendar with the new name in the expected namespace.

    The created calendar row is deleted in a ``finally`` block so that a
    failing assertion does not leave it behind in the database.
    """
    acct = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
    ns_id = acct.namespace.public_id

    resp = api_client.post_data('/calendars', {'name': 'Vacation'}, ns_id)
    cal_id = json.loads(resp.data)['id']

    try:
        c_update_data = {'name': 'OOO'}
        resp = api_client.put_data('/calendars/' + cal_id, c_update_data,
                                   ns_id)
        c_put_data = json.loads(resp.data)
        assert resp.status_code == 200

        c_get_data = api_client.get_data('/calendars/' + cal_id, ns_id)

        # The PUT reply and the follow-up GET must agree on every field.
        for calendar in (c_put_data, c_get_data):
            assert calendar['namespace_id'] == ns_id
            assert calendar['name'] == c_update_data['name']
            assert calendar['description'] is None
            assert calendar['read_only'] is False
            assert calendar['object'] == 'calendar'
            assert calendar['event_ids'] == []
    finally:
        # Always remove the calendar created above, even on failure.
        cal = db.session.query(Calendar).filter_by(public_id=cal_id).one()
        db.session.delete(cal)
        db.session.commit()
开发者ID:apolmig,项目名称:inbox,代码行数:33,代码来源:test_calendars.py


示例10: test_api_update_invalid

def test_api_update_invalid(db, api_client):
    """A PUT to an event id that was only just generated must not return 200."""
    account = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
    namespace_id = account.namespace.public_id

    new_fields = {'title': 'new title'}
    event_path = '/events/' + generate_public_id()
    response = api_client.put_data(event_path, new_fields, namespace_id)
    assert response.status_code != 200
开发者ID:dlitz,项目名称:inbox,代码行数:7,代码来源:test_events.py


示例11: test_update_to_nonexistent_draft

def test_update_to_nonexistent_draft(api_client):
    """Updating the draft id 'notarealid' returns 404 and creates no draft."""
    payload = {
        "subject": "updated draft",
        "body": "updated draft",
        "version": 22,
    }

    response = api_client.put_data("/drafts/notarealid", payload)
    assert response.status_code == 404

    listed = api_client.get_data("/drafts")
    assert len(listed) == 0
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:7,代码来源:test_drafts.py


示例12: test_delete_draft

def test_delete_draft(api_client):
    """Deleting drafts removes them and only drops threads that become empty.

    Part one: create a draft, update it, delete it with the updated version,
    then check that no drafts remain and that no thread matches either the
    original or the updated subject.

    Part two: create a reply draft on the first listed thread, check the
    thread carries the 'drafts' tag, delete the draft, and check the thread
    is still returned but no longer carries that tag.
    """
    created = json.loads(api_client.post_data(
        "/drafts", {"subject": "parent draft", "body": "parent draft"}).data)
    draft_path = "/drafts/{}".format(created["id"])

    update_payload = {
        "subject": "updated draft",
        "body": "updated draft",
        "version": created["version"],
    }
    updated = json.loads(api_client.put_data(draft_path, update_payload).data)

    api_client.delete("/drafts/{}".format(updated["id"]),
                      {"version": updated["version"]})

    # Check that drafts were deleted
    assert not api_client.get_data("/drafts")

    # Check that no orphaned threads are around
    for subject_query in ("/threads?subject=parent%20draft",
                          "/threads?subject=updated%20draft"):
        assert not api_client.get_data(subject_query)

    # And check that threads aren't deleted if they still have messages.
    thread_public_id = api_client.get_data("/threads")[0]["id"]
    thread_path = "/threads/{}".format(thread_public_id)
    reply_payload = {
        "subject": "test reply",
        "body": "test reply",
        "thread_id": thread_public_id,
    }
    reply = json.loads(api_client.post_data("/drafts", reply_payload).data)
    reply_id = reply["id"]
    reply_version = reply["version"]

    with_draft = api_client.get_data(thread_path)
    assert "drafts" in [t["name"] for t in with_draft["tags"]]

    api_client.delete("/drafts/{}".format(reply_id),
                      {"version": reply_version})
    without_draft = api_client.get_data(thread_path)
    assert without_draft
    assert "drafts" not in [t["name"] for t in without_draft["tags"]]
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:35,代码来源:test_drafts.py


示例13: test_update_draft

def test_update_draft(api_client):
    """Updating a draft keeps its id, changes its version, and adds no draft.

    Posts a draft, updates it using the version from the creation reply, and
    checks that the reply has the same id with a different version and that
    the draft list holds exactly that one draft.
    """
    creation = api_client.post_data('/drafts', {
        'subject': 'parent draft',
        'body': 'parent draft'
    })
    created = json.loads(creation.data)
    original_id = created['id']
    original_version = created['version']

    update = api_client.put_data('/drafts/{}'.format(original_id), {
        'subject': 'updated draft',
        'body': 'updated draft',
        'version': original_version
    })
    updated = json.loads(update.data)
    new_id = updated['id']
    new_version = updated['version']

    assert new_id == original_id and new_version != original_version

    listed = api_client.get_data('/drafts')
    assert len(listed) == 1
    assert listed[0]['id'] == new_id
开发者ID:apolmig,项目名称:inbox,代码行数:26,代码来源:test_drafts.py


示例14: test_delete_draft

def test_delete_draft(api_client):
    """A draft deleted with its current version no longer appears in /drafts.

    Posts a draft, updates it once, deletes it using the id and version from
    the update reply, and checks that the draft list is empty.
    """
    creation = api_client.post_data('/drafts', {
        'subject': 'parent draft',
        'body': 'parent draft'
    })
    created = json.loads(creation.data)

    update = api_client.put_data('/drafts/{}'.format(created['id']), {
        'subject': 'updated draft',
        'body': 'updated draft',
        'version': created['version']
    })
    updated = json.loads(update.data)
    current_id = updated['id']
    current_version = updated['version']

    api_client.delete('/drafts/{}'.format(current_id),
                      {'version': current_version})

    # Check that drafts were deleted
    assert not api_client.get_data('/drafts')
开发者ID:dlitz,项目名称:inbox,代码行数:25,代码来源:test_drafts.py


示例15: test_update_draft

def test_update_draft(api_client):
    """Updating a draft bumps its version and updates the owning thread.

    A new draft must start at version 0.  After an update the id is unchanged
    and the version is greater than 0, the draft list holds exactly that
    draft, and its thread reports the updated subject with both message
    timestamps equal to the draft's date.
    """
    creation = api_client.post_data(
        "/drafts", {"subject": "original draft", "body": "parent draft"})
    created = json.loads(creation.data)
    original_id = created["id"]
    original_version = created["version"]
    assert original_version == 0

    # Sleep so that timestamp on updated draft is different.
    gevent.sleep(1)

    update_payload = {
        "subject": "updated draft",
        "body": "updated draft",
        "version": original_version,
    }
    update = api_client.put_data("/drafts/{}".format(original_id),
                                 update_payload)
    updated = json.loads(update.data)
    new_id = updated["id"]
    new_version = updated["version"]

    assert new_id == original_id
    assert new_version > 0

    listed = api_client.get_data("/drafts")
    assert len(listed) == 1
    only_draft = listed[0]
    assert only_draft["id"] == new_id

    # Check that the thread is updated too.
    owning_thread = api_client.get_data(
        "/threads/{}".format(only_draft["thread_id"]))
    assert owning_thread["subject"] == "updated draft"
    assert owning_thread["first_message_timestamp"] == only_draft["date"]
    assert owning_thread["last_message_timestamp"] == only_draft["date"]
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:28,代码来源:test_drafts.py


示例16: test_api_remove_participant

def test_api_remove_participant(db, api_client):
    """Dropping one participant from an event's list removes it server-side.

    Creates an event with five participants, then PUTs the same event with
    the last participant removed.  After each request the returned
    participant list must have the expected length, contain no duplicate
    email, and carry no names (none were supplied).
    """
    acct = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
    ns_id = acct.namespace.public_id

    # The addresses must be distinct: the checks below require every email
    # in the response to occur exactly once.
    emails = ['peter@example.com',
              'paul@example.com',
              'mary@example.com',
              'john@example.com',
              'jane@example.com']
    e_data = {
        'title': 'Friday Office Party',
        'when': {'time': 1407542195},
        'participants': [{'email': email} for email in emails]
    }

    def check_participants(participants, expected_count):
        # Verify count, per-email uniqueness and the absence of names.
        assert len(participants) == expected_count
        returned_emails = [p['email'] for p in participants]
        assert len(set(returned_emails)) == len(returned_emails)
        assert all(p['name'] is None for p in participants)

    e_resp = api_client.post_data('/events', e_data, ns_id)
    e_resp_data = json.loads(e_resp.data)
    check_participants(e_resp_data['participants'], 5)

    event_id = e_resp_data['id']
    e_data['participants'].pop()
    e_resp = api_client.put_data('/events/' + event_id, e_data, ns_id)
    e_resp_data = json.loads(e_resp.data)
    check_participants(e_resp_data['participants'], 4)
开发者ID:apolmig,项目名称:inbox,代码行数:33,代码来源:test_event_participants.py


示例17: test_contacts_updated

def test_contacts_updated(api_client):
    """Tests that draft-contact associations are properly created and
    updated.

    Creates a draft addressed to two recipients, then replaces the second
    recipient with a third.  Filtering by recipient must find one result for
    the recipient that stayed, none for the one that was removed, and one
    for the one that was added.
    """
    # NOTE(review): this copy of the test had every address and query URL
    # replaced by the placeholder "[email protected]" (web email
    # obfuscation).  The addresses and the "/threads?to=" filter below are a
    # reconstruction inferred from the assertions -- confirm against the
    # upstream test_drafts.py before relying on them.
    kept = "alice@example.com"
    removed = "bob@example.com"
    added = "joe@example.com"

    def threads_to(address):
        # Query results filtered by a single recipient address.
        return api_client.get_data("/threads?to={}".format(address))

    draft = {"to": [{"email": kept}, {"email": removed}]}

    r = api_client.post_data("/drafts", draft)
    assert r.status_code == 200
    created = json.loads(r.data)
    draft_id = created["id"]
    draft_version = created["version"]

    assert len(threads_to(kept)) == 1

    updated_draft = {
        "to": [{"email": kept}, {"email": added}],
        "version": draft_version,
    }

    r = api_client.put_data("/drafts/{}".format(draft_id), updated_draft)
    assert r.status_code == 200

    assert len(threads_to(kept)) == 1
    assert len(threads_to(removed)) == 0
    assert len(threads_to(added)) == 1
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:26,代码来源:test_drafts.py


示例18: test_api_update_title

def test_api_update_title(db, api_client):
    """An event created with an empty title can be retitled via PUT.

    Checks the creation reply echoes the submitted title and time within the
    account's namespace, then checks the PUT reply keeps the same id and
    time, reports ``when.object == 'time'``, and carries the new title.
    """
    account = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
    ns_id = account.namespace.public_id

    start_time = 1407542195
    new_event = {
        'title': '',
        'when': {'time': start_time},
    }

    creation = api_client.post_data('/events', new_event, ns_id)
    created = json.loads(creation.data)
    assert created['object'] == 'event'
    assert created['namespace_id'] == account.namespace.public_id
    assert created['title'] == new_event['title']
    assert created['when']['time'] == start_time
    assert 'id' in created
    event_id = created['id']

    update = api_client.put_data('/events/' + event_id,
                                 {'title': 'new title'}, ns_id)
    updated = json.loads(update.data)

    assert updated['object'] == 'event'
    assert updated['namespace_id'] == account.namespace.public_id
    assert updated['id'] == event_id
    assert updated['title'] == 'new title'
    assert updated['when']['object'] == 'time'
    assert updated['when']['time'] == start_time
开发者ID:dlitz,项目名称:inbox,代码行数:28,代码来源:test_events.py


示例19: test_api_update_title

def test_api_update_title(db, api_client, calendar, default_account):
    """An event created with an empty title can be retitled via PUT.

    The event is created in the calendar identified by
    ``calendar.public_id``.  Checks the creation reply echoes the submitted
    title and time within the default account's namespace, then checks the
    PUT reply keeps the same id and time, reports ``when.object == 'time'``,
    and carries the new title.

    ``db`` is not read by the body; presumably it is a fixture requested for
    its setup side effects.
    """
    start_time = 1407542195
    new_event = {
        'title': '',
        'calendar_id': calendar.public_id,
        'when': {'time': start_time},
    }

    creation = api_client.post_data('/events', new_event)
    created = json.loads(creation.data)
    assert created['object'] == 'event'
    assert created['namespace_id'] == default_account.namespace.public_id
    assert created['title'] == new_event['title']
    assert created['when']['time'] == start_time
    assert 'id' in created
    event_id = created['id']

    update = api_client.put_data('/events/' + event_id,
                                 {'title': 'new title'})
    updated = json.loads(update.data)

    assert updated['object'] == 'event'
    assert updated['namespace_id'] == default_account.namespace.public_id
    assert updated['id'] == event_id
    assert updated['title'] == 'new title'
    assert updated['when']['object'] == 'time'
    assert updated['when']['time'] == start_time
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:26,代码来源:test_events.py


示例20: test_actions_dont_duplicate

def test_actions_dont_duplicate(patch_network_functions, api_client, db,
                                syncback_service, default_account):
    """ Test that a client invocation that would cause the same action to
        be scheduled multiple times only inserts one row into ActionLog.

        A single PUT both adds 'archive' and removes 'inbox' on the first
        listed thread; afterwards the ActionLog table must hold exactly one
        row.
    """
    from inbox.models import ActionLog

    first_thread_id = api_client.get_data('/threads/')[0]['id']
    thread_path = '/threads/{}'.format(first_thread_id)

    # Make sure tags are removed to start with
    tag_changes = {'add_tags': ['archive'], 'remove_tags': ['inbox']}
    api_client.put_data(thread_path, tag_changes)

    logged_actions = db.session.query(ActionLog).count()
    assert logged_actions == 1
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:16,代码来源:test_tags.py



注:本文中的tests.util.base.api_client.put_data函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python filesystem_utils.get_fs_path函数代码示例发布时间:2022-05-27
下一篇:
Python api_client.post_data函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap