• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python dosta_abcdjm_cspp.DostaAbcdjmCsppParser类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mi.dataset.parser.dosta_abcdjm_cspp.DostaAbcdjmCsppParser的典型用法代码示例。如果您正苦于以下问题:Python DostaAbcdjmCsppParser类的具体用法?Python DostaAbcdjmCsppParser怎么用?Python DostaAbcdjmCsppParser使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DostaAbcdjmCsppParser类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_get_many

    def test_get_many(self):
        """
        Read test data and pull out multiple data particles at one time.
        Assert that the results are those we expected.
        """

        file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')

        # The context manager closes the file even when an assertion below
        # fails (the handle was previously never closed).
        with open(file_path, 'rb') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                           None, stream_handle,
                                           self.state_callback, self.pub_callback,
                                           self.exception_callback)

            # Let's attempt to retrieve 20 particles
            particles = parser.get_records(20)

            log.info("Exception callback value: %s", self.exception_callback_value)

            # Should end up with 20 particles; assertEqual reports the actual
            # count when the check fails
            self.assertEqual(len(particles), 20)

            expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')
            for i, particle in enumerate(particles):
                self.assert_result(expected_results['data'][i], particle)
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:25,代码来源:test_dosta_abcdjm_cspp.py


示例2: test_simple

    def test_simple(self):
        """
        Read test data and pull out data particles.
        Assert that the results are those we expected.
        """
        file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')

        # Use a context manager so the file is closed even when an assertion
        # fails; the trailing close() was skipped on any failure.
        with open(file_path, 'rb') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                           None, stream_handle,
                                           self.state_callback, self.pub_callback,
                                           self.exception_callback)

            particles = parser.get_records(5)

            log.info("Exception callback value: %s", self.exception_callback_value)

            # No exception should have been reported through the callback
            self.assertIsNone(self.exception_callback_value)

            self.assertEqual(len(particles), 5)

            expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')

            for i, particle in enumerate(particles):
                self.assert_result(expected_results['data'][i], particle)
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:27,代码来源:test_dosta_abcdjm_cspp.py


示例3: test_midstate_start

    def test_midstate_start(self):
        """
        This test makes sure that we retrieve the correct particles upon starting with an offset state.
        """
        expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')

        file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')

        # Start part-way into the file with the metadata flagged as already extracted
        initial_state = {StateKey.POSITION: 332, StateKey.METADATA_EXTRACTED: True}

        # The context manager closes the file even when an assertion below
        # fails (the handle was previously never closed).
        with open(file_path, 'rb') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                           initial_state, stream_handle,
                                           self.state_callback, self.pub_callback,
                                           self.exception_callback)

            particles = parser.get_records(2)

            log.info("Num particles: %s", len(particles))

            self.assertEqual(len(particles), 2)

            # The two particles returned are compared against expected entries 2 and 3
            for i, particle in enumerate(particles):
                self.assert_result(expected_results['data'][i + 2], particle)

            log.info("******** Read State: %s", parser._read_state)
            log.info("******** State: %s", parser._state)

            self.assertEqual(parser._state,
                             {StateKey.POSITION: 518, StateKey.METADATA_EXTRACTED: True})
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:29,代码来源:test_dosta_abcdjm_cspp.py


示例4: test_bad_header_start_date

    def test_bad_header_start_date(self):
        """
        Parse the 'BadHeaderProcessedData_PPB_OPT.txt' resource and check that
        nothing was reported through the exception callback.
        """
        bad_header_path = os.path.join(RESOURCE_PATH, 'BadHeaderProcessedData_PPB_OPT.txt')

        with open(bad_header_path, 'r') as in_file:
            parser = DostaAbcdjmCsppParser(self.config_recovered, in_file, self.exception_callback)

            # expected per the original author: the metadata particle is
            # returned with the start date left unfilled
            parser.get_records(1)

            self.assertEqual(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:12,代码来源:test_dosta_abcdjm_cspp.py


示例5: test_bad_header_source_file_name

    def test_bad_header_source_file_name(self):
        """
        Ensure that bad source file name produces an error
        """

        with open(os.path.join(RESOURCE_PATH, 'BadHeaderSourceFileName_PPB_OPT.txt'), 'r') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)

            parser.get_records(1)

            # Exactly one exception should have been reported, and it must be
            # a SampleEncodingException.  assertEqual replaces the deprecated
            # assertEquals alias.
            self.assertEqual(len(self.exception_callback_value), 1)
            self.assertIsInstance(self.exception_callback_value[0], SampleEncodingException)
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:13,代码来源:test_dosta_abcdjm_cspp.py


示例6: test_bad_data_record

    def test_bad_data_record(self):
        """
        Ensure that bad data creates a recoverable sample exception and parsing continues
        """

        with open(os.path.join(RESOURCE_PATH, 'BadDataRecord_PPB_OPT.txt'), 'r') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)

            particles = parser.get_records(19)

            # The particles that were produced must match the expected yml
            self.assert_particles(particles, 'BadDataRecord_PPB_OPT.yml', RESOURCE_PATH)

            # Exactly one RecoverableSampleException should have been reported.
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(len(self.exception_callback_value), 1)
            self.assertIsInstance(self.exception_callback_value[0], RecoverableSampleException)
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:15,代码来源:test_dosta_abcdjm_cspp.py


示例7: test_linux_source_path_handling

    def test_linux_source_path_handling(self):
        """
        Read linux source path test data and assert that the results are those we expected.
        """
        with open(os.path.join(RESOURCE_PATH, 'linux_11079894_PPB_OPT.txt'), 'r') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)

            particles = parser.get_records(20)

            # assertEqual reports the actual particle count when the check fails
            self.assertEqual(len(particles), 20)

            self.assert_particles(particles, 'linux.yml', RESOURCE_PATH)

            # Nothing should have been reported through the exception callback
            self.assertEqual(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:15,代码来源:test_dosta_abcdjm_cspp.py


示例8: test_long_stream

    def test_long_stream(self):
        """
        Test a long stream
        """
        with open(os.path.join(RESOURCE_PATH, '11079364_PPB_OPT.txt'), 'r') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)

            # Let's attempt to retrieve more particles than are available
            particles = parser.get_records(300)

            # Should end up with 272 particles; assertEqual reports the actual
            # count when the check fails
            self.assertEqual(len(particles), 272)

            # assertEqual replaces the deprecated assertEquals alias
            self.assertEqual(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:15,代码来源:test_dosta_abcdjm_cspp.py


示例9: test_bad_header_start_date

    def test_bad_header_start_date(self):
        """
        Ensure that requesting a record from the 'BadHeaderProcessedData_PPB_OPT.txt'
        resource raises a SampleException.
        """

        file_path = os.path.join(RESOURCE_PATH, 'BadHeaderProcessedData_PPB_OPT.txt')

        # Use a context manager so the file is closed even if assertRaises
        # fails; the trailing close() was skipped in that case.
        with open(file_path, 'rb') as stream_handle:

            log.info(self.exception_callback_value)

            parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                           None, stream_handle,
                                           self.state_callback, self.pub_callback,
                                           self.exception_callback)

            with self.assertRaises(SampleException):
                parser.get_records(1)
开发者ID:JeffRoy,项目名称:marine-integrations,代码行数:19,代码来源:test_dosta_abcdjm_cspp.py


示例10: test_simple_telem

    def test_simple_telem(self):
        """
        Read test data and pull out data particles.
        Assert that the results are those we expected.
        """

        with open(os.path.join(RESOURCE_PATH, '11194982_PPD_OPT.txt'), 'r') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config_telemetered, stream_handle, self.exception_callback)

            # Attempt to retrieve 20 particles, there are only 18 in the file though
            particles = parser.get_records(20)

            # Should end up with 18 particles; assertEqual reports the actual
            # count when the check fails
            self.assertEqual(len(particles), 18)

            self.assert_particles(particles, '11194982_PPD_OPT.yml', RESOURCE_PATH)

            # assertEqual replaces the deprecated assertEquals alias
            self.assertEqual(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:19,代码来源:test_dosta_abcdjm_cspp.py


示例11: test_simple_recov

    def test_simple_recov(self):
        """
        Read test data and pull out data particles.
        Assert that the results are those we expected.
        """

        with open(os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt'), 'r') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)

            # Attempt to retrieve 20 particles, there are more in this file but only verify 20
            particles = parser.get_records(20)

            # Should end up with 20 particles; assertEqual reports the actual
            # count when the check fails
            self.assertEqual(len(particles), 20)

            self.assert_particles(particles, '11079894_PPB_OPT.yml', RESOURCE_PATH)

            # assertEqual replaces the deprecated assertEquals alias
            self.assertEqual(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:19,代码来源:test_dosta_abcdjm_cspp.py


示例12: test_bad_header_source_file_name

    def test_bad_header_source_file_name(self):
        """
        Ensure that parsing the 'BadHeaderSourceFileName_PPB_OPT.txt' resource
        reports something through the exception callback.
        """

        file_path = os.path.join(RESOURCE_PATH, 'BadHeaderSourceFileName_PPB_OPT.txt')

        # Use a context manager so the file is closed even when the assertion
        # fails; the trailing close() was skipped in that case.
        with open(file_path, 'rb') as stream_handle:

            log.info(self.exception_callback_value)

            parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                           None, stream_handle,
                                           self.state_callback, self.pub_callback,
                                           self.exception_callback)

            parser.get_records(1)

            log.info("Exception callback value: %s", self.exception_callback_value)

            # Identity check against None instead of the "!= None" comparison
            self.assertIsNotNone(self.exception_callback_value)
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:22,代码来源:test_dosta_abcdjm_cspp.py


示例13: test_air_saturation_preset

    def test_air_saturation_preset(self):
        """
        Ensure that input files containing the air saturation field are parsed correctly.
        Redmine #10238 Identified additional parameter enabled after first deployment
        """
        # NOTE(review): the 'U' open mode was removed in Python 3.11; switch to 'r'
        # when this suite is moved to Python 3.
        with open(os.path.join(RESOURCE_PATH, 'ucspp_32260420_PPB_OPT.txt'), 'rU') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)

            # get the metadata particle and first 2 instrument particles and verify values.
            particles = parser.get_records(3)

            # assertEqual reports the actual particle count when the check fails
            self.assertEqual(len(particles), 3)
            self.assertEqual(self.exception_callback_value, [])

            self.assert_particles(particles, 'ucspp_32260420_PPB_OPT.yml', RESOURCE_PATH)

            # get remaining particles and verify parsed without error.
            particles = parser.get_records(100)

            self.assertEqual(len(particles), 93)
            self.assertEqual(self.exception_callback_value, [])
开发者ID:danmergens,项目名称:mi-instrument,代码行数:22,代码来源:test_dosta_abcdjm_cspp.py


示例14: test_get_many

    def test_get_many(self):
        """
        Read test data and pull out data particles in smaller groups.
        Assert that the results are those we expected.
        """

        with open(os.path.join(RESOURCE_PATH, '11079419_PPB_OPT.txt'), 'r') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)

            # Attempt to retrieve 20 total particles in three requests (5 + 10 + 5),
            # accumulating them in request order
            particles = parser.get_records(5)
            particles.extend(parser.get_records(10))
            particles.extend(parser.get_records(5))

            # Should end up with 20 particles; assertEqual reports the actual
            # count when the check fails
            self.assertEqual(len(particles), 20)

            self.assert_particles(particles, '11079419_PPB_OPT.yml', RESOURCE_PATH)

            # assertEqual replaces the deprecated assertEquals alias
            self.assertEqual(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:23,代码来源:test_dosta_abcdjm_cspp.py


示例15: test_long_stream

    def test_long_stream(self):
        """
        Test a long stream
        """

        file_path = os.path.join(RESOURCE_PATH, '11079364_PPB_OPT.txt')

        # Use a context manager so the file is closed even when the assertion
        # fails; the trailing close() was skipped in that case.
        with open(file_path, 'rb') as stream_handle:

            parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                           None, stream_handle,
                                           self.state_callback, self.pub_callback,
                                           self.exception_callback)

            # Let's attempt to retrieve 300 particles, more than the file is expected to yield
            particles = parser.get_records(300)

            log.info("Num particles: %s", len(particles))

            log.info("Exception callback value: %s", self.exception_callback_value)

            # Should end up with 272 particles; assertEqual reports the actual
            # count when the check fails
            self.assertEqual(len(particles), 272)
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:24,代码来源:test_dosta_abcdjm_cspp.py


示例16: create_yml

    def create_yml(self):
        """
        This utility creates a yml file

        It parses up to 20 records from '11194982_PPD_OPT.txt' with the
        telemetered configuration and writes the resulting particles to
        '11194982_PPD_OPT.yml' via particle_to_yml.
        """

        # Read the whole input up front; the context manager closes the file
        # even if read() raises.
        with open(os.path.join(RESOURCE_PATH, '11194982_PPD_OPT.txt')) as fid:
            test_buffer = fid.read()

        self.stream_handle = StringIO(test_buffer)
        self.parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_TELEMETERED),
                                            None, self.stream_handle,
                                            self.state_callback, self.pub_callback,
                                            self.exception_callback)

        particles = self.parser.get_records(20)

        log.info("Exception callback value: %s", self.exception_callback_value)

        self.particle_to_yml(particles, '11194982_PPD_OPT.yml')
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:20,代码来源:test_dosta_abcdjm_cspp.py


示例17: test_state_after_one_record_retrieval

    def test_state_after_one_record_retrieval(self):
        """
        This test makes sure that we get the correct particles upon requesting one record at
        a time.

        A new parser is created for every record, seeded with a copy of the state left
        behind by the previous parser; the resulting state and particle are checked
        after each retrieval.
        """

        expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')

        file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')

        # POSITION expected in the parser state after each successive single-record read
        expected_positions = [0, 332, 425, 518, 611]

        # The context manager closes the file even when an assertion fails
        # (the handle was previously never closed).
        with open(file_path, 'rb') as stream_handle:

            # The first parser starts without any prior state
            new_state = None

            for index, position in enumerate(expected_positions):
                parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                               new_state, stream_handle,
                                               self.state_callback, self.pub_callback,
                                               self.exception_callback)

                particles = parser.get_records(1)

                log.info("Num particles: %s", len(particles))

                self.assertEqual(len(particles), 1)

                # Produces the log prefixes "11111111", "22222222", ... for passes 1, 2, ...
                label = str(index + 1) * 8
                log.info("%s Read State: %s", label, parser._read_state)
                log.info("%s State: %s", label, parser._state)

                self.assertEqual(parser._state,
                                 {StateKey.POSITION: position, StateKey.METADATA_EXTRACTED: True})

                self.assert_result(expected_results['data'][index], particles[0])

                # Hand a copy of the state to the next parser
                new_state = copy.copy(parser._state)
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:92,代码来源:test_dosta_abcdjm_cspp.py


示例18: test_state_after_two_record_retrievals

    def test_state_after_two_record_retrievals(self):
        """
        This test makes sure that we get the correct particles upon requesting two records at
        a time.

        Three parsers are created in turn, each with an explicit starting state, and the
        two particles each one returns are compared against consecutive expected entries.
        """

        expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')

        file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')

        # (starting state, index of the first expected result) for each pass.
        # NOTE(review): the last pass starts at POSITION 480 while still expecting
        # entries 4 and 5 - presumably a deliberate start between records; confirm.
        stages = [
            (None, 0),
            ({StateKey.POSITION: 332, StateKey.METADATA_EXTRACTED: True}, 2),
            ({StateKey.POSITION: 480, StateKey.METADATA_EXTRACTED: True}, 4),
        ]

        # The context manager closes the file even when an assertion fails
        # (the handle was previously never closed).
        with open(file_path, 'rb') as stream_handle:

            for stage, (state, offset) in enumerate(stages):
                parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                               state, stream_handle,
                                               self.state_callback, self.pub_callback,
                                               self.exception_callback)

                particles = parser.get_records(2)

                log.info("Num particles: %s", len(particles))

                self.assertEqual(len(particles), 2)

                for i, particle in enumerate(particles):
                    self.assert_result(expected_results['data'][i + offset], particle)

                # Produces the log prefixes "11111111", "22222222", "33333333"
                label = str(stage + 1) * 8
                log.info("%s Read State: %s", label, parser._read_state)
                log.info("%s State: %s", label, parser._state)

                # Log the state the next parser will be started with, if there is one
                if stage + 1 < len(stages):
                    log.info("%s new parser state: %s", label, stages[stage + 1][0])
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:67,代码来源:test_dosta_abcdjm_cspp.py


示例19: DostaAbcdjmCsppParserUnitTestCase

class DostaAbcdjmCsppParserUnitTestCase(ParserUnitTestCase):
    """
    dosta_abcdjm_cspp Parser unit test suite
    """

    def state_callback(self, state, file_ingested):
        """ Record the arguments of the most recent state callback on the test case """
        self.state_callback_value, self.file_ingested_value = state, file_ingested

    def pub_callback(self, pub):
        """
        Call back method to watch what comes in via the publish callback.
        Stores the value on the test case; each call overwrites the previous one.
        """
        self.publish_callback_value = pub

    def exception_callback(self, exception):
        """
        Callback method to watch what comes in via the exception callback.
        Stores the value on the test case; each call overwrites the previous one,
        so only the most recent exception is retained.
        """
        self.exception_callback_value = exception

    def setUp(self):
        """
        Run the base class set up, build the recovered and telemetered parser
        configurations and reset every value recorded by the callbacks.
        """
        ParserUnitTestCase.setUp(self)

        # Configuration handed to the parser for recovered data
        recovered_config = {
            DataSetDriverConfigKeys.PARTICLE_MODULE: 'mi.dataset.parser.dosta_abcdjm_cspp',
            DataSetDriverConfigKeys.PARTICLE_CLASS: None,
            DataSetDriverConfigKeys.PARTICLE_CLASSES_DICT: {
                METADATA_PARTICLE_CLASS_KEY: DostaAbcdjmCsppMetadataRecoveredDataParticle,
                DATA_PARTICLE_CLASS_KEY: DostaAbcdjmCsppInstrumentRecoveredDataParticle,
            }
        }

        # Configuration handed to the parser for telemetered data
        telemetered_config = {
            DataSetDriverConfigKeys.PARTICLE_MODULE: 'mi.dataset.parser.dosta_abcdjm_cspp',
            DataSetDriverConfigKeys.PARTICLE_CLASS: None,
            DataSetDriverConfigKeys.PARTICLE_CLASSES_DICT: {
                METADATA_PARTICLE_CLASS_KEY: DostaAbcdjmCsppMetadataTelemeteredDataParticle,
                DATA_PARTICLE_CLASS_KEY: DostaAbcdjmCsppInstrumentTelemeteredDataParticle,
            }
        }

        self.config = {
            DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED: recovered_config,
            DataTypeKey.DOSTA_ABCDJM_CSPP_TELEMETERED: telemetered_config,
        }

        # Nothing has been seen by any of the callbacks yet
        self.file_ingested_value = None
        self.state_callback_value = None
        self.publish_callback_value = None
        self.exception_callback_value = None

    def particle_to_yml(self, particles, filename, mode='w'):
        """
        This is added as a testing helper, not actually as part of the parser tests. Since the same particles
        will be used for the driver test it is helpful to write them to .yml in the same form they need in the
        results.yml files here.

        @param particles sequence of particles; each must provide generate_dict()
        @param filename name of the yml file to write, relative to RESOURCE_PATH
        @param mode mode used to open the file; the default 'w' overwrites an existing
                    file, pass 'a' to append instead
        """
        # The context manager closes the file even if a particle cannot be
        # written (the handle previously leaked on any exception).
        with open(os.path.join(RESOURCE_PATH, filename), mode) as fid:

            fid.write('header:\n')
            fid.write("    particle_object: 'MULTIPLE'\n")
            fid.write("    particle_type: 'MULTIPLE'\n")
            fid.write('data:\n')

            # _index values written to the file start at 1
            for index, particle in enumerate(particles, 1):
                particle_dict = particle.generate_dict()

                fid.write('  - _index: %d\n' % index)

                fid.write('    particle_object: %s\n' % particle.__class__.__name__)
                fid.write('    particle_type: %s\n' % particle_dict.get('stream_name'))
                fid.write('    internal_timestamp: %f\n' % particle_dict.get('internal_timestamp'))

                for val in particle_dict.get('values'):
                    value_id = val.get('value_id')
                    value = val.get('value')

                    if isinstance(value, float):
                        # floats are written with a fixed width and 6 decimal places
                        fid.write('    %s: %16.6f\n' % (value_id, value))
                    elif isinstance(value, str):
                        # strings are written single-quoted
                        fid.write("    %s: '%s'\n" % (value_id, value))
                    else:
                        fid.write('    %s: %s\n' % (value_id, value))

    def create_yml(self):
        """
        This utility creates a yml file

        It parses up to 20 records from '11194982_PPD_OPT.txt' with the
        telemetered configuration and writes the resulting particles to
        '11194982_PPD_OPT.yml' via particle_to_yml.
        """

        # Read the whole input up front; the context manager closes the file
        # even if read() raises.
        with open(os.path.join(RESOURCE_PATH, '11194982_PPD_OPT.txt')) as fid:
            test_buffer = fid.read()

        self.stream_handle = StringIO(test_buffer)
        self.parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_TELEMETERED),
                                            None, self.stream_handle,
                                            self.state_callback, self.pub_callback,
                                            self.exception_callback)

        particles = self.parser.get_records(20)

        log.info("Exception callback value: %s", self.exception_callback_value)

        self.particle_to_yml(particles, '11194982_PPD_OPT.yml')

    def test_simple(self):
#.........这里部分代码省略.........
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:101,代码来源:test_dosta_abcdjm_cspp.py


示例20: test_state_reset

    def test_state_reset(self):
        """
        This test makes sure that we retrieve the correct particles upon resetting the state to a prior position.

        Three single-record reads are chained, each new parser being seeded with a copy of
        the previous parser's state.  A fourth parser is then started from a state reset to
        POSITION 0 with the metadata flagged as extracted, and must return expected entry 1
        again.
        """
        expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')

        file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')

        # (expected POSITION after the read, index of the expected result,
        #  whether the state is reset before the read)
        steps = [
            (0, 0, False),
            (332, 1, False),
            (425, 2, False),
            (332, 1, True),
        ]

        # The context manager closes the file even when an assertion fails
        # (the handle was previously never closed).
        with open(file_path, 'rb') as stream_handle:

            # The first parser starts without any prior state
            new_state = None

            for step, (position, index, reset) in enumerate(steps):
                if reset:
                    # Go back to the start of the data with the metadata already extracted
                    new_state = {StateKey.POSITION: 0, StateKey.METADATA_EXTRACTED: True}

                parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
                                               new_state, stream_handle,
                                               self.state_callback, self.pub_callback,
                                               self.exception_callback)

                # Retrieve a single particle
                particles = parser.get_records(1)

                log.info("Num particles: %s", len(particles))

                # Produces the log prefixes "11111111", "22222222", ... for passes 1, 2, ...
                label = str(step + 1) * 8
                log.info("%s Read State: %s", label, parser._read_state)
                log.info("%s State: %s", label, parser._state)

                self.assertEqual(len(particles), 1)

                self.assertEqual(parser._state,
                                 {StateKey.POSITION: position, StateKey.METADATA_EXTRACTED: True})

                self.assert_result(expected_results['data'][index], particles[0])

                # Hand a copy of the state to the next parser
                new_state = copy.copy(parser._state)
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:77,代码来源:test_dosta_abcdjm_cspp.py



注:本文中的mi.dataset.parser.dosta_abcdjm_cspp.DostaAbcdjmCsppParser类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python dosta_ln_wfp.DostaLnWfpParser类代码示例发布时间:2022-05-27
下一篇:
Python ctdmo.CtdmoParser类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap