• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python helpers.get_data_path函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.utils.helpers.get_data_path函数的典型用法代码示例。如果您正苦于以下问题:Python get_data_path函数的具体用法?Python get_data_path怎么用?Python get_data_path使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_data_path函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_pre_execute_check_imts_raises

    def test_pre_execute_check_imts_raises(self):
        """Check that `pre_execute` rejects an incompatible hazard output.

        A hazard curve output with IMT ``PGV`` is created for a hazard job
        and handed to a classical risk job through ``hazard_output_id``;
        ``pre_execute`` is then expected to raise a `ValueError`.
        """
        haz_job = engine.prepare_job()
        cfg = helpers.get_data_path('classical_job.ini')
        # Use a context manager so the config file handle is closed as soon
        # as parsing is done instead of being leaked.
        with open(cfg, 'r') as cfg_file:
            params, files = engine.parse_config(cfg_file)
        haz_job.hazard_calculation = engine.create_hazard_calculation(
            haz_job.owner, params, files.values())
        haz_job.save()

        hazard_curve_output = models.Output.objects.create_output(
            haz_job, 'test_hazard_curve', 'hazard_curve'
        )
        models.HazardCurve.objects.create(
            output=hazard_curve_output,
            investigation_time=50.0,
            imt='PGV',  # the vulnerability model only defines SA(0.1)
            statistics='mean'
        )

        cfg = helpers.get_data_path(
            'end-to-end-hazard-risk/job_risk_classical.ini')
        risk_job = helpers.get_risk_job(
            cfg, hazard_output_id=hazard_curve_output.id
        )
        models.JobStats.objects.create(oq_job=risk_job)
        calc = classical.ClassicalRiskCalculator(risk_job)

        # Check for compatibility between the IMTs defined in the vulnerability
        # model and the chosen hazard output (--hazard-output-id)
        with self.assertRaises(ValueError) as ar:
            calc.pre_execute()
        # NOTE(review): the curve above is created with imt='PGV' while the
        # expected message lists PGA as the available IMT -- confirm which
        # one the calculator actually reports.
        # `str(exc)` is used instead of the Python 2-only `exc.message`
        # attribute; for a single-argument exception both give the same text.
        self.assertEqual(
            "There is no hazard output for: SA(0.1). "
            "The available IMTs are: PGA.",
            str(ar.exception))
开发者ID:kenxshao,项目名称:oq-engine,代码行数:34,代码来源:core_test.py


示例2: setUp

    def setUp(self):
        """Create a fake risk job and the BCR risk calculator built on it."""
        risk_cfg = get_data_path('event_based_bcr/job.ini')
        hazard_cfg = get_data_path('event_based_hazard/job.ini')

        # Only the job is kept; the second element of the pair is unused.
        self.job, _ignored = helpers.get_fake_risk_job(
            risk_cfg, hazard_cfg, output_type="gmf")

        self.calculator = core.EventBasedBCRRiskCalculator(self.job)
        models.JobStats.objects.create(oq_job=self.job)
开发者ID:Chunghan,项目名称:oq-engine,代码行数:7,代码来源:core_test.py


示例3: test_pre_execute_check_imts_no_errors

    def test_pre_execute_check_imts_no_errors(self):
        """Check that `pre_execute` accepts a compatible hazard output.

        A hazard curve output with IMT ``SA`` (period 0.025, damping 5.0) is
        created for a hazard job and handed to a classical risk job through
        ``hazard_output_id``; ``pre_execute`` must complete without raising.
        """
        haz_job = engine.prepare_job()

        cfg = helpers.get_data_path(
            'end-to-end-hazard-risk/job_haz_classical.ini')
        # Use a context manager so the config file handle is closed as soon
        # as parsing is done instead of being leaked.
        with open(cfg, 'r') as cfg_file:
            params, files = engine.parse_config(cfg_file)
        haz_job.hazard_calculation = engine.create_hazard_calculation(
            haz_job.owner, params, files.values())
        haz_job.save()

        hazard_curve_output = models.Output.objects.create_output(
            haz_job, 'test_hazard_curve', 'hazard_curve'
        )
        models.HazardCurve.objects.create(
            output=hazard_curve_output,
            investigation_time=50.0,
            # this imt is compatible with the vuln model
            imt='SA',
            sa_period=0.025,
            sa_damping=5.0,
            statistics='mean'
        )

        cfg = helpers.get_data_path(
            'end-to-end-hazard-risk/job_risk_classical.ini')
        risk_job = helpers.get_risk_job(
            cfg, hazard_output_id=hazard_curve_output.id
        )
        models.JobStats.objects.create(oq_job=risk_job)
        calc = classical.ClassicalRiskCalculator(risk_job)

        # In contrast to `test_pre_execute_check_imts_raises`, no error is
        # expected here; an exception from this call fails the test.
        calc.pre_execute()
开发者ID:kenxshao,项目名称:oq-engine,代码行数:34,代码来源:core_test.py


示例4: setUp

    def setUp(self):
        """Load both job fixtures and record their super config paths."""
        self.generated_files = []

        plain_cfg = helpers.get_data_path(CONFIG_FILE)
        self.job = Job.from_file(plain_cfg)

        includes_cfg = helpers.get_data_path(CONFIG_WITH_INCLUDES)
        self.job_with_includes = Job.from_file(includes_cfg)

        # Track the super config path of each loaded job, in load order.
        for loaded_job in (self.job, self.job_with_includes):
            self.generated_files.append(loaded_job.super_config_path)
开发者ID:danciul,项目名称:openquake,代码行数:8,代码来源:job_unittest.py


示例5: test_with_input_type

    def test_with_input_type(self):
        """`inputs4rcalc` filtered by 'exposure' must yield exactly one input."""
        risk_cfg = get_data_path('classical_psha_based_risk/job.ini')
        hazard_cfg = get_data_path('simple_fault_demo_hazard/job.ini')
        job, _files = helpers.get_fake_risk_job(risk_cfg, hazard_cfg)
        risk_calc = job.risk_calculation

        exposure_inputs = models.inputs4rcalc(
            risk_calc.id, input_type='exposure')
        self.assertEqual(1, exposure_inputs.count())
开发者ID:larsbutler,项目名称:oq-engine,代码行数:8,代码来源:models_test.py


示例6: setUp

    def setUp(self):
        """Reset the KVS current-jobs entry and load the two job fixtures."""
        # Removing the managed job id info makes the job key that will be
        # allocated for us predictable.
        kvs.get_client().delete(kvs.tokens.CURRENT_JOBS)

        self.generated_files = []

        plain_cfg = helpers.get_data_path(CONFIG_FILE)
        self.job = helpers.job_from_file(plain_cfg)

        includes_cfg = helpers.get_data_path(CONFIG_WITH_INCLUDES)
        self.job_with_includes = helpers.job_from_file(includes_cfg)
开发者ID:kpanic,项目名称:openquake,代码行数:10,代码来源:job_unittest.py


示例7: setUp

    def setUp(self):
        """Prepare an event-based risk calculator on a job marked as executing."""
        risk_cfg = get_data_path("event_based_risk/job.ini")
        hazard_cfg = get_data_path("event_based_hazard/job.ini")
        # Only the job is kept; the second element of the pair is unused.
        self.job, _ignored = helpers.get_fake_risk_job(
            risk_cfg, hazard_cfg, output_type="gmf")

        self.calculator = event_based.EventBasedRiskCalculator(self.job)
        models.JobStats.objects.create(oq_job=self.job)
        self.calculator.pre_execute()

        # Flag the job as running and persist that state.
        running_job = self.job
        running_job.is_running = True
        running_job.status = "executing"
        running_job.save()
开发者ID:kenxshao,项目名称:oq-engine,代码行数:11,代码来源:core_test.py


示例8: test_a_few_inputs

    def test_a_few_inputs(self):
        """`inputs4rcalc` without a filter returns every input of the job."""
        risk_cfg = get_data_path('classical_psha_based_risk/job.ini')
        hazard_cfg = get_data_path('simple_fault_demo_hazard/job.ini')
        job, files = helpers.get_fake_risk_job(risk_cfg, hazard_cfg)
        risk_calc = job.risk_calculation

        # Compare ids in sorted order so the result order does not matter.
        expected_ids = sorted(fixture.id for fixture in files.values())
        found_inputs = models.inputs4rcalc(risk_calc.id)
        actual_ids = sorted(found.id for found in found_inputs)

        self.assertEqual(expected_ids, actual_ids)
开发者ID:kenxshao,项目名称:oq-engine,代码行数:13,代码来源:models_test.py


示例9: setUp

 def setUp(self):
     """Build the argument dictionaries shared by the validation tests."""
     risk_cfg = get_data_path('classical_psha_based_risk/job.ini')
     hazard_cfg = get_data_path('simple_fault_demo_hazard/job.ini')
     # Only the job is needed; the second element of the pair is unused.
     job, _ignored = helpers.get_fake_risk_job(risk_cfg, hazard_cfg)

     self.compulsory_arguments = {'lrem_steps_per_interval': 5}

     region_wkt = (
         'POLYGON((-122.0 38.113, -122.114 38.113, -122.57 38.111, '
         '-122.0 38.113))')
     self.other_args = {
         'calculation_mode': "classical",
         'region_constraint': region_wkt,
         'hazard_output': job.risk_calculation.hazard_output,
     }
开发者ID:Chunghan,项目名称:oq-engine,代码行数:13,代码来源:validation_test.py


示例10: test_with_input_type

    def test_with_input_type(self):
        """`inputs4rcalc` filtered by 'exposure' returns only exposure inputs.

        The ids of the fixture inputs whose ``input_type`` is 'exposure' are
        compared with the ids returned by ``models.inputs4rcalc`` for the
        same risk calculation.
        """
        job, files = helpers.get_fake_risk_job(
            get_data_path('classical_psha_based_risk/job.ini'),
            get_data_path('simple_fault_demo_hazard/job.ini'))
        rc = job.risk_calculation

        # It should only be 1 id, actually. Sort anyway: `actual_ids` below
        # is sorted, and `files.values()` gives no ordering guarantee, so an
        # unsorted expectation would make the comparison order-dependent if
        # more than one exposure input were ever present.
        expected_ids = sorted(x.id for x in files.values()
                              if x.input_type == 'exposure')

        inputs = models.inputs4rcalc(rc.id, input_type='exposure')

        actual_ids = sorted([x.id for x in inputs])

        self.assertEqual(expected_ids, actual_ids)
开发者ID:kenxshao,项目名称:oq-engine,代码行数:15,代码来源:models_test.py


示例11: setUp

 def setUp(self):
     """Create the hazard job, its calculator and a cached site collection."""
     self.cfg = helpers.get_data_path('event_based_hazard/job_2.ini')
     current_user = getpass.getuser()
     self.job = helpers.get_hazard_job(self.cfg, username=current_user)
     self.calc = core.EventBasedHazardCalculator(self.job)

     # Store a site collection in the cache under this hazard calculation id.
     calc_id = self.job.hazard_calculation.id
     site_coll = make_site_coll(0, 0, n=5)
     models.SiteCollection.cache[calc_id] = site_coll

     models.JobStats.objects.create(oq_job=self.job)
开发者ID:chenliu0831,项目名称:oq-engine,代码行数:7,代码来源:core_test.py


示例12: test_initialize_site_model

    def test_initialize_site_model(self):
        """Check the record counts produced by `initialize_site_model`."""
        # This test needs a config that differs slightly from the default one.
        site_model_cfg = helpers.get_data_path(
            'simple_fault_demo_hazard/job_with_site_model.ini')
        self.job = helpers.get_hazard_job(site_model_cfg)
        self.calc = core.ClassicalHazardCalculator(self.job)

        # A `RuntimeError` should be raised by this call if the site model
        # isn't valid for the calculation geometry.
        self.calc.initialize_site_model()

        # The call succeeded, so verify how many site model records exist.
        site_model_rows = models.SiteModel.objects.filter(job=self.job)
        self.assertEqual(2601, len(site_model_rows))

        haz_calc = self.job.hazard_calculation
        expected_site_count = len(haz_calc.points_to_compute())
        hazard_sites = models.HazardSite.objects.filter(
            hazard_calculation=haz_calc)

        # `hazard_site` must have been computed as well; for now only the
        # number of records is checked.
        self.assertEqual(expected_site_count, len(hazard_sites))
开发者ID:ryanberrio,项目名称:oq-engine,代码行数:25,代码来源:core_test.py


示例13: test_export_for_scenario

    def test_export_for_scenario(self):
        """Run a scenario hazard job in-process and export its GMF output.

        The job is run with ``OQ_NO_DISTRIBUTE`` set, then the single
        ``gmf_scenario`` output is exported into a temporary directory and
        the exported file is checked for the expected number of GMF nodes.
        The temporary directory is always removed.
        """
        target_dir = tempfile.mkdtemp()

        try:
            cfg = helpers.get_data_path('scenario_hazard/job.ini')

            # run the calculation in process to create something to export
            # Remember any pre-existing value so the environment is restored
            # exactly, instead of unconditionally deleting the variable and
            # leaking a changed environment into later tests.
            previous = os.environ.get('OQ_NO_DISTRIBUTE')
            os.environ['OQ_NO_DISTRIBUTE'] = '1'
            try:
                helpers.run_hazard_job(cfg)
            finally:
                if previous is None:
                    del os.environ['OQ_NO_DISTRIBUTE']
                else:
                    os.environ['OQ_NO_DISTRIBUTE'] = previous
            job = models.OqJob.objects.latest('id')
            self.assertEqual(job.status, 'complete')

            outputs = export_core.get_outputs(job.id)

            self.assertEqual(1, len(outputs))  # 1 GMF

            gmf_outputs = outputs.filter(output_type='gmf_scenario')
            self.assertEqual(1, len(gmf_outputs))

            exported_files = check_export(gmf_outputs[0].id, target_dir)

            self.assertEqual(1, len(exported_files))
            # Check that the file path exists, is absolute, and that the file
            # isn't empty.
            f = exported_files[0]
            self._test_exported_file(f)

            # Check for the correct number of GMFs in the file:
            tree = etree.parse(f)
            self.assertEqual(20, number_of('nrml:gmf', tree))
        finally:
            shutil.rmtree(target_dir)
开发者ID:kenxshao,项目名称:oq-engine,代码行数:35,代码来源:hazard_test.py


示例14: setUp

 def setUp(self):
     """Load the first line of the GMF fixture and build the region grid.

     Sets ``self.gmf_string`` to the first line of ``gmfs.json`` and
     ``self.grid`` to the grid of a four-corner region with a cell size
     of 0.02.
     """
     # Read through a context manager so the fixture file handle is closed
     # instead of being leaked.
     with open(helpers.get_data_path("gmfs.json")) as gmf_file:
         self.gmf_string = gmf_file.readline()
     region = shapes.Region.from_coordinates(
              [(-118.30, 34.12), (-118.18, 34.12),
              (-118.18,  34.00), (-118.30,  34.00)])
     region.cell_size = 0.02
     self.grid = region.grid
开发者ID:kpanic,项目名称:openquake,代码行数:7,代码来源:shapes_unittest.py


示例15: test_successful_job_lifecycle

    def test_successful_job_lifecycle(self):
        """Check the job status transitions pending -> running -> succeeded.

        ``Job.from_file`` is patched so the created job gets a mocked
        ``launch``; the status is asserted before launch, inside launch and
        after ``run_job`` returns. `assertEqual` is used throughout in place
        of the deprecated `assertEquals` alias.
        """
        with patch('openquake.job.Job.from_file') as from_file:

            # called in place of Job.launch
            def test_status_running_and_succeed():
                self.assertEqual('running', self._job_status())

                return []

            # replaces Job.launch with a mock
            def patch_job_launch(*args, **kwargs):
                self.job = self.job_from_file(*args, **kwargs)
                self.job.launch = mock.Mock(
                    side_effect=test_status_running_and_succeed)

                self.assertEqual('pending', self._job_status())

                return self.job

            from_file.side_effect = patch_job_launch

            # The supervisor spawn is patched out for the duration of the run.
            with patch('openquake.job.spawn_job_supervisor'):
                run_job(helpers.get_data_path(CONFIG_FILE), 'db')

        self.assertEqual(1, self.job.launch.call_count)
        self.assertEqual('succeeded', self._job_status())
开发者ID:favalex,项目名称:openquake,代码行数:26,代码来源:job_unittest.py


示例16: test_http_handler_writes_a_file

    def test_http_handler_writes_a_file(self):
        """The HTTP handler must produce a file on disk for a given URL."""

        class StubbedHTTPConnection(StubbedGetter):
            """Connection stand-in that reads the requested path locally."""

            def __enter__(self):
                return self

            def __exit__(self, *args):
                pass

            def request(self, req_type, path):
                # Remember what was asked for; `read` serves it later.
                self.remote_path = path
                return self

            def getresponse(self):
                # The stub acts as its own response object.
                return self

            def read(self):
                with open(self.remote_path, "r") as reader:
                    content = reader.read()
                return content

        local_target = "/tmp/fake_file"
        source_url = urlparse.urlparse(
            "http://localhost/%s" % helpers.get_data_path("config.gem"))

        handler = handlers.HTTPHandler(source_url, local_target)
        written_path = handler.handle(getter=StubbedHTTPConnection)

        self.assertTrue(os.path.isfile(written_path))
        os.unlink(written_path)
开发者ID:danciul,项目名称:openquake,代码行数:27,代码来源:handlers_unittest.py


示例17: setUp

    def setUp(self):
        """Reset the KVS current-jobs entry and build two job contexts."""
        # Removing the managed job id info makes the job key that will be
        # allocated for us predictable.
        kvs.get_client().delete(kvs.tokens.CURRENT_JOBS)

        self.generated_files = []

        def make_context(config_name):
            # Prepare a fresh job, import its profile from the named fixture
            # and wrap everything in a JobContext.
            new_job = engine.prepare_job()
            profile, job_params, job_sections = import_job_profile(
                helpers.get_data_path(config_name), new_job)
            return JobContext(
                job_params, new_job.id, sections=job_sections,
                oq_job_profile=profile, oq_job=new_job)

        self.job_ctxt = make_context(CONFIG_FILE)
        self.job_ctxt_with_includes = make_context(CONFIG_WITH_INCLUDES)
开发者ID:wmorales,项目名称:oq-engine,代码行数:16,代码来源:job_unittest.py


示例18: setUp

    def setUp(self):
        """Start the supervisor patches and create the hazard job fixture.

        Every started patcher is appended to ``self.patchers`` and the object
        returned by ``patcher.start()`` is stored on ``self`` under the last
        dotted component of the patched path (e.g. ``self.terminate_job``).
        """
        # Patch a few methods here and restore them in the tearDown to avoid
        # too many nested `with` blocks.
        # See http://www.voidspace.org.uk/python/mock/patch.html \
        #     #patch-methods-start-and-stop
        self.patchers = []

        def start_patch(attr_path):
            # Start a patch for `attr_path`, remember the patcher and expose
            # the started patch on `self` under the attribute's own name.
            _, attr = attr_path.rsplit('.', 1)
            patcher = patch(attr_path)
            self.patchers.append(patcher)
            setattr(self, attr, patcher.start())

        start_patch('openquake.engine.supervising.is_pid_running')

        # Patch the actions taken by the supervisor
        # (the backslash below continues the string literal, so the patched
        # path is '...supervisor.record_job_stop_time')
        start_patch('openquake.engine.supervising.supervisor.\
record_job_stop_time')
        start_patch(
            'openquake.engine.supervising.supervisor.cleanup_after_job')
        start_patch('openquake.engine.supervising.supervisor.terminate_job')
        start_patch('openquake.engine.supervising.supervisor.get_job_status')
        start_patch('openquake.engine.supervising.supervisor'
                    '.update_job_status')

        # Raise the root logger level to CRITICAL for these tests.
        logging.root.setLevel(logging.CRITICAL)

        cfg = get_data_path('end-to-end-hazard-risk/job_haz_classical.ini')
        self.job = get_hazard_job(cfg)
开发者ID:Chunghan,项目名称:oq-engine,代码行数:29,代码来源:supervisor_test.py


示例19: test_serialize

    def test_serialize(self):
        """Serialize a total damage distribution and compare with the fixture.

        The XML written by `DmgDistTotalXMLWriter` to a temporary file must
        match ``expected-dmg-dist-total.xml`` line by line and validate
        against the XML schema. The temporary file is always removed.
        """
        expected_file = helpers.get_data_path(
            "expected-dmg-dist-total.xml")

        # Context managers close the file handles instead of leaking them.
        with open(expected_file, "r") as expected:
            expected_text = expected.readlines()

        self.make_dist()

        self.make_data("no_damage", 1.0, 1.6)
        self.make_data("slight", 34.8, 18.3)
        self.make_data("moderate", 64.2, 19.8)
        self.make_data("extensive", 64.3, 19.7)
        self.make_data("complete", 64.3, 19.7)

        # Create the temp file before entering `try`: if `mkstemp` itself
        # failed inside it, the `finally` clause would hit an unbound
        # `result_xml` and mask the real error.
        fd, result_xml = tempfile.mkstemp()
        # `mkstemp` returns an open OS-level descriptor that the caller must
        # close; the writer is handed the path, not the descriptor.
        os.close(fd)

        try:
            writer = DmgDistTotalXMLWriter(result_xml,
                    "ebl1", self.damage_states)

            writer.serialize(self.data)
            with open(result_xml, "r") as actual:
                actual_text = actual.readlines()

            self.assertEqual(expected_text, actual_text)

            self.assertTrue(xml.validates_against_xml_schema(
                    result_xml))
        finally:
            os.unlink(result_xml)
开发者ID:angri,项目名称:openquake,代码行数:29,代码来源:scenario_damage_test.py


示例20: test_validate_warns

    def test_validate_warns(self):
        """`validate` must warn about parameters unused by the calc mode.

        For example, `ses_per_logic_tree_path` is an event-based hazard
        param; if this param is specified for a classical hazard job, a
        warning should be raised.
        """
        cfg_file = helpers.get_data_path('simple_fault_demo_hazard/job.ini')
        job = engine.prepare_job()
        # Use a context manager so the config file handle is closed as soon
        # as parsing is done instead of being leaked.
        with open(cfg_file, 'r') as cfg_handle:
            params = engine.parse_config(cfg_handle)
        # Add a few superfluous parameters:
        params['ses_per_logic_tree_path'] = 5
        params['ground_motion_correlation_model'] = 'JB2009'
        calculation = engine.create_calculation(
            models.HazardCalculation, params)
        job.hazard_calculation = calculation
        job.save()

        # NOTE(review): no `warnings.simplefilter('always')` is set inside
        # this block, so whether a repeated warning is recorded depends on
        # the active warning filters -- confirm this is intended.
        with warnings.catch_warnings(record=True) as w:
            validation.validate(job, 'hazard', params, ['xml'])

        expected_warnings = [
            "Unknown parameter '%s' for calculation mode 'classical'."
            " Ignoring." % x for x in ('ses_per_logic_tree_path',
                                       'ground_motion_correlation_model')
        ]

        # `m.message` is the warning instance; `str()` yields its text
        # without relying on the Python 2-only `.message` attribute.
        actual_warnings = [str(m.message) for m in w]
        self.assertEqual(sorted(expected_warnings), sorted(actual_warnings))
开发者ID:Chunghan,项目名称:oq-engine,代码行数:28,代码来源:validation_test.py



注:本文中的tests.utils.helpers.get_data_path函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python helpers.get_hazard_job函数代码示例发布时间:2022-05-27
下一篇:
Python helpers.default_user函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap