• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python dosta_ln_wfp.DostaLnWfpParser类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mi.dataset.parser.dosta_ln_wfp.DostaLnWfpParser的典型用法代码示例。如果您正苦于以下问题:Python DostaLnWfpParser类的具体用法?Python DostaLnWfpParser怎么用?Python DostaLnWfpParser使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DostaLnWfpParser类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_bad_header

    def test_bad_header(self):
        """
        Ensure that bad data is skipped when it exists.
        """

        # This case tests against a header that does not match
        # 0000 0000 0000 0100 0000 0000 0000 0151
        file_path = os.path.join(RESOURCE_PATH, 'E0000001-BAD-HEADER1.DAT')
        self.stream_handle = open(file_path, 'rb')

        with self.assertRaises(SampleException):
            self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                           self.state_callback, self.pub_callback, self.exception_callback)

        self.stream_handle.close()

        # This case tests against a header that does not match global, but matches coastal
        # 0001 0000 0000 0000 0001 0001 0000 0000
        file_path = os.path.join(RESOURCE_PATH, 'E0000001-BAD-HEADER2.DAT')
        self.stream_handle = open(file_path, 'rb')

        with self.assertRaises(SampleException):
            self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                           self.state_callback, self.pub_callback, self.exception_callback)

        self.stream_handle.close()
开发者ID:danmergens,项目名称:mi-instrument,代码行数:26,代码来源:test_dosta_ln_wfp.py


示例2: test_get_many

    def test_get_many(self):
        """
        Read test data and pull out multiple data particles at one time.
        Assert that the results are those we expected.
        """
        file_path = os.path.join(RESOURCE_PATH, 'E0000001.DAT')
        self.stream_handle = open(file_path, 'rb')

        self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                     self.state_callback, self.pub_callback, self.exception_callback)

        particles = self.parser.get_records(20)

        # Should end up with 20 particles
        self.assertTrue(len(particles) == 20)

        self.assert_result(self.test_particle3, particles[19])

        particles = self.parser.get_records(30)

        # Should end up with 30 particles
        self.assertTrue(len(particles) == 30)

        self.assert_result(self.test_particle4, particles[29])

        self.stream_handle.close()
开发者ID:danmergens,项目名称:mi-instrument,代码行数:26,代码来源:test_dosta_ln_wfp.py


示例3: test_simple

    def test_simple(self):
        """
        Read test data and pull out data particles one at a time.
        Assert that the results are those we expected.
        """

        file_path = os.path.join(RESOURCE_PATH, 'small.DAT')
        self.stream_handle = open(file_path, 'rb')

        self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                     self.state_callback, self.pub_callback, self.exception_callback)

        particles = self.parser.get_records(6)

        for particle in particles:
            log.info(particle.generate_dict())

        # Make sure the fifth particle has the correct values
        self.assert_result(self.test_particle1, particles[5])

        test_data = self.get_dict_from_yml('good.yml')

        for i in range(0,6):
            self.assert_result(test_data['data'][i], particles[i])

        self.stream_handle.close()
开发者ID:chrisfortin,项目名称:mi-dataset,代码行数:26,代码来源:test_dosta_ln_wfp.py


示例4: test_set_state

    def test_set_state(self):
        """
        Test changing to a new state after initializing the parser and 
        reading data, as if new data has been found and the state has
        changed
        """
        filepath = os.path.join(RESOURCE_PATH, "E0000001.DAT")
        self.stream_handle = open(filepath, "rb")

        # Moving the file position past the header and two records
        new_state = {StateKey.POSITION: HEADER_BYTES + (WFP_E_GLOBAL_RECOVERED_ENG_DATA_SAMPLE_BYTES * 2)}

        self.parser = DostaLnWfpParser(
            self.config, new_state, self.stream_handle, self.state_callback, self.pub_callback, self.exception_callback
        )

        particles = self.parser.get_records(4)

        # Should end up with 4 particles
        self.assertTrue(len(particles) == 4)

        self.assert_result(self.test_particle1, particles[3])

        # Moving the file position past the header and three records
        new_state = {StateKey.POSITION: HEADER_BYTES + (WFP_E_GLOBAL_RECOVERED_ENG_DATA_SAMPLE_BYTES * 3)}

        self.parser = DostaLnWfpParser(
            self.config, new_state, self.stream_handle, self.state_callback, self.pub_callback, self.exception_callback
        )

        particles = self.parser.get_records(10)

        # Should end up with 10 particles
        self.assertTrue(len(particles) == 10)

        self.assert_result(self.test_particle2, particles[9])

        self.stream_handle.close()
开发者ID:emilyhahn,项目名称:mi-dataset,代码行数:38,代码来源:test_dosta_ln_wfp.py


示例5: test_bad_data

    def test_bad_data(self):
        """
        Ensure that bad data is skipped when it exists.
        """
        file_path = os.path.join(RESOURCE_PATH, 'E0000001-BAD-DATA.DAT')
        self.stream_handle = open(file_path, 'rb')

        self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                       self.state_callback, self.pub_callback, self.exception_callback)


        with self.assertRaises(SampleException):
             self.parser.get_records(1)

        self.stream_handle.close()
开发者ID:danmergens,项目名称:mi-instrument,代码行数:15,代码来源:test_dosta_ln_wfp.py


示例6: create_large_yml

    def create_large_yml(self):
        """
        Create a large yml file corresponding to an actual recovered dataset. This is not an actual test - it allows
        us to create what we need for integration testing, i.e. a yml file.
        """
        file_path = os.path.join(RESOURCE_PATH, 'E0000001.DAT')
        self.stream_handle = open(file_path, 'rb')

        self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                     self.state_callback, self.pub_callback, self.exception_callback)

        # In a single read, get all particles in this file.
        result = self.parser.get_records(1000)

        self.particle_to_yml(result, 'E0000001.yml')
开发者ID:danmergens,项目名称:mi-instrument,代码行数:15,代码来源:test_dosta_ln_wfp.py


示例7: test_long_stream

    def test_long_stream(self):
        """
        Test a long stream
        """
        file_path = os.path.join(RESOURCE_PATH, 'E0000002.DAT')
        self.stream_handle = open(file_path, 'rb')

        self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                       self.state_callback, self.pub_callback, self.exception_callback)

        particles = self.parser.get_records(1000)

        # Should end up with 683 particles
        self.assertTrue(len(particles) == 683)

        self.stream_handle.close()
开发者ID:danmergens,项目名称:mi-instrument,代码行数:16,代码来源:test_dosta_ln_wfp.py


示例8: test_verify_record_against_yaml

    def test_verify_record_against_yaml(self):
        """
        Read data from a file and pull out data particles
        one at a time. Verify that the results are those we expected.
        """
        file_path = os.path.join(RESOURCE_PATH, 'E0000001.DAT')
        self.stream_handle = open(file_path, 'rb')

        self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                     self.state_callback, self.pub_callback, self.exception_callback)

        # In a single read, get all particles in this file.
        result = self.parser.get_records(1000)

        self.assert_particles(result, 'E0000001.yml', RESOURCE_PATH)

        self.stream_handle.close()
开发者ID:danmergens,项目名称:mi-instrument,代码行数:17,代码来源:test_dosta_ln_wfp.py


示例9: test_mid_state_start

    def test_mid_state_start(self):
        """
        Test starting the parser in a state in the middle of processing
        """
        file_path = os.path.join(RESOURCE_PATH, 'E0000001.DAT')
        self.stream_handle = open(file_path, 'rb')

        # Moving the file position past the header and two records
        new_state = {StateKey.POSITION: HEADER_BYTES+(WFP_E_GLOBAL_RECOVERED_ENG_DATA_SAMPLE_BYTES*2)}

        self.parser = DostaLnWfpParser(self.config, new_state, self.stream_handle,
                                       self.state_callback, self.pub_callback, self.exception_callback)

        particles = self.parser.get_records(4)

        # Should end up with 4 particles
        self.assertTrue(len(particles) == 4)

        self.assert_result(self.test_particle1, particles[3])

        self.stream_handle.close()
开发者ID:danmergens,项目名称:mi-instrument,代码行数:21,代码来源:test_dosta_ln_wfp.py


示例10: DostaLnWfpParserUnitTestCase

class DostaLnWfpParserUnitTestCase(ParserUnitTestCase):
    """
    dosta_ln_wfp Parser unit test suite
    """
    def state_callback(self, state, file_ingested):
        """ Call back method to watch what comes in via the position callback """
        self.state_callback_value = state
        self.file_ingested_value = file_ingested

    def pub_callback(self, pub):
        """ Call back method to watch what comes in via the publish callback """
        self.publish_callback_value = pub

    def exception_callback(self, exception):
        """ Call back method to match what comes in via the exception callback """
        self.exception_callback_value = exception

    def setUp(self):
        ParserUnitTestCase.setUp(self)
        self.config = {
            DataSetDriverConfigKeys.PARTICLE_MODULE: 'mi.dataset.parser.dosta_ln_wfp',
            DataSetDriverConfigKeys.PARTICLE_CLASS: 'DostaLnWfpInstrumentParserDataParticle'
        }
        # Define test data particles and their associated timestamps which will be
        # compared with returned results

        self.start_state = {StateKey.POSITION: 0}

        self.file_ingested_value = None
        self.state_callback_value = None
        self.publish_callback_value = None

        self.test_particle1 = {}
        self.test_particle1['internal_timestamp'] = 3583638177
        self.test_particle1[StateKey.POSITION] = 204
        self.test_particle1[DostaLnWfpInstrumentParserDataParticleKey.ESTIMATED_OXYGEN_CONCENTRATION] = \
            154.23699951171875
        self.test_particle1[DostaLnWfpInstrumentParserDataParticleKey.OPTODE_TEMPERATURE] = 1.4819999933242798
        self.test_particle1[DostaLnWfpInstrumentParserDataParticleKey.WFP_TIMESTAMP] = 1374649377

        self.test_particle2 = {}
        self.test_particle2['internal_timestamp'] = 3583638247
        self.test_particle2[StateKey.POSITION] = 414
        self.test_particle2[DostaLnWfpInstrumentParserDataParticleKey.ESTIMATED_OXYGEN_CONCENTRATION] = \
            153.7899932861328
        self.test_particle2[DostaLnWfpInstrumentParserDataParticleKey.OPTODE_TEMPERATURE] = 1.4950000047683716
        self.test_particle2[DostaLnWfpInstrumentParserDataParticleKey.WFP_TIMESTAMP] = 1374649447

        self.test_particle3 = {}
        self.test_particle3['internal_timestamp'] = 3583638317
        self.test_particle3[StateKey.POSITION] = 624
        self.test_particle3[DostaLnWfpInstrumentParserDataParticleKey.ESTIMATED_OXYGEN_CONCENTRATION] = \
            153.41099548339844
        self.test_particle3[DostaLnWfpInstrumentParserDataParticleKey.OPTODE_TEMPERATURE] = 1.5
        self.test_particle3[DostaLnWfpInstrumentParserDataParticleKey.WFP_TIMESTAMP] = 1374649517

        self.test_particle4 = {}
        self.test_particle4['internal_timestamp'] = 3583638617
        self.test_particle4[StateKey.POSITION] = 1524
        self.test_particle4[DostaLnWfpInstrumentParserDataParticleKey.ESTIMATED_OXYGEN_CONCENTRATION] = \
            152.13600158691406
        self.test_particle4[DostaLnWfpInstrumentParserDataParticleKey.OPTODE_TEMPERATURE] = 1.5019999742507935
        self.test_particle4[DostaLnWfpInstrumentParserDataParticleKey.WFP_TIMESTAMP] = 1374649817


    def test_simple(self):
        """
        Read test data and pull out data particles one at a time.
        Assert that the results are those we expected.
        """

        file_path = os.path.join(RESOURCE_PATH, 'small.DAT')
        self.stream_handle = open(file_path, 'rb')

        self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                     self.state_callback, self.pub_callback, self.exception_callback)

        particles = self.parser.get_records(6)

        # Make sure the fifth particle has the correct values
        self.assert_result(self.test_particle1, particles[5])

        test_data = self.get_dict_from_yml('good.yml')

        for i in range(0,6):
            self.assert_result(test_data['data'][i], particles[i])

        self.stream_handle.close()

    def test_get_many(self):
        """
        Read test data and pull out multiple data particles at one time.
        Assert that the results are those we expected.
        """
        file_path = os.path.join(RESOURCE_PATH, 'E0000001.DAT')
        self.stream_handle = open(file_path, 'rb')

        self.parser = DostaLnWfpParser(self.config, self.start_state, self.stream_handle,
                                     self.state_callback, self.pub_callback, self.exception_callback)

#.........这里部分代码省略.........
开发者ID:danmergens,项目名称:mi-instrument,代码行数:101,代码来源:test_dosta_ln_wfp.py



注:本文中的mi.dataset.parser.dosta_ln_wfp.DostaLnWfpParser类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python mmp_cds_base.MmpCdsParser类代码示例发布时间:2022-05-27
下一篇:
Python dosta_abcdjm_cspp.DostaAbcdjmCsppParser类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap