• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python helpers.demo_file函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.utils.helpers.demo_file函数的典型用法代码示例。如果您正苦于以下问题:Python demo_file函数的具体用法?Python demo_file怎么用?Python demo_file使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了demo_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_uhs_output_type_xml

    def test_uhs_output_type_xml(self):
        # Run a calculation with --output-type=xml and check that the expected
        # result files are created in the right location.

        # This location is based on parameters in the UHS config file:
        results_target_dir = demo_file('uhs/computed_output')

        # clear the target dir from previous demo/test runs
        shutil.rmtree(results_target_dir)

        expected_export_files = [
            os.path.join(results_target_dir, 'uhs_poe:0.1.hdf5'),
            os.path.join(results_target_dir, 'uhs_poe:0.02.hdf5'),
            os.path.join(results_target_dir, 'uhs.xml'),
        ]

        for f in expected_export_files:
            self.assertFalse(os.path.exists(f))

        uhs_cfg = demo_file('uhs/config.gem')
        try:
            ret_code = run_job(uhs_cfg, ['--output-type=xml'])
            self.assertEqual(0, ret_code)

            # Check that all the output files were created:
            for f in expected_export_files:
                self.assertTrue(os.path.exists(f))
        finally:
            shutil.rmtree(results_target_dir)
开发者ID:PseudononymousEpistle,项目名称:oq-engine,代码行数:29,代码来源:uhs_test.py


示例2: test_run_calc_with_description

    def test_run_calc_with_description(self):
        # Test importing and running a job with a config containing the
        # optional DESCRIPTION parameter.

        description = 'Classical PSHA hazard test description'

        orig_cfg_path = demo_file('PeerTestSet1Case2/config.gem')
        mod_cfg_path = os.path.join(demo_file('PeerTestSet1Case2'),
                                    'modified_config.gem')

        # Use ConfigParser to add the DESCRIPTION param to an existing config
        # profile and write a new temporary config file:
        cfg_parser = ConfigParser.ConfigParser()
        cfg_parser.readfp(open(orig_cfg_path, 'r'))
        cfg_parser.set('general', 'DESCRIPTION', description)
        cfg_parser.write(open(mod_cfg_path, 'w'))

        run_job(mod_cfg_path)
        job = OqJob.objects.latest('id')
        job_profile = job.profile()

        self.assertEqual(description, job_profile.description)
        self.assertEqual(description, job.description)

        # Clean up the temporary config file:
        os.unlink(mod_cfg_path)
开发者ID:PseudononymousEpistle,项目名称:oq-engine,代码行数:26,代码来源:export_test.py


示例3: test_scenario_risk_sample_based

    def test_scenario_risk_sample_based(self):
        # This QA test is a longer-running test of the Scenario Risk
        # calculator.

        # The vulnerabiilty model has non-zero Coefficients of Variation and
        # therefore exercises the 'sample-based' path through the calculator.
        # This test is configured to produce 1000 ground motion fields at each
        # location of interest (in the test above, only 10 are produced).

        # Since we're seeding the random epsilon sampling, we can consistently
        # reproduce all result values.

        # When these values are compared to the results computed by a similar
        # config which takes the 'mean-based' path (with CoVs = 0), we expect
        # the following:
        # All of the mean values in the 'sample-based' results should be with
        # 5%, + or -, of the 'mean-based' results.
        # The standard deviation values of the 'sample-based' results should
        # simply be greater than those produced with the 'mean-based' method.

        # For comparison, mean and stddev values for the region were computed
        # with 1000 GMFs using the mean-based approach. These values (rounded
        # to 2 decimal places) are:
        mb_mean_loss = 1233.26
        mb_stddev_loss = 443.63
        # Loss map for the mean-based approach:
        mb_loss_map = [
            dict(asset='a3', pos='15.48 38.25', mean=200.54874638,
                stddev=94.2302991022),
            dict(asset='a2', pos='15.56 38.17', mean=510.821363253,
                stddev=259.964152622),
            dict(asset='a1', pos='15.48 38.09', mean=521.885458891,
                stddev=244.825980356),
        ]

        # Sanity checks are done. Let's do this.
        scen_cfg = helpers.demo_file(
            'scenario_risk/config_sample-based_qa.gem')
        result = helpers.run_job(scen_cfg, ['--output-type=xml'],
            check_output=True)

        job = OqJob.objects.latest('id')
        self.assertEqual('succeeded', job.status)

        expected_loss_map_file = helpers.demo_file(
            'scenario_risk/computed_output/loss-map-%s.xml' % job.id)
        self.assertTrue(os.path.exists(expected_loss_map_file))

        loss_map = helpers.loss_map_result_from_file(expected_loss_map_file)
        self._verify_loss_map_within_range(sorted(mb_loss_map),
            sorted(loss_map), 0.05)

        exp_mean_loss, exp_stddev_loss = helpers.mean_stddev_from_result_line(
            result)
        self.assertAlmostEqual(mb_mean_loss, exp_mean_loss,
            delta=mb_mean_loss * 0.05)
        self.assertTrue(exp_stddev_loss > mb_stddev_loss)
开发者ID:PseudononymousEpistle,项目名称:oq-engine,代码行数:57,代码来源:scenario_risk_test.py


示例4: _create_job_profiles

    def _create_job_profiles(self, user_name):
        uhs_cfg = helpers.demo_file('uhs/config.gem')
        job = engine.prepare_job()
        self.uhs_jp, _, _ = engine.import_job_profile(uhs_cfg, job,
                                               user_name=user_name)

        cpsha_cfg = helpers.demo_file('classical_psha_based_risk/config.gem')
        job = engine.prepare_job()
        self.cpsha_jp, _, _ = engine.import_job_profile(cpsha_cfg, job,
                                                 user_name=user_name)
开发者ID:PseudononymousEpistle,项目名称:oq-engine,代码行数:10,代码来源:core_test.py


示例5: setUp

    def setUp(self):
        self.job, _ = helpers.get_risk_job(
            demo_file('event_based_risk/job.ini'),
            demo_file('event_based_hazard/job.ini'), output_type="gmf")

        self.calculator = event_based.EventBasedRiskCalculator(self.job)
        self.calculator.pre_execute()

        self.job.is_running = True
        self.job.status = 'executing'
        self.job.save()
开发者ID:xpb,项目名称:oq-engine,代码行数:11,代码来源:core_test.py


示例6: test_a_few_inputs

    def test_a_few_inputs(self):
        job, files = helpers.get_risk_job(
            demo_file('classical_psha_based_risk/job.ini'),
            demo_file('simple_fault_demo_hazard/job.ini'))
        rc = job.risk_calculation

        expected_ids = sorted([x.id for x in files.values()])

        inputs = models.inputs4rcalc(rc.id)

        actual_ids = sorted([x.id for x in inputs])

        self.assertEqual(expected_ids, actual_ids)
开发者ID:xpb,项目名称:oq-engine,代码行数:13,代码来源:models_test.py


示例7: setUp

 def setUp(self):
     job, _ = helpers.get_risk_job(
         demo_file('classical_psha_based_risk/job.ini'),
         demo_file('simple_fault_demo_hazard/job.ini')
     )
     self.compulsory_arguments = dict(
         calculation_mode="classical",
         lrem_steps_per_interval=5)
     self.other_args = dict(
         owner=helpers.default_user(),
         region_constraint=(
             'POLYGON((-122.0 38.113, -122.114 38.113, -122.57 38.111, '
             '-122.0 38.113))'),
         hazard_output=job.risk_calculation.hazard_output)
开发者ID:xpb,项目名称:oq-engine,代码行数:14,代码来源:validation_test.py


示例8: test_hazard_map_test

    def test_hazard_map_test(self):
        helpers.run_job(helpers.demo_file(
            os.path.join("HazardMapTest", "config.gem")))

        self.job = models.OqCalculation.objects.latest("id")

        path = helpers.demo_file(os.path.join("HazardMapTest",
            "expected_results", "meanHazardMap0.1.dat"))
        expected_map = load_expected_map(path)

        poe = 0.1
        statistic_type = "mean"
        verify_hazmap_results(self, self.job, expected_map, poe,
                              statistic_type)
开发者ID:kpanic,项目名称:openquake,代码行数:14,代码来源:classical_psha_unittest.py


示例9: test_with_input_type

    def test_with_input_type(self):
        job, files = helpers.get_risk_job(
            demo_file('classical_psha_based_risk/job.ini'),
            demo_file('simple_fault_demo_hazard/job.ini'))
        rc = job.risk_calculation

        # It should only be 1 id, actually.
        expected_ids = [x.id for x in files.values()
                        if x.input_type == 'exposure']

        inputs = models.inputs4rcalc(rc.id, input_type='exposure')

        actual_ids = sorted([x.id for x in inputs])

        self.assertEqual(expected_ids, actual_ids)
开发者ID:xpb,项目名称:oq-engine,代码行数:15,代码来源:models_test.py


示例10: test_calculator_for_task

    def test_calculator_for_task(self):
        """Load up a sample calculation (into the db and cache) and make sure
        we can instantiate the correct calculator for a given calculation id.
        """
        from openquake.calculators.hazard.classical.core import (
            ClassicalHazardCalculator)
        job_profile, params, sections = engine.import_job_profile(demo_file(
            'simple_fault_demo_hazard/config.gem'))

        calculation = OqCalculation(owner=job_profile.owner,
                                    oq_job_profile=job_profile)
        calculation.save()

        calc_proxy = engine.CalculationProxy(params, calculation.id,
                                             oq_job_profile=job_profile,
                                             oq_calculation=calculation)
        calc_proxy.to_kvs()

        with patch(
            'openquake.utils.tasks.get_running_calculation') as grc_mock:

            # Loading of the CalculationProxy is done by
            # `get_running_calculation`, which is covered by other tests.
            # So, we just want to make sure that it's called here.
            grc_mock.return_value = calc_proxy

            calculator = tasks.calculator_for_task(calculation.id, 'hazard')

            self.assertTrue(isinstance(calculator, ClassicalHazardCalculator))
            self.assertEqual(1, grc_mock.call_count)
开发者ID:kpanic,项目名称:openquake,代码行数:30,代码来源:utils_tasks_unittest.py


示例11: test_compute_uhs_with_site_model

    def test_compute_uhs_with_site_model(self):
        the_job = helpers.prepare_job_context(
            helpers.demo_file('uhs/config_with_site_model.gem'))
        the_job.to_kvs()

        site = Site(0, 0)

        helpers.store_hazard_logic_trees(the_job)

        get_sm_patch = helpers.patch(
            'openquake.calculators.hazard.general.get_site_model')
        get_closest_patch = helpers.patch(
            'openquake.calculators.hazard.general.get_closest_site_model_data')
        compute_patch = helpers.patch(
            'openquake.calculators.hazard.uhs.core._compute_uhs')

        get_sm_mock = get_sm_patch.start()
        get_closest_mock = get_closest_patch.start()
        compute_mock = compute_patch.start()

        get_closest_mock.return_value = SiteModel(
            vs30=800, vs30_type='measured', z1pt0=100, z2pt5=200)
        try:
            compute_uhs(the_job, site)

            self.assertEqual(1, get_sm_mock.call_count)
            self.assertEqual(1, get_closest_mock.call_count)
            self.assertEqual(1, compute_mock.call_count)
        finally:
            get_sm_patch.stop()
            get_closest_patch.stop()
            compute_patch.stop()
开发者ID:ROB-Seismology,项目名称:oq-engine,代码行数:32,代码来源:core_test.py


示例12: test_complex_fault_demo_hazard_nrml_written_once

    def test_complex_fault_demo_hazard_nrml_written_once(self):
        """
        Run the `complex_fault_demo_hazard` demo and verify that the
        NRML files are written only once.
        """

        def filter_multi():
            """Filter and return files that were written more than once."""
            counts = defaultdict(int)
            files = stats.kvs_op("lrange", key, 0, -1)
            for file in files:
                counts[file] += 1
            return [(f, c) for f, c in counts.iteritems() if c > 1]

        job_cfg = helpers.demo_file(os.path.join(
            "complex_fault_demo_hazard", "config.gem"))

        helpers.run_job(job_cfg, output="xml")

        self.job = models.OqCalculation.objects.latest("id")

        key = stats.key_name(
            self.job.id, *stats.STATS_KEYS["hcls_xmlcurvewrites"])
        if key:
            multi_writes = filter_multi()
            self.assertFalse(multi_writes, str(multi_writes))
        key = stats.key_name(
            self.job.id, *stats.STATS_KEYS["hcls_xmlmapwrites"])
        if key:
            multi_writes = filter_multi()
            self.assertFalse(multi_writes, str(multi_writes))
开发者ID:kpanic,项目名称:openquake,代码行数:31,代码来源:classical_psha_unittest.py


示例13: test_bcr_risk_export

    def test_bcr_risk_export(self):
        # Tests that outputs of a risk classical calculation are
        # exported

        target_dir = tempfile.mkdtemp()

        try:
            cfg = helpers.demo_file('classical_bcr/job.ini')

            # run the calculation to create something to export
            retcode = helpers.run_risk_job_sp(cfg, self.hazard_id,
                                              silence=True)
            self.assertEqual(0, retcode)

            job = models.OqJob.objects.latest('id')

            outputs = export_core.get_outputs(job.id)
            expected_outputs = 1  # 1 bcr distribution
            self.assertEqual(expected_outputs, len(outputs))

            # Export the loss curves:
            distribution = outputs.filter(output_type='bcr_distribution')[0]
            rc_files = risk.export(distribution.id, target_dir)

            self.assertEqual(1, len(rc_files))

            for f in rc_files:
                self._test_exported_file(f)
        finally:
            shutil.rmtree(target_dir)
开发者ID:gvallarelli,项目名称:oq-engine,代码行数:30,代码来源:risk_test.py


示例14: test_peer_test_set_1_case_5

    def test_peer_test_set_1_case_5(self):
        expected_results = load_exp_hazcurve_results("PeerTestSet1Case5")

        helpers.run_job(helpers.demo_file(
            os.path.join("PeerTestSet1Case5", "config.gem")))

        self._assert_hazcurve_results_are(expected_results)
开发者ID:kpanic,项目名称:openquake,代码行数:7,代码来源:classical_psha_unittest.py


示例15: test_initialize_site_model

    def test_initialize_site_model(self):
        # we need a slightly different config file for this test
        cfg = helpers.demo_file(
            'simple_fault_demo_hazard/job_with_site_model.ini')
        self.job = helpers.get_hazard_job(cfg)
        self.calc = core.ClassicalHazardCalculator(self.job)

        self.calc.initialize_site_model()
        # If the site model isn't valid for the calculation geometry, a
        # `RuntimeError` should be raised here

        # Okay, it's all good. Now check the count of the site model records.
        [site_model_inp] = models.inputs4hcalc(
            self.job.hazard_calculation.id, input_type='site_model')
        sm_nodes = models.SiteModel.objects.filter(input=site_model_inp)

        self.assertEqual(2601, len(sm_nodes))

        num_pts_to_compute = len(
            self.job.hazard_calculation.points_to_compute())

        [site_data] = models.SiteData.objects.filter(
            hazard_calculation=self.job.hazard_calculation.id)

        # The site model is good. Now test that `site_data` was computed.
        # For now, just test the lengths of the site data collections:
        self.assertEqual(num_pts_to_compute, len(site_data.lons))
        self.assertEqual(num_pts_to_compute, len(site_data.lats))
        self.assertEqual(num_pts_to_compute, len(site_data.vs30s))
        self.assertEqual(num_pts_to_compute, len(site_data.vs30_measured))
        self.assertEqual(num_pts_to_compute, len(site_data.z1pt0s))
        self.assertEqual(num_pts_to_compute, len(site_data.z2pt5s))
开发者ID:4x,项目名称:oq-engine,代码行数:32,代码来源:core_test.py


示例16: test_export_for_scenario

    def test_export_for_scenario(self):
        target_dir = tempfile.mkdtemp()

        try:
            cfg = helpers.demo_file('scenario_hazard/job.ini')

            # run the calculation to create something to export
            retcode = helpers.run_hazard_job_sp(cfg, silence=True)
            self.assertEqual(0, retcode)

            job = models.OqJob.objects.latest('id')

            outputs = export_core.get_outputs(job.id)

            self.assertEqual(1, len(outputs))  # 1 GMF

            gmf_outputs = outputs.filter(output_type='gmf_scenario')
            self.assertEqual(1, len(gmf_outputs))

            exported_files = hazard.export(gmf_outputs[0].id, target_dir)

            self.assertEqual(1, len(exported_files))
            # Check the file paths exist, is absolute, and the file isn't
            # empty.
            f = exported_files[0]
            self._test_exported_file(f)

            # Check for the correct number of GMFs in the file:
            tree = etree.parse(f)
            self.assertEqual(10, number_of('nrml:gmf', tree))
        finally:
            shutil.rmtree(target_dir)
开发者ID:xpb,项目名称:oq-engine,代码行数:32,代码来源:hazard_test.py


示例17: hazard_id

    def hazard_id(self):
        job = helpers.get_hazard_job(
            helpers.demo_file("scenario_hazard/job.ini"))

        job.hazard_calculation = models.HazardCalculation.objects.create(
            owner=job.hazard_calculation.owner,
            truncation_level=job.hazard_calculation.truncation_level,
            maximum_distance=job.hazard_calculation.maximum_distance,
            intensity_measure_types_and_levels=(
                job.hazard_calculation.intensity_measure_types_and_levels),
            calculation_mode="scenario")

        job.save()

        output = models.Output.objects.create_output(
            job, "Test Hazard output", "gmf_scenario")

        fname = os.path.join(os.path.dirname(__file__), 'gmf_scenario.csv')
        with open(fname, 'rb') as csvfile:
            gmfreader = csv.reader(csvfile, delimiter=',')
            locations = gmfreader.next()

            arr = numpy.array([map(float, row) for row in gmfreader])
            for i, gmvs in enumerate(arr.transpose()):
                models.GmfScenario.objects.create(
                    output=output,
                    imt="MMI",
                    gmvs=gmvs,
                    result_grp_ordinal=1,
                    location="POINT(%s)" % locations[i])

        return output.id
开发者ID:gvallarelli,项目名称:oq-engine,代码行数:32,代码来源:test.py


示例18: test_peer_test_set_1_case_10

    def test_peer_test_set_1_case_10(self):
        expected_results = self._load_results("PeerTestSet1Case10")

        self._run_job(helpers.demo_file(
            os.path.join("PeerTestSet1Case10", "config.gem")))

        self._assert_results_are(expected_results)
开发者ID:dsg101,项目名称:openquake,代码行数:7,代码来源:classical_psha_unittest.py


示例19: hazard_id

    def hazard_id(self):
        job = helpers.get_hazard_job(
            helpers.demo_file("scenario_hazard/job.ini"))
        hc = job.hazard_calculation
        job.hazard_calculation = models.HazardCalculation.objects.create(
            owner=hc.owner, truncation_level=hc.truncation_level,
            maximum_distance=hc.maximum_distance,
            intensity_measure_types=["PGA"],
            calculation_mode="scenario")
        job.status = "complete"
        job.save()

        output = models.Output.objects.create_output(
            job, "Test Hazard output", "gmf_scenario")

        fname = os.path.join(os.path.dirname(__file__), 'gmf_scenario.csv')
        with open(fname, 'rb') as csvfile:
            gmfreader = csv.reader(csvfile, delimiter=',')
            locations = gmfreader.next()

            arr = numpy.array([[float(x) for x in row] for row in gmfreader])
            for i, gmvs in enumerate(arr):
                models.GmfScenario.objects.create(
                    output=output,
                    imt="PGA",
                    gmvs=gmvs,
                    location="POINT(%s)" % locations[i])
        return output.id
开发者ID:xpb,项目名称:oq-engine,代码行数:28,代码来源:test.py


示例20: hazard_id

    def hazard_id(self):
        job = helpers.get_hazard_job(
            helpers.demo_file("simple_fault_demo_hazard/job.ini"))

        hazard_curve = [
            (0.001, 0.0398612669790014),
            (0.01, 0.039861266979001400), (0.05, 0.039728757480298900),
            (0.10, 0.029613426625612500), (0.15, 0.019827328756491600),
            (0.20, 0.013062270161451900), (0.25, 0.008655387950000430),
            (0.30, 0.005898520593689670), (0.35, 0.004061698589511780),
            (0.40, 0.002811727179526820), (0.45, 0.001995117417776690),
            (0.50, 0.001358705972845710), (0.55, 0.000989667841573727),
            (0.60, 0.000757544444296432), (0.70, 0.000272824002045979),
            (0.80, 0.00), (0.9, 0.00), (1.0, 0.00)]

        hd = models.HazardCurveData.objects.create(
            hazard_curve=models.HazardCurve.objects.create(
                output=models.Output.objects.create_output(
                    job, "Test Hazard curve", "hazard_curve"),
                investigation_time=50,
                imt="PGA", imls=[hz[0] for hz in hazard_curve],
                statistics="mean"),
            poes=[hz[1] for hz in hazard_curve],
            location="POINT(1 1)")

        return hd.hazard_curve.output.id
开发者ID:4x,项目名称:oq-engine,代码行数:26,代码来源:test.py



注:本文中的tests.utils.helpers.demo_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python helpers.get_output_path函数代码示例发布时间:2022-05-27
下一篇:
Python helpers.deep_eq函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap