• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test_utils.do_job_ingestion函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.test_utils.do_job_ingestion函数的典型用法代码示例。如果您正苦于以下问题:Python do_job_ingestion函数的具体用法?Python do_job_ingestion怎么用?Python do_job_ingestion使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了do_job_ingestion函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_cycle_one_job

def test_cycle_one_job(jm, refdata, sample_data, initial_data, sample_resultset, mock_log_parser):
    """
    Cycle the jobs of a single result set out of a batch of ingested jobs
    and confirm that nothing else is deleted along the way.
    """
    ingested_jobs = sample_data.job_data[:20]
    test_utils.do_job_ingestion(jm, refdata, ingested_jobs, sample_resultset, False)

    now = time.time()
    one_week_ago = int(now - 7 * 24 * 3600)

    # First stamp the result sets with the current time, then run the
    # "one result set" proc with a week-old timestamp (presumably this
    # backdates a single result set -- verify against the proc definition).
    jm.execute(
        proc="jobs_test.updates.set_result_sets_push_timestamp",
        placeholders=[now],
    )
    jm.execute(
        proc="jobs_test.updates.set_one_result_set_push_timestamp",
        placeholders=[one_week_ago],
    )

    expected_deletions = jm.execute(
        proc="jobs_test.selects.get_result_set_jobs",
        placeholders=[1],
    )
    before = jm.execute(proc="jobs_test.selects.jobs")

    call_command("cycle_data", sleep_time=0, cycle_interval=1, debug=True)

    after = jm.execute(proc="jobs_test.selects.jobs")

    # The same query that returned the expected deletions must now come
    # back empty.
    leftover = jm.execute(
        proc="jobs_test.selects.get_result_set_jobs",
        placeholders=[1],
    )
    assert len(leftover) == 0

    # Exactly the expected number of jobs disappeared, no more.
    assert len(after) == len(before) - len(expected_deletions)
开发者ID:EdgarChen,项目名称:treeherder,代码行数:31,代码来源:test_jobs_model.py


示例2: test_ingest_all_sample_jobs

def test_ingest_all_sample_jobs(jm, sample_data,
                                sample_resultset, test_repository, mock_log_parser):
    """Run every job structure from the sample job data through ingestion.

    Verification is delegated to ``test_utils.do_job_ingestion``.
    """
    all_sample_jobs = sample_data.job_data
    test_utils.do_job_ingestion(jm, all_sample_jobs, sample_resultset)
开发者ID:jamesthechamp,项目名称:treeherder,代码行数:7,代码来源:test_jobs_model.py


示例3: test_cycle_all_data_in_chunks

def test_cycle_all_data_in_chunks(test_repository, failure_classifications, sample_data,
                                  sample_resultset, mock_log_parser):
    """
    Run ``cycle_data`` with a chunk size of 3 over the sample data and
    check that every job and its related records are gone afterwards.
    """
    ingested_jobs = sample_data.job_data[:20]
    test_utils.do_job_ingestion(test_repository, ingested_jobs, sample_resultset, False)

    # Backdate every job's submit time by one week so that the data is
    # eligible for cycling.
    one_week_ago = datetime.datetime.now() - datetime.timedelta(weeks=1)
    for stored_job in Job.objects.all():
        stored_job.submit_time = one_week_ago
        stored_job.save()

    # Attach seven copies of the same failure line spec to job 1.
    first_job = Job.objects.get(id=1)
    repeated_lines = [(test_line, {})] * 7
    create_failure_lines(first_job, repeated_lines)

    assert TestFailureLine.search().count() > 0

    call_command('cycle_data', sleep_time=0, days=1, chunk_size=3)
    refresh_all()

    # Cycling must leave no jobs or job-related records behind
    assert Job.objects.count() == 0
    assert FailureLine.objects.count() == 0
    assert JobDetail.objects.count() == 0
    assert TestFailureLine.search().count() == 0
开发者ID:kmoir,项目名称:treeherder,代码行数:27,代码来源:test_cycle_data.py


示例4: test_cycle_job_model_reference_data

def test_cycle_job_model_reference_data(test_repository, failure_classifications,
                                        sample_data, sample_resultset,
                                        mock_log_parser):
    """
    Check that ``cycle_data`` removes job reference data (job groups, job
    types, machines) created here without any job attached, while the
    reference data present right after ingestion is kept.
    """
    job_data = sample_data.job_data[:20]
    test_utils.do_job_ingestion(test_repository, job_data, sample_resultset, False)

    # Snapshot the ids of the original reference data. ``values_list``
    # returns a lazy QuerySet, so it is materialised with ``list()`` here;
    # otherwise the query would first run after cycling and the
    # "still there" assertions below would compare each table with itself.
    original_job_type_ids = list(JobType.objects.values_list('id', flat=True))
    original_job_group_ids = list(JobGroup.objects.values_list('id', flat=True))
    original_machine_ids = list(Machine.objects.values_list('id', flat=True))

    # create a bunch of job model data that should be cycled, since they don't
    # reference any current jobs
    jg = JobGroup.objects.create(symbol='moo', name='moo')
    jt = JobType.objects.create(job_group=jg, symbol='mu', name='mu')
    m = Machine.objects.create(name='machine_with_no_job')
    # remember the ids before the rows are cycled
    (jg_id, jt_id, m_id) = (jg.id, jt.id, m.id)
    call_command('cycle_data', sleep_time=0, days=1, chunk_size=3)

    # assert that reference data that should have been cycled, was cycled
    assert JobGroup.objects.filter(id=jg_id).count() == 0
    assert JobType.objects.filter(id=jt_id).count() == 0
    assert Machine.objects.filter(id=m_id).count() == 0

    # assert that we still have everything that shouldn't have been cycled
    assert JobType.objects.filter(id__in=original_job_type_ids).count() == len(original_job_type_ids)
    assert JobGroup.objects.filter(id__in=original_job_group_ids).count() == len(original_job_group_ids)
    assert Machine.objects.filter(id__in=original_machine_ids).count() == len(original_machine_ids)
开发者ID:kmoir,项目名称:treeherder,代码行数:28,代码来源:test_cycle_data.py


示例5: test_cycle_all_data

def test_cycle_all_data(jm, refdata, sample_data, initial_data,
                        sample_resultset, mock_log_parser):
    """
    Backdate the result sets of the ingested sample jobs and check that
    ``cycle_data`` removes every job.
    """
    ingested_jobs = sample_data.job_data[:20]
    test_utils.do_job_ingestion(jm, refdata, ingested_jobs, sample_resultset, False)

    now = time.time()
    one_week_ago = now - 7 * 24 * 3600

    jm.execute(proc="jobs_test.updates.set_result_sets_push_timestamp",
               placeholders=[one_week_ago])

    # Jobs selected for cycling with a cutoff of one day ago.
    one_day_ago = now - 24 * 3600
    expected_deletions = jm.execute(proc="jobs_test.selects.get_jobs_for_cycling",
                                    placeholders=[one_day_ago])

    before = jm.execute(proc="jobs_test.selects.jobs")

    call_command('cycle_data', sleep_time=0, cycle_interval=1)

    after = jm.execute(proc="jobs_test.selects.jobs")

    assert len(after) == len(before) - len(expected_deletions)

    # Every job must have been cycled away
    assert len(after) == 0
开发者ID:mjzffr,项目名称:treeherder,代码行数:31,代码来源:test_jobs_model.py


示例6: test_cycle_all_data

def test_cycle_all_data(jm, sample_data,
                        sample_resultset, test_repository, mock_log_parser,
                        failure_lines):
    """
    Backdate the last-modified time of the ingested sample jobs and check
    that ``cycle_data`` removes all jobs and failure lines.
    """
    ingested_jobs = sample_data.job_data[:20]
    test_utils.do_job_ingestion(jm, ingested_jobs, sample_resultset, False)

    now = time.time()
    one_week_ago = now - 7 * 24 * 3600

    jm.execute(proc="jobs_test.updates.set_jobs_last_modified",
               placeholders=[one_week_ago])

    # Jobs selected for cycling with a cutoff of one day ago.
    one_day_ago = now - 24 * 3600
    expected_deletions = jm.execute(proc="jobs_test.selects.get_jobs_for_cycling",
                                    placeholders=[one_day_ago])

    before = jm.execute(proc="jobs_test.selects.jobs")

    call_command('cycle_data', sleep_time=0, days=1)

    after = jm.execute(proc="jobs_test.selects.jobs")

    assert len(after) == len(before) - len(expected_deletions)

    # Neither jobs nor failure lines may remain once cycling is done
    assert len(after) == 0
    assert FailureLine.objects.count() == 0
    assert Job.objects.count() == 0
开发者ID:jamesthechamp,项目名称:treeherder,代码行数:34,代码来源:test_jobs_model.py


示例7: test_cycle_all_data

def test_cycle_all_data(test_repository, failure_classifications, sample_data,
                        sample_resultset, mock_log_parser, failure_lines):
    """
    Backdate every ingested job by a week, run ``cycle_data`` and check
    that jobs and their related records are all removed.
    """
    ingested_jobs = sample_data.job_data[:20]
    test_utils.do_job_ingestion(test_repository, ingested_jobs, sample_resultset, False)

    # Move each job's submit time to one week before now
    one_week_ago = datetime.datetime.now() - datetime.timedelta(weeks=1)
    for stored_job in Job.objects.all():
        stored_job.submit_time = one_week_ago
        stored_job.save()

    call_command('cycle_data', sleep_time=0, days=1)

    refresh_all()

    # No jobs, failure lines, job details or job logs may remain
    assert Job.objects.count() == 0
    assert FailureLine.objects.count() == 0
    assert JobDetail.objects.count() == 0
    assert JobLog.objects.count() == 0

    # The search index must be empty as well
    assert TestFailureLine.search().count() == 0
开发者ID:kmoir,项目名称:treeherder,代码行数:26,代码来源:test_cycle_data.py


示例8: test_ingest_all_sample_jobs

def test_ingest_all_sample_jobs(test_repository, failure_classifications,
                                sample_data, sample_push, mock_log_parser):
    """Run every job structure from the sample job data through ingestion.

    Verification is delegated to ``test_utils.do_job_ingestion``.
    """
    all_sample_jobs = sample_data.job_data
    test_utils.do_job_ingestion(test_repository, all_sample_jobs, sample_push)
开发者ID:edmorley,项目名称:treeherder,代码行数:7,代码来源:test_job_ingestion.py


示例9: test_cycle_all_data_in_chunks

def test_cycle_all_data_in_chunks(jm, refdata, sample_data, initial_data, sample_resultset, mock_log_parser):
    """
    Run ``cycle_data`` with a chunk size of 3 over backdated sample data
    and check that every job is removed.
    """
    ingested_jobs = sample_data.job_data[:20]
    test_utils.do_job_ingestion(jm, refdata, ingested_jobs, sample_resultset, False)

    # A week-old timestamp makes the data eligible for cycling
    now = time.time()
    one_week_ago = int(now - 7 * 24 * 3600)

    jm.execute(
        proc="jobs_test.updates.set_result_sets_push_timestamp",
        placeholders=[one_week_ago],
    )

    # Jobs selected for cycling with a cutoff of one day ago.
    one_day_ago = now - 24 * 3600
    expected_deletions = jm.execute(
        proc="jobs_test.selects.get_jobs_for_cycling",
        placeholders=[one_day_ago],
    )

    before = jm.execute(proc="jobs_test.selects.jobs")

    call_command("cycle_data", sleep_time=0, cycle_interval=1, chunk_size=3)

    after = jm.execute(proc="jobs_test.selects.jobs")

    assert len(after) == len(before) - len(expected_deletions)

    # Every job must have been cycled away
    assert len(after) == 0
开发者ID:EdgarChen,项目名称:treeherder,代码行数:25,代码来源:test_jobs_model.py


示例10: test_ingest_single_sample_job

def test_ingest_single_sample_job(jm, refdata, sample_data, initial_data, mock_log_parser, sample_resultset):
    """Ingest only the first job structure from the sample job data."""
    first_job_only = sample_data.job_data[:1]
    test_utils.do_job_ingestion(jm, refdata, first_job_only, sample_resultset)

    # Explicitly disconnect both fixtures once ingestion is done.
    jm.disconnect()
    refdata.disconnect()
开发者ID:EdgarChen,项目名称:treeherder,代码行数:7,代码来源:test_jobs_model.py


示例11: test_ingesting_skip_existing

def test_ingesting_skip_existing(test_repository, failure_classifications, sample_data,
                                 sample_push, mock_log_parser):
    """Store two jobs when the first of them is already ingested.

    The job count must end up at two, i.e. the existing job is not stored
    a second time.
    """
    already_ingested = sample_data.job_data[:1]
    test_utils.do_job_ingestion(test_repository, already_ingested, sample_push)

    # The first of these two jobs was ingested above.
    first_two_jobs = sample_data.job_data[:2]
    store_job_data(test_repository, first_two_jobs)

    assert Job.objects.count() == 2
开发者ID:edmorley,项目名称:treeherder,代码行数:9,代码来源:test_job_ingestion.py


示例12: test_cycle_one_job

def test_cycle_one_job(jm, sample_data,
                       sample_resultset, test_repository, mock_log_parser,
                       failure_lines):
    """
    Cycle a single job out of a batch of ingested jobs and confirm that
    no other job, and no failure line of a surviving job, is deleted.
    """
    ingested_jobs = sample_data.job_data[:20]
    test_utils.do_job_ingestion(jm, ingested_jobs, sample_resultset, False)

    # Job 2 is expected to survive; give it two failure lines to track.
    surviving_job = jm.get_job(2)[0]
    surviving_guid = surviving_job["job_guid"]
    line_specs = [(test_line, {}),
                  (test_line, {"subtest": "subtest2"})]
    failure_lines_remaining = create_failure_lines(test_repository,
                                                   surviving_guid,
                                                   line_specs)

    now = time.time()
    one_week_ago = int(now - 7 * 24 * 3600)

    # Stamp the jobs with the current time, then run the "one job" proc
    # with a week-old timestamp (presumably this backdates a single job --
    # verify against the proc definition).
    jm.execute(proc="jobs_test.updates.set_jobs_last_modified",
               placeholders=[now])
    jm.execute(proc="jobs_test.updates.set_one_job_last_modified_timestamp",
               placeholders=[one_week_ago])

    expected_deletions = jm.execute(proc="jobs_test.selects.get_one_job_for_cycling",
                                    placeholders=[1])

    before = jm.execute(proc="jobs_test.selects.jobs")

    call_command('cycle_data', sleep_time=0, days=1, debug=True)

    after = jm.execute(proc="jobs_test.selects.jobs")

    # The same query that returned the expected deletions must now come
    # back empty.
    leftover = jm.execute(proc="jobs_test.selects.get_one_job_for_cycling",
                          placeholders=[1])
    assert len(leftover) == 0

    # Exactly the expected number of jobs disappeared, and the ORM agrees
    # with the proc-based count.
    assert len(after) == len(before) - len(expected_deletions)
    assert len(after) == Job.objects.count()

    # The failure lines still stored are exactly those created above.
    stored_ids = {line.id for line in FailureLine.objects.all()}
    expected_ids = {line.id for line in failure_lines_remaining}
    assert stored_ids == expected_ids
开发者ID:jamesthechamp,项目名称:treeherder,代码行数:56,代码来源:test_jobs_model.py


示例13: test_get_job_data

def test_get_job_data(jm, test_project, refdata, sample_data, initial_data, mock_log_parser, sample_resultset):
    """
    Ingest the first ``target_len`` sample jobs and check that
    ``ArtifactsModel.get_job_signatures_from_ids`` returns one entry for
    each requested job id.
    """
    target_len = 10
    job_data = sample_data.job_data[:target_len]
    test_utils.do_job_ingestion(jm, refdata, job_data, sample_resultset)

    # Request ids 1..target_len; assumes the ingested jobs received
    # sequential ids starting at 1 -- TODO confirm.
    with ArtifactsModel(test_project) as artifacts_model:
        job_data = artifacts_model.get_job_signatures_from_ids(range(1, target_len + 1))

    # Compare by value: ``is`` tests object identity, which only happens to
    # hold for small ints because of a CPython implementation detail.
    assert len(job_data) == target_len
开发者ID:EdgarChen,项目名称:treeherder,代码行数:10,代码来源:test_jobs_model.py


示例14: test_get_job_data

def test_get_job_data(jm, refdata, sample_data, initial_data,
                      mock_log_parser, sample_resultset):
    """
    Ingest the first ``target_len`` sample jobs and check that
    ``jm.get_job_signatures_from_ids`` returns one entry for each
    requested job id.
    """
    target_len = 10
    job_data = sample_data.job_data[:target_len]
    test_utils.do_job_ingestion(jm, refdata, job_data, sample_resultset)

    # Request ids 1..target_len; assumes the ingested jobs received
    # sequential ids starting at 1 -- TODO confirm.
    job_data = jm.get_job_signatures_from_ids(range(1, target_len + 1))

    # Compare by value: ``is`` tests object identity, which only happens to
    # hold for small ints because of a CPython implementation detail.
    assert len(job_data) == target_len
开发者ID:AutomatedTester,项目名称:treeherder-service,代码行数:10,代码来源:test_jobs_model.py


示例15: test_ingesting_skip_existing

def test_ingesting_skip_existing(jm, sample_data, initial_data, refdata, mock_log_parser, sample_resultset):
    """Store two jobs when the first of them is already ingested.

    The job list must end up with two entries, i.e. the existing job is
    not stored a second time.
    """
    already_ingested = sample_data.job_data[:1]
    test_utils.do_job_ingestion(jm, refdata, already_ingested, sample_resultset)

    # The first of these two jobs was ingested above.
    first_two_jobs = sample_data.job_data[:2]
    jm.store_job_data(first_two_jobs)

    listed_jobs = jm.get_job_list(0, 10)
    assert len(listed_jobs) == 2
开发者ID:EdgarChen,项目名称:treeherder,代码行数:10,代码来源:test_jobs_model.py


示例16: test_remove_existing_jobs_one_existing_one_new

def test_remove_existing_jobs_one_existing_one_new(jm, sample_data, initial_data, refdata,
                                                   mock_log_parser, sample_resultset):
    """Filter two jobs, one of which is already ingested.

    ``_remove_existing_jobs`` must hand back a single job.
    """
    already_ingested = sample_data.job_data[:1]
    test_utils.do_job_ingestion(jm, refdata, already_ingested, sample_resultset)

    # The first of these two jobs was ingested above.
    first_two_jobs = sample_data.job_data[:2]
    remaining = jm._remove_existing_jobs(first_two_jobs)

    assert len(remaining) == 1
开发者ID:mjzffr,项目名称:treeherder,代码行数:10,代码来源:test_jobs_model.py


示例17: test_ingest_all_sample_jobs

def test_ingest_all_sample_jobs(jm, refdata, sample_data, initial_data, sample_resultset, mock_log_parser):
    """
    Run every job structure from the job_data.txt file through ingestion;
    verification is delegated to ``test_utils.do_job_ingestion``.

    @@@ - Re-enable when our job_data.txt has been re-created with
          correct data.
    """
    all_sample_jobs = sample_data.job_data
    test_utils.do_job_ingestion(jm, refdata, all_sample_jobs, sample_resultset)
开发者ID:un33k,项目名称:treeherder-service,代码行数:10,代码来源:test_jobs_model.py


示例18: test_remove_existing_jobs_single_existing

def test_remove_existing_jobs_single_existing(test_repository, failure_classifications,
                                              sample_data, sample_resultset,
                                              mock_log_parser):
    """Filter a single job that is already ingested.

    ``_remove_existing_jobs`` must hand back nothing.
    """
    already_ingested = sample_data.job_data[:1]
    test_utils.do_job_ingestion(test_repository, already_ingested, sample_resultset)
    assert Job.objects.count() == 1

    # Passing the same, already stored, job again leaves nothing to load.
    remaining = _remove_existing_jobs(already_ingested)
    assert len(remaining) == 0
开发者ID:SebastinSanty,项目名称:treeherder,代码行数:11,代码来源:test_job_ingestion.py


示例19: test_remove_existing_jobs_one_existing_one_new

def test_remove_existing_jobs_one_existing_one_new(jm, sample_data,
                                                   sample_resultset, mock_log_parser):
    """Filter two jobs, one of which is already ingested.

    ``_remove_existing_jobs`` must hand back a single job, and the number
    of stored jobs must stay at one.
    """
    already_ingested = sample_data.job_data[:1]
    test_utils.do_job_ingestion(jm, already_ingested, sample_resultset)

    # The first of these two jobs was ingested above.
    first_two_jobs = sample_data.job_data[:2]
    remaining = jm._remove_existing_jobs(first_two_jobs)

    assert len(remaining) == 1
    assert Job.objects.count() == 1
开发者ID:jamesthechamp,项目名称:treeherder,代码行数:11,代码来源:test_jobs_model.py


示例20: test_ingest_single_sample_job

def test_ingest_single_sample_job(test_repository, failure_classifications,
                                  sample_data, sample_push,
                                  mock_log_parser):
    """Ingest only the first sample job and pin its generated hashes."""
    first_job_only = sample_data.job_data[:1]
    test_utils.do_job_ingestion(test_repository, first_job_only, sample_push)
    assert Job.objects.count() == 1

    stored_job = Job.objects.get(id=1)
    # Guard against accidental changes to how job-related hashes are
    # generated.
    assert stored_job.option_collection_hash == '32faaecac742100f7753f0c1d0aa0add01b4046b'
    assert stored_job.signature.signature == '4dabe44cc898e585228c43ea21337a9b00f5ddf7'
开发者ID:edmorley,项目名称:treeherder,代码行数:11,代码来源:test_job_ingestion.py



注:本文中的tests.test_utils.do_job_ingestion函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test_utils.make_metric函数代码示例发布时间:2022-05-27
下一篇:
Python test_utils.create_script_param_config函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap