• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python util.get_db_files_base_dir函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中qiita_db.util.get_db_files_base_dir函数的典型用法代码示例。如果您正苦于以下问题:Python get_db_files_base_dir函数的具体用法?Python get_db_files_base_dir怎么用?Python get_db_files_base_dir使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_db_files_base_dir函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_delete_files

    def test_delete_files(self):
        try:
            Job.delete(1)
            with self.assertRaises(QiitaDBUnknownIDError):
                Job(1)

            obs = self.conn_handler.execute_fetchall(
                "SELECT * FROM qiita.filepath WHERE filepath_id = 11 OR "
                "filepath_id = 15")
            self.assertEqual(obs, [])

            obs = self.conn_handler.execute_fetchall(
                "SELECT * FROM qiita.job_results_filepath WHERE job_id = 1")
            self.assertEqual(obs, [])

            obs = self.conn_handler.execute_fetchall(
                "SELECT * FROM qiita.analysis_job WHERE job_id = 1")
            self.assertEqual(obs, [])

            self.assertFalse(exists(join(get_db_files_base_dir(),
                                    "job/1_job_result.txt")))
        finally:
            if not exists(join(get_db_files_base_dir(),
                          "job/1_job_result.txt")):
                with open(join(get_db_files_base_dir(),
                          "job/1_job_result.txt"), 'w') as f:
                    f.write("job1result.txt")
开发者ID:Jorge-C,项目名称:qiita,代码行数:27,代码来源:test_job.py


示例2: test_delete_folders

    def test_delete_folders(self):
        try:
            Job.delete(2)
            with self.assertRaises(QiitaDBUnknownIDError):
                Job(2)

            obs = self.conn_handler.execute_fetchall(
                "SELECT * FROM qiita.filepath WHERE filepath_id = 12")
            self.assertEqual(obs, [])

            obs = self.conn_handler.execute_fetchall(
                "SELECT * FROM qiita.job_results_filepath WHERE job_id = 2")
            self.assertEqual(obs, [])

            obs = self.conn_handler.execute_fetchall(
                "SELECT * FROM qiita.analysis_job WHERE job_id = 2")
            self.assertEqual(obs, [])

            self.assertFalse(exists(join(get_db_files_base_dir(),
                                    "job/2_test_folder")))
        finally:
            # put the test data back
            basedir = get_db_files_base_dir()
            if not exists(join(basedir, "job/2_test_folder")):
                mkdir(join(basedir, "job", "2_test_folder"))
                mkdir(join(basedir, "job", "2_test_folder", "subdir"))
                with open(join(basedir, "job", "2_test_folder",
                               "testfile.txt"), 'w') as f:
                    f.write("DATA")
                with open(join(basedir, "job", "2_test_folder",
                               "testres.htm"), 'w') as f:
                    f.write("DATA")
                with open(join(basedir, "job", "2_test_folder",
                               "subdir", "subres.html"), 'w') as f:
                    f.write("DATA")
开发者ID:Jorge-C,项目名称:qiita,代码行数:35,代码来源:test_job.py


示例3: post

    def post(self, analysis_id):
        command_args = self.get_arguments("commands")
        split = [x.split("#") for x in command_args]
        analysis = Analysis(analysis_id)

        commands = []
        # HARD CODED HACKY THING FOR DEMO, FIX  Issue #164
        fp, mapping_file = mkstemp(suffix="_map_file.txt")
        close(fp)
        SampleTemplate(1).to_file(mapping_file)
        study_fps = {}
        for pd in Study(1).processed_data:
            processed = ProcessedData(pd)
            study_fps[processed.data_type] = processed.get_filepaths()[0][0]
        for data_type, command in split:
            opts = {
                "--otu_table_fp": study_fps[data_type],
                "--mapping_fp": mapping_file
            }
            if command == "Beta Diversity" and data_type in {'16S', '18S'}:
                opts["--tree_fp"] = join(get_db_files_base_dir(), "reference",
                                         "gg_97_otus_4feb2011.tre")
            elif command == "Beta Diversity":
                opts["--parameter_fp"] = join(get_db_files_base_dir(),
                                              "reference", "params_qiime.txt")
            Job.create(data_type, command, opts, analysis)
            commands.append("%s: %s" % (data_type, command))
        user = self.get_current_user()
        self.render("analysis_waiting.html", user=user,
                    aid=analysis_id, aname=analysis.name,
                    commands=commands)
        # fire off analysis run here
        # currently synch run so redirect done here. Will remove after demo
        run_analysis(user, analysis)
开发者ID:teravest,项目名称:qiita,代码行数:34,代码来源:analysis_handlers.py


示例4: test_purge_filepaths

    def test_purge_filepaths(self):
        # Add a new filepath to the database
        fd, fp = mkstemp()
        close(fd)
        fp_id = self.conn_handler.execute_fetchone(
            "INSERT INTO qiita.filepath "
            "(filepath, filepath_type_id, checksum, checksum_algorithm_id) "
            "VALUES (%s, %s, %s, %s) RETURNING filepath_id", (fp, 1, "", 1))[0]
        self.assertEqual(fp_id, 17)

        # Connect the just added filepath to a raw data
        self.conn_handler.execute(
            "INSERT INTO qiita.raw_filepath (raw_data_id, filepath_id) VALUES"
            "(%s, %s)", (1, 17))

        # Get the filepaths so we can test if they've been removed or not
        sql_fp = "SELECT filepath FROM qiita.filepath WHERE filepath_id=%s"
        fp1 = self.conn_handler.execute_fetchone(sql_fp, (1,))[0]
        fp1 = join(get_db_files_base_dir(), fp1)

        # Make sure that the file exists - specially for travis
        with open(fp1, 'w') as f:
            f.write('\n')

        fp17 = self.conn_handler.execute_fetchone(sql_fp, (17,))[0]
        fp17 = join(get_db_files_base_dir(), fp17)

        # Nothing should be removed
        purge_filepaths(self.conn_handler)

        sql_ids = ("SELECT filepath_id FROM qiita.filepath ORDER BY "
                   "filepath_id")
        obs = self.conn_handler.execute_fetchall(sql_ids)
        exp = [[1], [2], [3], [4], [5], [6], [7], [8], [9],
               [10], [11], [12], [13], [14], [15], [17]]
        self.assertEqual(obs, exp)

        # Check that the files still exist
        self.assertTrue(exists(fp1))
        self.assertTrue(exists(fp17))

        # Unlink the filepath from the raw data
        self.conn_handler.execute(
            "DELETE FROM qiita.raw_filepath WHERE filepath_id=%s", (17,))

        # Only filepath 16 should be removed
        purge_filepaths(self.conn_handler)

        obs = self.conn_handler.execute_fetchall(sql_ids)
        exp = [[1], [2], [3], [4], [5], [6], [7], [8], [9],
               [10], [11], [12], [13], [14], [15]]
        self.assertEqual(obs, exp)

        # Check that only the file for the removed filepath has been removed
        self.assertTrue(exists(fp1))
        self.assertFalse(exists(fp17))
开发者ID:BrindhaBioinfo,项目名称:qiita,代码行数:56,代码来源:test_util.py


示例5: test_build_files

 def test_build_files(self):
     biom_fp = join(get_db_files_base_dir(), "analysis",
                    "1_analysis_18S.biom")
     map_fp = join(get_db_files_base_dir(), "analysis",
                   "1_analysis_mapping.txt")
     try:
         self.analysis.build_files()
     finally:
         with open(biom_fp, 'w') as f:
             f.write("")
         with open(map_fp, 'w') as f:
             f.write("")
开发者ID:Jorge-C,项目名称:qiita,代码行数:12,代码来源:test_analysis.py


示例6: test_set_options

 def test_set_options(self):
     new = Job.create("18S", "Alpha Rarefaction", {"opt1": 4}, Analysis(1))
     new.options = self.options
     self.options['--output_dir'] = join(get_db_files_base_dir(),
                                         'job/4_alpha_rarefaction.'
                                         'py_output_dir')
     self.assertEqual(new.options, self.options)
开发者ID:Jorge-C,项目名称:qiita,代码行数:7,代码来源:test_job.py


示例7: get

    def get(self, analysis_id):
        analysis_id = int(analysis_id.split("/")[0])
        analysis = Analysis(analysis_id)
        check_analysis_access(self.current_user, analysis)

        jobres = defaultdict(list)
        for jobject in analysis.jobs:
            results = []
            for res in jobject.results:
                name = basename(res)
                if name.startswith('index'):
                    name = basename(dirname(res)).replace('_', ' ')
                results.append((res, name))
            jobres[jobject.datatype].append((jobject.command[0],
                                             results))

        dropped_samples = analysis.dropped_samples
        dropped = defaultdict(list)
        for proc_data_id, samples in viewitems(dropped_samples):
            if not samples:
                continue
            proc_data = Artifact(proc_data_id)
            data_type = proc_data.data_type
            dropped[data_type].append((proc_data.study.title, len(samples),
                                       ', '.join(samples)))

        self.render("analysis_results.html", analysis_id=analysis_id,
                    jobres=jobres, aname=analysis.name, dropped=dropped,
                    basefolder=get_db_files_base_dir())
开发者ID:yimsea,项目名称:qiita,代码行数:29,代码来源:analysis_handlers.py


示例8: _list_dir_files_nginx

    def _list_dir_files_nginx(self, dirpath):
        """Generates a nginx list of files in the given dirpath for nginx

        Parameters
        ----------
        dirpath : str
            Path to the directory

        Returns
        -------
        list of (str, str, str)
            The path information needed by nginx for each file in the
            directory
        """
        basedir = get_db_files_base_dir()
        basedir_len = len(basedir) + 1
        to_download = []
        for dp, _, fps in walk(dirpath):
            for fn in fps:
                fullpath = join(dp, fn)
                spath = fullpath
                if fullpath.startswith(basedir):
                    spath = fullpath[basedir_len:]
                to_download.append((fullpath, spath, spath))
        return to_download
开发者ID:ElDeveloper,项目名称:qiita,代码行数:25,代码来源:download.py


示例9: get

    def get(self, analysis_id):
        user = self.current_user
        analysis_id = int(analysis_id)
        check_analysis_access(User(user), analysis_id)

        analysis = Analysis(analysis_id)
        jobres = defaultdict(list)
        for job in analysis.jobs:
            jobject = Job(job)
            jobres[jobject.datatype].append((jobject.command[0],
                                             jobject.results))

        dropped = {}
        for proc_data_id, samples in viewitems(analysis.dropped_samples):
            proc_data = ProcessedData(proc_data_id)
            key = "Data type %s, Study: %s" % (proc_data.data_type(),
                                               proc_data.study)
            dropped[key] = samples

        self.render("analysis_results.html", user=self.current_user,
                    jobres=jobres, aname=analysis.name, dropped=dropped,
                    basefolder=get_db_files_base_dir())

        # wipe out cached messages for this analysis
        r_server = Redis()
        key = '%s:messages' % self.current_user
        oldmessages = r_server.lrange(key, 0, -1)
        if oldmessages is not None:
            for message in oldmessages:
                if '"analysis": %d' % analysis_id in message:
                    r_server.lrem(key, message, 1)
开发者ID:Jorge-C,项目名称:qiita,代码行数:31,代码来源:analysis_handlers.py


示例10: test_insert_filepaths_string

    def test_insert_filepaths_string(self):
        fd, fp = mkstemp()
        close(fd)
        with open(fp, "w") as f:
            f.write("\n")
        self.files_to_remove.append(fp)

        exp_new_id = 1 + self.conn_handler.execute_fetchone(
            "SELECT count(1) FROM qiita.filepath")[0]
        obs = insert_filepaths([(fp, "raw_forward_seqs")], 1, "raw_data",
                               "filepath", self.conn_handler)
        self.assertEqual(obs, [exp_new_id])

        # Check that the files have been copied correctly
        exp_fp = join(get_db_files_base_dir(), "raw_data",
                      "1_%s" % basename(fp))
        self.assertTrue(exists(exp_fp))
        self.files_to_remove.append(exp_fp)

        # Check that the filepaths have been added to the DB
        obs = self.conn_handler.execute_fetchall(
            "SELECT * FROM qiita.filepath WHERE filepath_id=%d" % exp_new_id)
        exp_fp = "1_%s" % basename(fp)
        exp = [[exp_new_id, exp_fp, 1, '852952723', 1, 5]]
        self.assertEqual(obs, exp)
开发者ID:jwdebelius,项目名称:qiita,代码行数:25,代码来源:test_util.py


示例11: get

    def get(self, analysis_id):
        analysis_id = int(analysis_id.split("/")[0])
        analysis = Analysis(analysis_id)
        check_analysis_access(self.current_user, analysis)

        jobres = defaultdict(list)
        for job in analysis.jobs:
            jobject = Job(job)
            jobres[jobject.datatype].append((jobject.command[0],
                                             jobject.results))

        dropped_samples = analysis.dropped_samples
        dropped = defaultdict(list)
        for proc_data_id, samples in viewitems(dropped_samples):
            if not samples:
                continue
            proc_data = ProcessedData(proc_data_id)
            data_type = proc_data.data_type()
            study = proc_data.study
            dropped[data_type].append((Study(study).title, len(samples),
                                       ', '.join(samples)))

        self.render("analysis_results.html", analysis_id=analysis_id,
                    jobres=jobres, aname=analysis.name, dropped=dropped,
                    basefolder=get_db_files_base_dir())
开发者ID:DarcyMyers,项目名称:qiita,代码行数:25,代码来源:analysis_handlers.py


示例12: validate_absolute_path

    def validate_absolute_path(self, root, absolute_path):
        """Overrides StaticFileHandler's method to include authentication
        """
        # Get the filename (or the base directory) of the result
        len_prefix = len(commonprefix([root, absolute_path]))
        base_requested_fp = absolute_path[len_prefix:].split(sep, 1)[0]

        current_user = self.current_user

        # If the user is an admin, then allow access
        if current_user.level == 'admin':
            return super(ResultsHandler, self).validate_absolute_path(
                root, absolute_path)

        # otherwise, we have to check if they have access to the requested
        # resource
        user_id = current_user.id
        accessible_filepaths = check_access_to_analysis_result(
            user_id, base_requested_fp)

        # Turn these filepath IDs into absolute paths
        db_files_base_dir = get_db_files_base_dir()
        relpaths = filepath_ids_to_rel_paths(accessible_filepaths)

        accessible_filepaths = {join(db_files_base_dir, relpath)
                                for relpath in relpaths.values()}

        # check if the requested resource is a file (or is in a directory) that
        # the user has access to
        if join(root, base_requested_fp) in accessible_filepaths:
            return super(ResultsHandler, self).validate_absolute_path(
                root, absolute_path)
        else:
            raise QiitaPetAuthorizationError(user_id, absolute_path)
开发者ID:zonca,项目名称:qiita,代码行数:34,代码来源:analysis_handlers.py


示例13: test_add_results

    def test_add_results(self):
        self.job.add_results([(join(get_db_files_base_dir(), "job",
                                    "1_job_result.txt"), "plain_text")])

        # make sure files attached to job properly
        obs = self.conn_handler.execute_fetchall(
            "SELECT * FROM qiita.job_results_filepath WHERE job_id = 1")

        self.assertEqual(obs, [[1, 11], [1, 15]])
开发者ID:Jorge-C,项目名称:qiita,代码行数:9,代码来源:test_job.py


示例14: test_add_results_dir

    def test_add_results_dir(self):
        # Create a test directory
        test_dir = join(get_db_files_base_dir(), "job", "2_test_folder")

        # add folder to job
        self.job.add_results([(test_dir, "directory")])

        # make sure files attached to job properly
        obs = self.conn_handler.execute_fetchall(
            "SELECT * FROM qiita.job_results_filepath WHERE job_id = 1")
        self.assertEqual(obs, [[1, 11], [1, 15]])
开发者ID:Jorge-C,项目名称:qiita,代码行数:11,代码来源:test_job.py


示例15: test_build_files_job_comm_wrapper

    def test_build_files_job_comm_wrapper(self):
        # basic setup needed for test
        job = Job(3)

        # create the files needed for job, testing _build_analysis_files
        analysis = Analysis(2)
        _build_analysis_files(analysis, 100)
        self._del_files.append(join(get_db_files_base_dir(), "analysis",
                                    "2_analysis_mapping.txt"))
        self._del_files.append(join(get_db_files_base_dir(), "analysis",
                                    "2_analysis_18S.biom"))
        self.assertTrue(exists(join(get_db_files_base_dir(), "analysis",
                                    "2_analysis_mapping.txt")))
        self.assertTrue(exists(join(get_db_files_base_dir(), "analysis",
                                    "2_analysis_18S.biom")))
        self.assertEqual([3], analysis.jobs)

        _job_comm_wrapper("[email protected]", 2, job)

        self.assertEqual(job.status, "error")
开发者ID:Jorge-C,项目名称:qiita,代码行数:20,代码来源:test_run.py


示例16: get

    def get(self, aid):
        analysis = Analysis(aid)
        jobres = defaultdict(list)
        for job in analysis.jobs:
            jobject = Job(job)
            jobres[jobject.datatype].append((jobject.command[0],
                                             jobject.results))

        self.render("analysis_results.html", user=self.get_current_user(),
                    jobres=jobres, aname=analysis.name,
                    basefolder=get_db_files_base_dir())
开发者ID:teravest,项目名称:qiita,代码行数:11,代码来源:analysis_handlers.py


示例17: test_retrieve_dropped_samples

 def test_retrieve_dropped_samples(self):
     biom_fp = join(get_db_files_base_dir(), "analysis",
                    "1_analysis_18S.biom")
     try:
         samples = {1: ['SKB8.640193', 'SKD8.640184', 'SKB7.640196']}
         self.analysis._build_biom_tables(samples, 100,
                                          conn_handler=self.conn_handler)
         exp = {1: {'SKM4.640180', 'SKM9.640192'}}
         self.assertEqual(self.analysis.dropped_samples, exp)
     finally:
         with open(biom_fp, 'w') as f:
             f.write("")
开发者ID:Jorge-C,项目名称:qiita,代码行数:12,代码来源:test_analysis.py


示例18: test_get_mountpoint_path_by_id

    def test_get_mountpoint_path_by_id(self):
        exp = join(get_db_files_base_dir(), "raw_data", "")
        obs = get_mountpoint_path_by_id(5)
        self.assertEqual(obs, exp)

        exp = join(get_db_files_base_dir(), "analysis", "")
        obs = get_mountpoint_path_by_id(1)
        self.assertEqual(obs, exp)

        exp = join(get_db_files_base_dir(), "job", "")
        obs = get_mountpoint_path_by_id(2)
        self.assertEqual(obs, exp)

        # inserting new ones so we can test that it retrieves these and
        # doesn't alter other ones
        self.conn_handler.execute("UPDATE qiita.data_directory SET active=false WHERE " "data_directory_id=1")
        self.conn_handler.execute(
            "INSERT INTO qiita.data_directory (data_type, mountpoint, "
            "subdirectory, active) VALUES ('analysis', 'analysis', 'tmp', "
            "true), ('raw_data', 'raw_data', 'tmp', false)"
        )

        # this should have been updated
        exp = join(get_db_files_base_dir(), "analysis", "tmp")
        obs = get_mountpoint_path_by_id(10)
        self.assertEqual(obs, exp)

        # these 2 shouldn't
        exp = join(get_db_files_base_dir(), "raw_data", "")
        obs = get_mountpoint_path_by_id(5)
        self.assertEqual(obs, exp)

        exp = join(get_db_files_base_dir(), "job", "")
        obs = get_mountpoint_path_by_id(2)
        self.assertEqual(obs, exp)
开发者ID:DarcyMyers,项目名称:qiita,代码行数:35,代码来源:test_util.py


示例19: setUp

    def setUp(self):
        self.preprocessed_data = PreprocessedData(1)
        self.params_table = "processed_params_uclust"
        self.params_id = 1
        fd, self.biom_fp = mkstemp(suffix='_table.biom')
        close(fd)
        self.filepaths = [(self.biom_fp, 6)]
        self.date = datetime(2014, 5, 29, 12, 24, 51)
        self.db_test_pd_dir = join(get_db_files_base_dir(), 'processed_data')

        with open(self.biom_fp, "w") as f:
            f.write("\n")
        self._clean_up_files = []
开发者ID:teravest,项目名称:qiita,代码行数:13,代码来源:test_data.py


示例20: test_add_jobs_in_construct_job_graphs

 def test_add_jobs_in_construct_job_graphs(self):
     analysis = Analysis(2)
     npt.assert_warns(QiitaDBWarning, analysis.build_files)
     RunAnalysis()._construct_job_graph(
         analysis, [('18S', 'Summarize Taxa')],
         comm_opts={'Summarize Taxa': {'opt1': 5}})
     self.assertEqual(analysis.jobs, [Job(3), Job(4)])
     job = Job(4)
     self.assertEqual(job.datatype, '18S')
     self.assertEqual(job.command,
                      ['Summarize Taxa', 'summarize_taxa_through_plots.py'])
     expopts = {
         '--mapping_fp': join(
             get_db_files_base_dir(), 'analysis/2_analysis_mapping.txt'),
         '--otu_table_fp': join(
             get_db_files_base_dir(),
             'analysis/2_analysis_dt-18S_r-1_c-3.biom'),
         '--output_dir': join(
             get_db_files_base_dir(), 'job',
             '4_summarize_taxa_through_plots.py_output_dir'),
         'opt1': 5}
     self.assertEqual(job.options, expopts)
开发者ID:carlyboyd,项目名称:qiita,代码行数:22,代码来源:test_analysis_pipeline.py



注:本文中的qiita_db.util.get_db_files_base_dir函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python util.get_files_from_uploads_folders函数代码示例发布时间:2022-05-26
下一篇:
Python util.get_count函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap