• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python api.insert_run_start函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中metadatastore.api.insert_run_start函数的典型用法代码示例。如果您正苦于以下问题:Python insert_run_start函数的具体用法?Python insert_run_start怎么用?Python insert_run_start使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了insert_run_start函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_basic_usage

def test_basic_usage():
    for i in range(5):
        insert_run_start(time=float(i), scan_id=i + 1,
                         owner='nedbrainard', beamline_id='example',
                         uid=str(uuid.uuid4()))
    header_1 = db[-1]

    header_ned = db(owner='nedbrainard')
    header_ned = db.find_headers(owner='nedbrainard')  # deprecated API
    header_null = db(owner='this owner does not exist')
    # smoke test
    db.fetch_events(header_1)
    db.fetch_events(header_ned)
    db.fetch_events(header_null)
    list(get_events(header_1))
    list(get_events(header_null))
    get_table(header_1)
    get_table(header_ned)
    get_table(header_null)

    # get events for multiple headers
    list(get_events(db[-2:]))

    # test time shift issue GH9
    table = get_table(db[105])
    assert table.notnull().all().all()
开发者ID:tacaswell,项目名称:databroker,代码行数:26,代码来源:test_broker.py


示例2: test_data_key

def test_data_key():
    rs1_uid = insert_run_start(time=100.0, scan_id=1, owner="nedbrainard", beamline_id="example", uid=str(uuid.uuid4()))
    rs2_uid = insert_run_start(time=200.0, scan_id=2, owner="nedbrainard", beamline_id="example", uid=str(uuid.uuid4()))
    rs1, = find_run_starts(uid=rs1_uid)
    rs2, = find_run_starts(uid=rs2_uid)
    data_keys = {"fork": {"source": "_", "dtype": "number"}, "spoon": {"source": "_", "dtype": "number"}}
    insert_descriptor(run_start=rs1_uid, data_keys=data_keys, time=100.0, uid=str(uuid.uuid4()))
    insert_descriptor(run_start=rs2_uid, data_keys=data_keys, time=200.0, uid=str(uuid.uuid4()))
    result1 = db(data_key="fork")
    result2 = db(data_key="fork", start_time=150)
    assert len(result1) == 2
    assert len(result2) == 1
    actual = result2[0]["start"]["uid"]
    assert actual == str(rs2.uid)
开发者ID:ericdill,项目名称:databroker,代码行数:14,代码来源:test_broker.py


示例3: test_scan_id_lookup

def test_scan_id_lookup():
    for i in range(5):
        insert_run_start(time=float(i), scan_id=i + 1,
                         owner='docbrown', beamline_id='example',
                         beamline_config=insert_beamline_config({}, time=0.))
    for i in range(5):
        insert_run_start(time=float(i), scan_id=i + 1,
                         owner='nedbrainard', beamline_id='example',
                         beamline_config=insert_beamline_config({}, time=0.))
    header = db[3]
    scan_id = header.scan_id
    owner = header.owner
    assert_equal(scan_id, 3)
    # This should be the most *recent* Scan 3. There is ambiguity.
    assert_equal(owner, 'nedbrainard')
开发者ID:giltis,项目名称:dataportal,代码行数:15,代码来源:test_broker.py


示例4: test_scan_id_lookup

def test_scan_id_lookup():
    rd1 = [insert_run_start(time=float(i), scan_id=i + 1 + 314159,
                            owner='docbrown', beamline_id='example',
                            uid=str(uuid.uuid4())) for i in range(5)]

    rd2 = [insert_run_start(time=float(i)+1, scan_id=i + 1 + 314159,
                            owner='nedbrainard', beamline_id='example',
                            uid=str(uuid.uuid4())) for i in range(5)]
    header = db[3 + 314159]
    scan_id = header['start']['scan_id']
    assert scan_id == 3 + 314159
    assert rd2[2] == header['start']['uid']
    # This should be the most *recent* Scan 3 + 314159. There is ambiguity.
    owner = header['start']['owner']
    assert owner == 'nedbrainard'
开发者ID:tacaswell,项目名称:databroker,代码行数:15,代码来源:test_broker.py


示例5: test_event_queue

def test_event_queue():
    scan_id = np.random.randint(1e12)  # unique enough for government work
    rs = insert_run_start(time=0., scan_id=scan_id,
                          owner='queue-tester', beamline_id='example',
                          beamline_config=insert_beamline_config({}, time=0.))
    header = db.find_headers(scan_id=scan_id)
    queue = EventQueue(header)
    # Queue should be empty until we create Events.
    empty_bundle = queue.get()
    assert_equal(len(empty_bundle), 0)
    queue.update()
    empty_bundle = queue.get()
    assert_equal(len(empty_bundle), 0)
    events = temperature_ramp.run(rs)
    # This should add a bundle of Events to the queue.
    queue.update()
    first_bundle = queue.get()
    assert_equal(len(first_bundle), len(events))
    more_events = temperature_ramp.run(rs)
    # Queue should be empty until we update.
    empty_bundle = queue.get()
    assert_equal(len(empty_bundle), 0)
    queue.update()
    second_bundle = queue.get()
    assert_equal(len(second_bundle), len(more_events))
    # Add Events from a different example into the same Header.
    other_events = image_and_scalar.run(rs)
    queue.update()
    third_bundle = queue.get()
    assert_equal(len(third_bundle), len(other_events))
开发者ID:giltis,项目名称:dataportal,代码行数:30,代码来源:test_broker.py


示例6: test_replay_plotx_ploty

def test_replay_plotx_ploty():
    # insert a run header with one plotx and one ploty
    rs = mdsapi.insert_run_start(
        time=ttime.time(), beamline_id='replay testing', scan_id=1,
        custom={'plotx': 'Tsam', 'ploty': ['point_det']},
        beamline_config=mdsapi.insert_beamline_config({}, ttime.time()))
    temperature_ramp.run(rs)
    # plotting replay in live mode with plotx and ploty should have the
    # following state after a few seconds of execution:
    # replay.
    app = QtApplication()
    ui = replay.create(replay.define_live_params())
    ui.title = 'testing replay with plotx and one value of ploty'
    ui.show()
    app.timed_call(4000, app.stop)
    app.start()
    try:
        # the x axis should be 'plotx'
        assert ui.scalar_collection.x == 'Tsam'
        # there should only be 1 scalar model currently plotting
        assert len([scalar_model for scalar_model
                    in ui.scalar_collection.scalar_models.values()
                    if scalar_model.is_plotting]) == 1
        # the x axis should not be the index
        assert not ui.scalar_collection.x_is_index
    except AssertionError:
        # gotta destroy the app or it will cause cascading errors
        ui.close()
        app.destroy()
        raise

    ui.close()
    app.destroy()
开发者ID:NSLS-II,项目名称:replay,代码行数:33,代码来源:test_replay.py


示例7: setup_module

def setup_module(module):
    mds_setup()
    fs_setup()

    owners = ["docbrown", "nedbrainard"]
    num_entries = 5
    rs = insert_run_start(time=ttime.time(), scan_id=105, owner="stepper", beamline_id="example", uid=str(uuid.uuid4()))
    step_scan.run(run_start_uid=rs)
    for owner in owners:
        for i in range(num_entries):
            logger.debug("{}: {} of {}".format(owner, i + 1, num_entries))
            rs = insert_run_start(
                time=ttime.time(), scan_id=i + 1, owner=owner, beamline_id="example", uid=str(uuid.uuid4())
            )
            # insert some events into mds
            temperature_ramp.run(run_start_uid=rs, make_run_stop=(i != 0))
开发者ID:ericdill,项目名称:databroker,代码行数:16,代码来源:test_broker.py


示例8: test_configuration

def test_configuration():
    rs = insert_run_start(
        time=ttime.time(), scan_id=105, owner="stepper", beamline_id="example", uid=str(uuid.uuid4()), cat="meow"
    )
    step_scan.run(run_start_uid=rs)
    h = db[rs]
    # check that config is not included by default
    ev = next(get_events(h))
    assert set(ev["data"].keys()) == set(["Tsam", "point_det"])
    # find config in descriptor['configuration']
    ev = next(get_events(h, fields=["Tsam", "exposure_time"]))
    assert "exposure_time" in ev["data"]
    assert ev["data"]["exposure_time"] == 5
    assert "exposure_time" in ev["timestamps"]
    assert ev["timestamps"]["exposure_time"] == 0.0
    # find config in start doc
    ev = next(get_events(h, fields=["Tsam", "cat"]))
    assert "cat" in ev["data"]
    assert ev["data"]["cat"] == "meow"
    assert "cat" in ev["timestamps"]
    # find config in stop doc
    ev = next(get_events(h, fields=["Tsam", "exit_status"]))
    assert "exit_status" in ev["data"]
    assert ev["data"]["exit_status"] == "success"
    assert "exit_status" in ev["timestamps"]
开发者ID:ericdill,项目名称:databroker,代码行数:25,代码来源:test_broker.py


示例9: test_configuration

def test_configuration():
    rs = insert_run_start(time=ttime.time(), scan_id=105,
                          owner='stepper', beamline_id='example',
                          uid=str(uuid.uuid4()), cat='meow')
    step_scan.run(run_start_uid=rs)
    h = db[rs]
    # check that config is not included by default
    ev = next(get_events(h))
    assert set(ev['data'].keys()) == set(['Tsam', 'point_det'])
    # find config in descriptor['configuration']
    ev = next(get_events(h, fields=['Tsam', 'exposure_time']))
    assert 'exposure_time' in ev['data']
    assert ev['data']['exposure_time'] == 5
    assert 'exposure_time' in ev['timestamps']
    assert ev['timestamps']['exposure_time'] == 0.
    # find config in start doc
    ev = next(get_events(h, fields=['Tsam', 'cat']))
    assert 'cat' in ev['data']
    assert ev['data']['cat'] == 'meow'
    assert 'cat' in ev['timestamps']
    # find config in stop doc
    ev = next(get_events(h, fields=['Tsam', 'exit_status']))
    assert 'exit_status' in ev['data']
    assert ev['data']['exit_status'] == 'success'
    assert 'exit_status' in ev['timestamps']
开发者ID:tacaswell,项目名称:databroker,代码行数:25,代码来源:test_broker.py


示例10: setup

def setup():
    fs_setup()
    mds_setup()

    blc_uid = insert_beamline_config({}, ttime.time())
    rs_uid = insert_run_start(time=0.0, scan_id=1, owner='test',
                              beamline_id='test', beamline_config=blc_uid)
    temperature_ramp.run(run_start_uid=rs_uid)
开发者ID:giltis,项目名称:dataportal,代码行数:8,代码来源:test_muxer.py


示例11: setup

def setup():
    fs_setup()
    mds_setup()

    rs_uid = insert_run_start(time=0.0, scan_id=1, owner='test',
                              beamline_id='test',
                              uid=str(uuid.uuid4()))
    temperature_ramp.run(run_start_uid=rs_uid)
开发者ID:NSLS-II,项目名称:datamuxer,代码行数:8,代码来源:test_muxer.py


示例12: test_get_fields

def test_get_fields():
    rs = insert_run_start(time=ttime.time(), scan_id=105,
                          owner='stepper', beamline_id='example',
                          uid=str(uuid.uuid4()))
    step_scan.run(run_start_uid=rs)
    h = db[rs]
    actual = get_fields(h)
    assert actual == set(['Tsam', 'point_det'])
开发者ID:tacaswell,项目名称:databroker,代码行数:8,代码来源:test_broker.py


示例13: test_uid_lookup

def test_uid_lookup():
    uid = str(uuid.uuid4())
    uid2 = uid[0] + str(uuid.uuid4())[1:]  # same first character as uid
    rs1 = insert_run_start(time=100.0, scan_id=1, uid=uid, owner="drstrangelove", beamline_id="example")
    insert_run_start(time=100.0, scan_id=1, uid=uid2, owner="drstrangelove", beamline_id="example")
    # using full uid
    actual_uid = db[uid]["start"]["uid"]
    assert actual_uid == uid
    assert rs1 == uid

    # using first 6 chars
    actual_uid = db[uid[:6]]["start"]["uid"]
    assert actual_uid == uid
    assert rs1 == uid

    # using first char (will error)
    pytest.raises(ValueError, lambda: db[uid[0]])
开发者ID:ericdill,项目名称:databroker,代码行数:17,代码来源:test_broker.py


示例14: test_find_by_string_time

def test_find_by_string_time():
    uid = insert_run_start(
        time=ttime.time(), scan_id=1, owner="nedbrainard", beamline_id="example", uid=str(uuid.uuid4())
    )
    today = datetime.today()
    tomorrow = date.today() + timedelta(days=1)
    today_str = today.strftime("%Y-%m-%d")
    tomorrow_str = tomorrow.strftime("%Y-%m-%d")
    result = db(start_time=today_str, stop_time=tomorrow_str)
    assert uid in [hdr["start"]["uid"] for hdr in result]
开发者ID:ericdill,项目名称:databroker,代码行数:10,代码来源:test_broker.py


示例15: test_process

def test_process():
    rs = insert_run_start(time=ttime.time(), scan_id=105, owner="stepper", beamline_id="example", uid=str(uuid.uuid4()))
    step_scan.run(run_start_uid=rs)
    c = count()

    def f(name, doc):
        next(c)

    process(db[rs], f)
    assert next(c) == len(list(restream(db[rs])))
开发者ID:ericdill,项目名称:databroker,代码行数:10,代码来源:test_broker.py


示例16: test_uid_lookup

def test_uid_lookup():
    uid = str(uuid.uuid4())
    uid2 = uid[0] + str(uuid.uuid4())[1:]  # same first character as uid
    rs1 = insert_run_start(time=100., scan_id=1, uid=uid,
                           owner='drstrangelove', beamline_id='example')
    insert_run_start(time=100., scan_id=1, uid=uid2,
                     owner='drstrangelove', beamline_id='example')
    # using full uid
    actual_uid = db[uid]['start']['uid']
    assert_equal(actual_uid, uid)
    assert_equal(rs1, uid)

    # using first 6 chars
    actual_uid = db[uid[:6]]['start']['uid']
    assert_equal(actual_uid, uid)
    assert_equal(rs1, uid)

    # using first char (will error)
    assert_raises(ValueError, lambda: db[uid[0]])
开发者ID:danielballan,项目名称:databroker,代码行数:19,代码来源:test_broker.py


示例17: test_find_by_string_time

def test_find_by_string_time():
    uid = insert_run_start(time=ttime.time(), scan_id=1,
                           owner='nedbrainard', beamline_id='example',
                           uid=str(uuid.uuid4()))
    today = datetime.today()
    tomorrow = date.today() + timedelta(days=1)
    today_str = today.strftime('%Y-%m-%d')
    tomorrow_str = tomorrow.strftime('%Y-%m-%d')
    result = db(start_time=today_str, stop_time=tomorrow_str)
    assert uid in [hdr['start']['uid'] for hdr in result]
开发者ID:tacaswell,项目名称:databroker,代码行数:10,代码来源:test_broker.py


示例18: test_indexing

def test_indexing():
    for i in range(5):
        insert_run_start(time=float(i), scan_id=i + 1,
                         owner='nedbrainard', beamline_id='example',
                         uid=str(uuid.uuid4()))

    header = db[-1]
    assert not isinstance(header, list)
    assert header['start']['scan_id'] == 5

    header = db[-2]
    assert not isinstance(header, list)
    assert header['start']['scan_id'] == 4

    f = lambda: db[-100000]
    pytest.raises(IndexError, f)

    headers = db[-5:]
    assert isinstance(headers, list)
    assert len(headers) == 5

    headers = db[-6:]
    assert isinstance(headers, list)
    assert len(headers) == 6

    headers = db[-1:]
    assert isinstance(headers, list)
    assert len(headers) == 1
    assert headers[0]['start']['scan_id'] == 5

    headers = db[-2:-1]
    assert isinstance(headers, list)
    assert len(headers) == 1
    header, = headers
    assert header['start']['scan_id'] == 4

    assert [h['start']['scan_id'] for h in db[-3:-1]] == [4, 3]

    # fancy indexing, by location
    assert [h['start']['scan_id'] for h in db[[-3, -1, -2]]] == [3, 5, 4]

    # fancy indexing, by scan id
    assert [h['start']['scan_id'] for h in db[[3, 1, 2]]] == [3, 1, 2]
开发者ID:tacaswell,项目名称:databroker,代码行数:43,代码来源:test_broker.py


示例19: test_uid_lookup

def test_uid_lookup():
    uid = str(uuid.uuid4())
    uid2 = uid[0] + str(uuid.uuid4())[1:]  # same first character as uid
    rs1 = insert_run_start(time=100., scan_id=1, uid=uid,
                           owner='drstrangelove', beamline_id='example',
                           beamline_config=insert_beamline_config({}, time=0.))
    rs2 = insert_run_start(time=100., scan_id=1, uid=uid2,
                           owner='drstrangelove', beamline_id='example',
                           beamline_config=insert_beamline_config({}, time=0.))
    # using full uid
    actual_uid = db[uid].run_start_uid
    assert_equal(actual_uid, uid)

    # using first 6 chars
    actual_uid = db[uid[:6]].run_start_uid
    assert_equal(actual_uid, uid)

    # using first char (will error)
    f = lambda: db[uid[0]]
    assert_raises(ValueError, f)
开发者ID:giltis,项目名称:dataportal,代码行数:20,代码来源:test_broker.py


示例20: test_basic_usage

def test_basic_usage():
    for i in range(5):
        insert_run_start(time=float(i), scan_id=i + 1,
                         owner='nedbrainard', beamline_id='example',
                         beamline_config=insert_beamline_config({}, time=0.))
    header_1 = db[-1]
    # Exercise reprs.
    header_1._repr_html_()
    repr(header_1)
    str(header_1)
    headers = db[-3:]
    headers._repr_html_()
    repr(headers)
    str(headers)

    header_ned = db.find_headers(owner='nedbrainard')
    header_null = db.find_headers(owner='this owner does not exist')
    events_1 = db.fetch_events(header_1)
    events_ned = db.fetch_events(header_ned)
    events_null = db.fetch_events(header_null)
开发者ID:giltis,项目名称:dataportal,代码行数:20,代码来源:test_broker.py



注:本文中的metadatastore.api.insert_run_start函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python metainfo.info函数代码示例发布时间:2022-05-27
下一篇:
Python metadata.get_result函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap