• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python util.get_mountpoint函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中qiita_db.util.get_mountpoint函数的典型用法代码示例。如果您正苦于以下问题:Python get_mountpoint函数的具体用法?Python get_mountpoint怎么用?Python get_mountpoint使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_mountpoint函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_get_process_target_gene_cmd

    def test_get_process_target_gene_cmd(self):
        preprocessed_data = PreprocessedData(1)
        params = ProcessedSortmernaParams(1)

        obs_cmd, obs_output_dir = _get_process_target_gene_cmd(
            preprocessed_data, params)

        _, ref_dir = get_mountpoint('reference')[0]
        _, preprocessed_dir = get_mountpoint('preprocessed_data')[0]

        exp_cmd = ("pick_closed_reference_otus.py -i {}1_seqs.fna -r "
                   "{}GreenGenes_13_8_97_otus.fasta -o {} -p placeholder -t "
                   "{}GreenGenes_13_8_97_otu_taxonomy.txt".format(
                       preprocessed_dir, ref_dir, obs_output_dir, ref_dir))

        obs_tokens = obs_cmd.split()[::-1]
        exp_tokens = exp_cmd.split()[::-1]
        self.assertEqual(len(obs_tokens), len(exp_tokens))
        while obs_tokens:
            o_t = obs_tokens.pop()
            e_t = exp_tokens.pop()
            if o_t == '-p':
                # skip parameters file
                obs_tokens.pop()
                exp_tokens.pop()
            else:
                self.assertEqual(o_t, e_t)
开发者ID:RNAer,项目名称:qiita,代码行数:27,代码来源:test_processing_pipeline.py


示例2: test_prep_template_filepaths_get_req

 def test_prep_template_filepaths_get_req(self):
     obs = prep_template_filepaths_get_req(1, '[email protected]')
     exp = {'status': 'success',
            'message': '',
            'filepaths': [
                (19, join(get_mountpoint('templates')[0][1],
                          '1_prep_1_qiime_19700101-000000.txt')),
                (18, join(get_mountpoint('templates')[0][1],
                          '1_prep_1_19700101-000000.txt'))]}
     self.assertEqual(obs, exp)
开发者ID:yimsea,项目名称:qiita,代码行数:10,代码来源:test_prep_template.py


示例3: tearDown

    def tearDown(self):
        with open(self.biom_fp, 'w') as f:
                f.write("")
        with open(self.map_fp, 'w') as f:
                f.write("")

        fp = join(get_mountpoint('analysis')[0][1], 'testfile.txt')
        if exists(fp):
            remove(fp)

        mp = get_mountpoint("processed_data")[0][1]
        study2fp = join(mp, "2_2_study_1001_closed_reference_otu_table.biom")
        if exists(study2fp):
            move(study2fp,
                 join(mp, "2_study_1001_closed_reference_otu_table.biom"))
开发者ID:RNAer,项目名称:qiita,代码行数:15,代码来源:test_analysis.py


示例4: get_filepaths

    def get_filepaths(self, conn_handler=None):
        r"""Retrieves the list of (filepath_id, filepath)"""
        # Check that this function has been called from a subclass
        self._check_subclass()

        # Check if the connection handler has been provided. Create a new
        # one if not.
        conn_handler = conn_handler if conn_handler else SQLConnectionHandler()

        if self._table == 'required_sample_info':
            table = 'sample_template_filepath'
            column = 'study_id'
        elif self._table == 'common_prep_info':
            table = 'prep_template_filepath'
            column = 'prep_template_id'
        else:
            raise QiitaDBNotImplementedError(
                'get_filepath for %s' % self._table)

        try:
            filepath_ids = conn_handler.execute_fetchall(
                "SELECT filepath_id, filepath FROM qiita.filepath WHERE "
                "filepath_id IN (SELECT filepath_id FROM qiita.{0} WHERE "
                "{1}=%s) ORDER BY filepath_id DESC".format(table, column),
                (self.id, ))
        except Exception as e:
            LogEntry.create('Runtime', str(e),
                            info={self.__class__.__name__: self.id})
            raise e

        _, fb = get_mountpoint('templates', conn_handler)[0]
        base_fp = partial(join, fb)

        return [(fpid, base_fp(fp)) for fpid, fp in filepath_ids]
开发者ID:RNAer,项目名称:qiita,代码行数:34,代码来源:base_metadata_template.py


示例5: create_from_scratch

    def create_from_scratch(self, prep_template, study_id):
        raw_data_filetype = self.get_argument("filetype")
        barcodes_str = self.get_argument("barcodes")
        forward_reads_str = self.get_argument("forward")
        sff_str = self.get_argument("sff")
        fasta_str = self.get_argument("fasta")
        qual_str = self.get_argument("qual")
        reverse_reads_str = self.get_argument("reverse")

        def _split(x):
            return x.split(",") if x else []

        filepaths, fps = [], []
        fps.append((_split(barcodes_str), "raw_barcodes"))
        fps.append((_split(fasta_str), "raw_fasta"))
        fps.append((_split(qual_str), "raw_qual"))
        fps.append((_split(forward_reads_str), "raw_forward_seqs"))
        fps.append((_split(reverse_reads_str), "raw_reverse_seqs"))
        fps.append((_split(sff_str), "raw_sff"))

        # We need to retrieve the full path for all the files, as the
        # arguments only contain the file name. Since we don't know in which
        # mountpoint the data lives, we retrieve all of them and we loop
        # through all the files checking if they exist or not.
        for _, f in get_mountpoint("uploads", retrieve_all=True):
            f = join(f, str(study_id))
            for fp_set, filetype in fps:
                for t in fp_set:
                    ft = join(f, t)
                    if exists(ft):
                        filepaths.append((ft, filetype))

        return submit(self.current_user.id, create_raw_data, raw_data_filetype, prep_template, filepaths)
开发者ID:mivamo1214,项目名称:qiita,代码行数:33,代码来源:compute.py


示例6: test_post_valid

    def test_post_valid(self):
        dontcare, uploads_dir = get_mountpoint('uploads')[0]
        foo_fp = os.path.join(uploads_dir, '1', 'foo.txt')
        bar_fp = os.path.join(uploads_dir, '1', 'bar.txt')
        with open(foo_fp, 'w') as fp:
            fp.write("@x\nATGC\n+\nHHHH\n")
        with open(bar_fp, 'w') as fp:
            fp.write("@x\nATGC\n+\nHHHH\n")

        prep = StringIO(EXP_PREP_TEMPLATE.format(1))
        prep_table = load_template_to_dataframe(prep)

        response = self.post('/api/v1/study/1/preparation?data_type=16S',
                             data=prep_table.T.to_dict(),
                             headers=self.headers, asjson=True)
        prepid = json_decode(response.body)['id']

        uri = '/api/v1/study/1/preparation/%d/artifact' % prepid
        # 1 -> fwd or rev sequences in fastq
        # 3 -> barcodes
        body = {'artifact_type': 'FASTQ', 'filepaths': [['foo.txt', 1],
                                                        ['bar.txt',
                                                         'raw_barcodes']],
                'artifact_name': 'a name is a name'}

        response = self.post(uri, data=body, headers=self.headers, asjson=True)
        self.assertEqual(response.code, 201)
        obs = json_decode(response.body)['id']

        prep_instance = PrepTemplate(prepid)
        exp = prep_instance.artifact.id
        self.assertEqual(obs, exp)
开发者ID:antgonza,项目名称:qiita,代码行数:32,代码来源:test_study_preparation.py


示例7: setUp

    def setUp(self):
        fd, self.seqs_fp = mkstemp(suffix='_seqs.fastq')
        close(fd)
        fd, self.barcodes_fp = mkstemp(suffix='_barcodes.fastq')
        close(fd)
        self.filetype = 2
        self.filepaths = [(self.seqs_fp, 1), (self.barcodes_fp, 2)]
        self.studies = [Study(1)]
        _, self.db_test_raw_dir = get_mountpoint('raw_data')[0]

        with open(self.seqs_fp, "w") as f:
            f.write("\n")
        with open(self.barcodes_fp, "w") as f:
            f.write("\n")
        self._clean_up_files = []

        # Create a new study
        info = {
            "timeseries_type_id": 1,
            "metadata_complete": True,
            "mixs_compliant": True,
            "number_samples_collected": 25,
            "number_samples_promised": 28,
            "portal_type_id": 3,
            "study_alias": "FCM",
            "study_description": "Microbiome of people who eat nothing but "
                                 "fried chicken",
            "study_abstract": "Exploring how a high fat diet changes the "
                              "gut microbiome",
            "emp_person_id": StudyPerson(2),
            "principal_investigator_id": StudyPerson(3),
            "lab_person_id": StudyPerson(1)
        }
        Study.create(User("[email protected]"), "Test study 2", [1], info)
开发者ID:zonca,项目名称:qiita,代码行数:34,代码来源:test_data.py


示例8: _common_purge_filpeaths_test

    def _common_purge_filpeaths_test(self):
        # Get all the filepaths so we can test if they've been removed or not
        sql_fp = "SELECT filepath, data_directory_id FROM qiita.filepath"
        fps = [join(get_mountpoint_path_by_id(dd_id), fp) for fp, dd_id in self.conn_handler.execute_fetchall(sql_fp)]

        # Make sure that the files exist - specially for travis
        for fp in fps:
            if not exists(fp):
                with open(fp, "w") as f:
                    f.write("\n")
                self.files_to_remove.append(fp)

        _, raw_data_mp = get_mountpoint("raw_data")[0]

        removed_fps = [join(raw_data_mp, "2_sequences_barcodes.fastq.gz"), join(raw_data_mp, "2_sequences.fastq.gz")]

        for fp in removed_fps:
            with open(fp, "w") as f:
                f.write("\n")

        sql = """INSERT INTO qiita.filepath
                    (filepath, filepath_type_id, checksum,
                     checksum_algorithm_id, data_directory_id)
                VALUES ('2_sequences_barcodes.fastq.gz', 3, '852952723', 1, 5),
                       ('2_sequences.fastq.gz', 1, '852952723', 1, 5)
                RETURNING filepath_id"""
        fp_ids = self.conn_handler.execute_fetchall(sql)

        fps = set(fps).difference(removed_fps)

        # Check that the files exist
        for fp in fps:
            self.assertTrue(exists(fp))
        for fp in removed_fps:
            self.assertTrue(exists(fp))

        exp_count = get_count("qiita.filepath") - 2

        purge_filepaths()

        obs_count = get_count("qiita.filepath")

        # Check that only 2 rows have been removed
        self.assertEqual(obs_count, exp_count)

        # Check that the 2 rows that have been removed are the correct ones
        sql = """SELECT EXISTS(
                    SELECT * FROM qiita.filepath WHERE filepath_id = %s)"""
        obs = self.conn_handler.execute_fetchone(sql, (fp_ids[0][0],))[0]
        self.assertFalse(obs)
        obs = self.conn_handler.execute_fetchone(sql, (fp_ids[1][0],))[0]
        self.assertFalse(obs)

        # Check that the files have been successfully removed
        for fp in removed_fps:
            self.assertFalse(exists(fp))

        # Check that all the other files still exist
        for fp in fps:
            self.assertTrue(exists(fp))
开发者ID:DarcyMyers,项目名称:qiita,代码行数:60,代码来源:test_util.py


示例9: test_move_upload_files_to_trash

    def test_move_upload_files_to_trash(self):
        test_filename = "this_is_a_test_file.txt"

        # create file to move to trash
        fid, folder = get_mountpoint("uploads")[0]
        test_fp = join(folder, "1", test_filename)
        with open(test_fp, "w") as f:
            f.write("test")

        self.files_to_remove.append(test_fp)

        exp = [(fid, "this_is_a_test_file.txt"), (fid, "uploaded_file.txt")]
        obs = get_files_from_uploads_folders("1")
        self.assertItemsEqual(obs, exp)

        # move file
        move_upload_files_to_trash(1, [(fid, test_filename)])
        exp = [(fid, "uploaded_file.txt")]
        obs = get_files_from_uploads_folders("1")
        self.assertItemsEqual(obs, exp)

        # testing errors
        with self.assertRaises(QiitaDBError):
            move_upload_files_to_trash(2, [(fid, test_filename)])
        with self.assertRaises(QiitaDBError):
            move_upload_files_to_trash(1, [(10, test_filename)])
        with self.assertRaises(QiitaDBError):
            move_upload_files_to_trash(1, [(fid, test_filename)])

        # removing trash folder
        rmtree(join(folder, "1", "trash"))
开发者ID:DarcyMyers,项目名称:qiita,代码行数:31,代码来源:test_util.py


示例10: check_fp

def check_fp(study_id, filename):
    """Check whether an uploaded file exists

    Parameters
    ----------
    study_id : int
        Study file uploaded to
    filename : str
        name of the uploaded file

    Returns
    -------
    dict
        {'status': status,
         'message': msg,
         'file': str}
        file contains full filepath if status is success, otherwise it contains
        the filename
    """
    # Get the uploads folder
    _, base_fp = get_mountpoint("uploads")[0]
    # Get the path of the sample template in the uploads folder
    fp_rsp = join(base_fp, str(study_id), filename)

    if not exists(fp_rsp):
        # The file does not exist, fail nicely
        return {'status': 'error',
                'message': 'file does not exist',
                'file': filename}
    return {'status': 'success',
            'message': '',
            'file': fp_rsp}
开发者ID:ElDeveloper,项目名称:qiita,代码行数:32,代码来源:util.py


示例11: setUp

    def setUp(self):
        fd, self.seqs_fp = mkstemp(suffix='_seqs.fastq')
        close(fd)
        fd, self.barcodes_fp = mkstemp(suffix='_barcodes.fastq')
        close(fd)
        self.filetype = 2
        self.filepaths = [(self.seqs_fp, 1), (self.barcodes_fp, 2)]
        _, self.db_test_raw_dir = get_mountpoint('raw_data')[0]

        with open(self.seqs_fp, "w") as f:
            f.write("\n")
        with open(self.barcodes_fp, "w") as f:
            f.write("\n")
        self._clean_up_files = []

        # Create some new PrepTemplates
        metadata_dict = {
            'SKB8.640193': {'center_name': 'ANL',
                            'primer': 'GTGCCAGCMGCCGCGGTAA',
                            'barcode': 'GTCCGCAAGTTA',
                            'run_prefix': "s_G1_L001_sequences",
                            'platform': 'ILLUMINA',
                            'library_construction_protocol': 'AAAA',
                            'experiment_design_description': 'BBBB'}}
        metadata = pd.DataFrame.from_dict(metadata_dict, orient='index')
        self.pt1 = PrepTemplate.create(metadata, Study(1), "16S")
        self.pt2 = PrepTemplate.create(metadata, Study(1), "18S")
        self.prep_templates = [self.pt1, self.pt2]
开发者ID:MarkBruns,项目名称:qiita,代码行数:28,代码来源:test_data.py


示例12: post

    def post(self, study_id, prep_id):
        study = self.safe_get_study(study_id)
        if study is None:
            return

        prep_id = to_int(prep_id)
        try:
            p = PrepTemplate(prep_id)
        except QiitaDBUnknownIDError:
            self.fail('Preparation not found', 404)
            return

        if p.study_id != study.id:
            self.fail('Preparation ID not associated with the study', 409)
            return

        artifact_deets = json_decode(self.request.body)
        _, upload = get_mountpoint('uploads')[0]
        base = os.path.join(upload, study_id)
        filepaths = [(os.path.join(base, fp), fp_type)
                     for fp, fp_type in artifact_deets['filepaths']]

        try:
            art = Artifact.create(filepaths,
                                  artifact_deets['artifact_type'],
                                  artifact_deets['artifact_name'],
                                  p)
        except QiitaError as e:
            self.fail(str(e), 406)
            return

        self.write({'id': art.id})
        self.set_status(201)
        self.finish()
开发者ID:antgonza,项目名称:qiita,代码行数:34,代码来源:study_preparation.py


示例13: test_move_upload_files_to_trash

    def test_move_upload_files_to_trash(self):
        test_filename = 'this_is_a_test_file.txt'

        # create file to move to trash
        fid, folder = get_mountpoint("uploads")[0]
        open(join(folder, '1', test_filename), 'w').write('test')

        exp = [(fid, 'this_is_a_test_file.txt'), (fid, 'uploaded_file.txt')]
        obs = get_files_from_uploads_folders("1")
        self.assertItemsEqual(obs, exp)

        # move file
        move_upload_files_to_trash(1, [(fid, test_filename)])
        exp = [(fid, 'uploaded_file.txt')]
        obs = get_files_from_uploads_folders("1")
        self.assertItemsEqual(obs, exp)

        # testing errors
        with self.assertRaises(QiitaDBError):
            move_upload_files_to_trash(2, [(fid, test_filename)])
        with self.assertRaises(QiitaDBError):
            move_upload_files_to_trash(1, [(10, test_filename)])
        with self.assertRaises(QiitaDBError):
            move_upload_files_to_trash(1, [(fid, test_filename)])

        # removing trash folder
        rmtree(join(folder, '1', 'trash'))
开发者ID:jwdebelius,项目名称:qiita,代码行数:27,代码来源:test_util.py


示例14: test_build_mapping_file

    def test_build_mapping_file(self):
        new_id = get_count('qiita.filepath') + 1
        samples = {1: ['1.SKB8.640193', '1.SKD8.640184', '1.SKB7.640196']}
        self.analysis._build_mapping_file(samples)
        obs = self.analysis.mapping_file
        self.assertEqual(obs, self.map_fp)

        base_dir = get_mountpoint('analysis')[0][1]
        obs = pd.read_csv(obs, sep='\t', infer_datetime_format=True,
                          parse_dates=True, index_col=False, comment='\t')
        exp = pd.read_csv(join(base_dir, '1_analysis_mapping_exp.txt'),
                          sep='\t', infer_datetime_format=True,
                          parse_dates=True, index_col=False, comment='\t')

        assert_frame_equal(obs, exp)

        sql = """SELECT * FROM qiita.filepath
                 WHERE filepath=%s ORDER BY filepath_id"""
        obs = self.conn_handler.execute_fetchall(
            sql, ("%d_analysis_mapping.txt" % self.analysis.id,))

        exp = [[13, '1_analysis_mapping.txt', 9, '852952723', 1, 1],
               [new_id, '1_analysis_mapping.txt', 9, '1606265094', 1, 1]]
        self.assertEqual(obs, exp)

        sql = """SELECT * FROM qiita.analysis_filepath
                 WHERE analysis_id=%s ORDER BY filepath_id"""
        obs = self.conn_handler.execute_fetchall(sql, (self.analysis.id,))
        exp = [[1L, 14L, 2L], [1L, 15L, None], [1L, new_id, None]]
开发者ID:MarkBruns,项目名称:qiita,代码行数:29,代码来源:test_analysis.py


示例15: get_filepaths

    def get_filepaths(self):
        r"""Retrieves the list of (filepath_id, filepath)"""
        # Check that this function has been called from a subclass
        self._check_subclass()

        # Check if the connection handler has been provided. Create a new
        # one if not.
        conn_handler = SQLConnectionHandler()

        try:
            filepath_ids = conn_handler.execute_fetchall(
                "SELECT filepath_id, filepath FROM qiita.filepath WHERE "
                "filepath_id IN (SELECT filepath_id FROM qiita.{0} WHERE "
                "{1}=%s) ORDER BY filepath_id DESC".format(
                    self._filepath_table, self._id_column),
                (self.id, ))
        except Exception as e:
            LogEntry.create('Runtime', str(e),
                            info={self.__class__.__name__: self.id})
            raise e

        _, fb = get_mountpoint('templates')[0]
        base_fp = partial(join, fb)

        return [(fpid, base_fp(fp)) for fpid, fp in filepath_ids]
开发者ID:mortonjt,项目名称:qiita,代码行数:25,代码来源:base_metadata_template.py


示例16: setUp

    def setUp(self):
        # Create a directory with the test split libraries output
        self.test_slo = mkdtemp(prefix='test_slo_')
        path_builder = partial(join, self.test_slo)
        fna_fp = path_builder('seqs.fna')
        fastq_fp = path_builder('seqs.fastq')
        log_fp = path_builder('split_library_log.txt')
        demux_fp = path_builder('seqs.demux')

        with open(fna_fp, 'w') as f:
            f.write(FASTA_SEQS)

        with open(fastq_fp, 'w') as f:
            f.write(FASTQ_SEQS)

        with open(log_fp, 'w') as f:
            f.write("Test log\n")

        generate_demux_file(self.test_slo)

        self._filepaths_to_remove = [fna_fp, fastq_fp, demux_fp, log_fp]
        self._dirpaths_to_remove = [self.test_slo]

        # Generate a directory with test split libraries output missing files
        self.missing_slo = mkdtemp(prefix='test_missing_')
        path_builder = partial(join, self.test_slo)
        fna_fp = path_builder('seqs.fna')
        fastq_fp = path_builder('seqs.fastq')

        with open(fna_fp, 'w') as f:
            f.write(FASTA_SEQS)

        with open(fastq_fp, 'w') as f:
            f.write(FASTQ_SEQS)

        self._filepaths_to_remove.append(fna_fp)
        self._filepaths_to_remove.append(fastq_fp)
        self._dirpaths_to_remove.append(self.missing_slo)

        # Create a study with no preprocessed data
        info = {
            "timeseries_type_id": 1,
            "metadata_complete": True,
            "mixs_compliant": True,
            "number_samples_collected": 25,
            "number_samples_promised": 28,
            "study_alias": "FCM",
            "study_description": "Microbiome of people who eat nothing but "
                                 "fried chicken",
            "study_abstract": "Exploring how a high fat diet changes the "
                              "gut microbiome",
            "emp_person_id": StudyPerson(2),
            "principal_investigator_id": StudyPerson(3),
            "lab_person_id": StudyPerson(1)
        }
        self.no_ppd_study = Study.create(
            User('[email protected]'), "Test study", [1], info)

        # Get the directory where the preprocessed data is usually copied.
        _, self.db_ppd_dir = get_mountpoint('preprocessed_data')[0]
开发者ID:MarkBruns,项目名称:qiita,代码行数:60,代码来源:test_commands.py


示例17: test_move_filepaths_to_upload_folder

    def test_move_filepaths_to_upload_folder(self):
        # setting up test, done here as this is the only test that uses these
        # files
        fd, seqs_fp = mkstemp(suffix='_seqs.fastq')
        close(fd)
        study_id = 1

        rd = RawData.create(2, [Study(study_id)], [(seqs_fp, 1)])
        filepaths = rd.get_filepaths()
        # deleting reference so we can directly call
        # move_filepaths_to_upload_folder
        for fid, _, _ in filepaths:
            self.conn_handler.execute(
                "DELETE FROM qiita.raw_filepath WHERE filepath_id=%s", (fid,))

        # moving filepaths
        move_filepaths_to_upload_folder(study_id, filepaths, self.conn_handler)

        # check that they do not exist in the old path but do in the new one
        path_for_removal = join(get_mountpoint("uploads")[0][1], str(study_id))
        for _, fp, _ in filepaths:
            self.assertFalse(exists(fp))
            new_fp = join(path_for_removal, basename(fp).split('_', 1)[1])
            self.assertTrue(exists(new_fp))

            self.files_to_remove.append(new_fp)
开发者ID:jwdebelius,项目名称:qiita,代码行数:26,代码来源:test_util.py


示例18: test_check_fp

 def test_check_fp(self):
     obs = check_fp(1, 'uploaded_file.txt')
     _, base_fp = get_mountpoint("uploads")[0]
     exp = {'status': 'success',
            'message': '',
            'file': join(base_fp, '1', 'uploaded_file.txt')}
     self.assertEqual(obs, exp)
开发者ID:ElDeveloper,项目名称:qiita,代码行数:7,代码来源:test_util.py


示例19: update_prep_template

    def update_prep_template(self, study, user, callback):
        """Update a prep template from the POST method

        Parameters
        ----------
        study : Study
            The current study object
        user : User
            The current user object
        callback : function
            The callback function to call with the results once the processing
            is done

        Raises
        ------
        HTTPError
            If the prep template file does not exists
        """
        # If we are on this function, the arguments "prep_template_id",
        # "update_prep_template_file" must defined. If not, let tornado
        # raise its error
        pt_id = int(self.get_argument('prep_template_id'))
        prep_template = self.get_argument('update_prep_template_file')

        # Define here the message and message level in case of success
        msg = "The prep template '%s' has been updated" % prep_template
        msg_level = "success"
        # Get the uploads folder
        _, base_fp = get_mountpoint("uploads")[0]
        # Get the path of the prep template in the uploads folder
        fp = join(base_fp, str(study.id), prep_template)

        if not exists(fp):
            # The file does not exist, fail nicely
            # Using 400 because we want the user to get the error in the GUI
            raise HTTPError(400, "This file doesn't exist: %s" % fp)
        try:
            with warnings.catch_warnings(record=True) as warns:
                pt = PrepTemplate(pt_id)
                pt.update(load_template_to_dataframe(fp))
                remove(fp)

                # join all the warning messages into one. Note that this info
                # will be ignored if an exception is raised
                if warns:
                    msg = '; '.join([str(w.message) for w in warns])
                    msg_level = 'warning'

        except (TypeError, QiitaDBColumnError, QiitaDBExecutionError,
                QiitaDBDuplicateError, IOError, ValueError, KeyError,
                CParserError, QiitaDBDuplicateHeaderError, QiitaDBError) as e:
            # Some error occurred while processing the sample template
            # Show the error to the user so they can fix the template
            msg = html_error_message % ('updating the prep template:',
                                        basename(fp), str(e))
            msg = convert_text_html(msg)
            msg_level = "danger"

        callback((msg, msg_level, 'prep_template_tab', pt_id, None))
开发者ID:MarkBruns,项目名称:qiita,代码行数:59,代码来源:description_handlers.py


示例20: setUp

    def setUp(self):
        uploads_path = get_mountpoint('uploads')[0][1]
        # Create prep test file to point at
        self.update_fp = join(uploads_path, '1', 'update.txt')
        with open(self.update_fp, 'w') as f:
            f.write("""sample_name\tnew_col\n1.SKD6.640190\tnew_value\n""")

        self._files_to_remove = [self.update_fp]
开发者ID:yimsea,项目名称:qiita,代码行数:8,代码来源:test_artifact.py



注:本文中的qiita_db.util.get_mountpoint函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python util.check_access函数代码示例发布时间:2022-05-26
下一篇:
Python util.get_files_from_uploads_folders函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap