本文整理汇总了Python中skbio.util.misc.remove_files函数的典型用法代码示例。如果您正苦于以下问题:Python remove_files函数的具体用法?Python remove_files怎么用?Python remove_files使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了remove_files函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: tearDown
def tearDown(self):
if self._files_to_remove:
remove_files(self._files_to_remove)
if exists(self.output_dir):
rmtree(self.output_dir)
if exists(self.input_dir):
rmtree(self.input_dir)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:7,代码来源:test_add_qiime_labels.py
示例2: tearDown
def tearDown(self):
remove_files(set(self.files_to_remove))
# remove directories last, so we don't get errors
# trying to remove files which may be in the directories
for d in self.dirs_to_remove:
if exists(d):
rmtree(d)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:7,代码来源:test_supervised_learning.py
示例3: tearDown
def tearDown(self):
""" """
disable_timeout()
remove_files(self.files_to_remove, error_on_missing=False)
# remove directories last, so we don't get errors
# trying to remove files which may be in the directories
for d in self.dirs_to_remove:
if exists(d):
rmtree(d)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:9,代码来源:test_map_reads_to_reference.py
示例4: tearDown
def tearDown(self):
"""Removes temporary directories and files."""
remove_files(self.files_to_remove)
# Remove directories last, so we don't get errors trying to remove
# files which may be in the directories.
for d in self.dirs_to_remove:
if exists(d):
rmtree(d)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:9,代码来源:test_compare_categories.py
示例5: tearDown
def tearDown(self):
"""Clean up tmp files."""
remove_files(self.files_to_remove, False)
if self.tmpdir:
rmtree(self.tmpdir)
# clean up the file from init_flowgram_file
if (hasattr(self, "tmp_filename") and exists(self.tmp_filename)):
remove(self.tmp_filename)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:9,代码来源:test_utils.py
示例6: tearDown
def tearDown(self):
""" """
disable_timeout()
# reset sys.stderr
sys.stderr = self.saved_stderr
remove_files(self.files_to_remove)
# remove directories last, so we don't get errors
# trying to remove files which may be in the directories
for d in self.dirs_to_remove:
if exists(d):
rmtree(d)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:13,代码来源:test_downstream.py
示例7: remove_intermediate_files
def remove_intermediate_files(self):
"""Remove all intermediate files."""
# tmp files are written in the current dir,
# app controller always jumps into dir specified via exec_dir
# Note: blast intermediates are not removed
exec_dir = str(self.Parameters['--exec_dir'].Value)
inp_file_name = str(self.Parameters['--query_NAST'].Value)
exec_dir = exec_dir.rstrip('"')
exec_dir = exec_dir.lstrip('"')
inp_file_name = inp_file_name.rstrip('"')
inp_file_name = inp_file_name.lstrip('"')
tmp_suffixes = [".CPS", ".CPS.CPC", ".CPS_RENAST", ".CPS_RENAST.cidx",
".CPS.CPC.wTaxons", ".cidx"]
cs_tmp_files = [
exec_dir +
'/' +
inp_file_name +
x for x in tmp_suffixes]
remove_files(cs_tmp_files, error_on_missing=False)
db_param = self.Parameters['--db_NAST']
if db_param.isOn():
nast_db_name = str(db_param.Value)
nast_db_name = nast_db_name.rstrip('"')
nast_db_name = nast_db_name.lstrip('"')
# Better do not remove this file since other ChimeraSlayer
# instances running on the same ref set might use this file
# Should be rather deleted in the calling function
# remove_files([nast_db_name + ".cidx"],
# error_on_missing=False)
fasta_param = self.Parameters['--db_FASTA']
if fasta_param.isOn():
fasta_name = str(fasta_param.Value)
fasta_name = fasta_name.rstrip('"')
fasta_name = fasta_name.lstrip('"')
blast_db_files = [
fasta_name +
x for x in [
".nsq",
".nin",
".nhr",
".cidx"]]
remove_files(blast_db_files, error_on_missing=False)
开发者ID:ajaykshatriya,项目名称:qiime,代码行数:50,代码来源:identify_chimeric_seqs.py
示例8: tearDown
def tearDown(self):
"""Clean up tmp files."""
# turn off the alarm
signal.alarm(0)
remove_files(self.files_to_remove, False)
if self.server_socket:
self.server_socket.close()
# give clients time to clean up
sleep(1)
if exists(self.tmp_dir):
try:
rmdir(self.tmp_dir)
except OSError:
# give clients some more time, fail if still error
sleep(5)
rmdir(self.tmp_dir)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:18,代码来源:test_cluster_util.py
示例9: test_remove_files
def test_remove_files(self):
"""Remove files functions as expected """
# create list of temp file paths
test_fds = [NamedTemporaryFile(delete=False) for i in range(5)]
test_filepaths = [element.name for element in test_fds]
# should work just fine
remove_files(test_filepaths)
# check that an error is raised on trying to remove the files...
self.assertRaises(OSError, remove_files, test_filepaths)
# touch one of the filepaths so it exists
extra_file = NamedTemporaryFile(delete=False).name
test_filepaths.append(extra_file)
# no error is raised on trying to remove the files
# (although 5 don't exist)...
remove_files(test_filepaths, error_on_missing=False)
# ... and the existing file was removed
self.assertFalse(exists(extra_file))
# try to remove them with remove_files and verify that an IOError is
# raises
self.assertRaises(OSError, remove_files, test_filepaths)
# now get no error when error_on_missing=False
remove_files(test_filepaths, error_on_missing=False)
开发者ID:teravest,项目名称:scikit-bio,代码行数:28,代码来源:test_misc.py
示例10: test_build_blast_db_from_fasta_path_aln
def test_build_blast_db_from_fasta_path_aln(self):
"""build_blast_db_from_fasta_path works with alignment as input
"""
blast_db, db_files = build_blast_db_from_fasta_path(self.in_aln1_fp)
self.assertEqual(blast_db,self.in_aln1_fp)
expected_db_files = set([blast_db + ext\
for ext in ['.nhr','.nin','.nsq','.nsd','.nsi','.log']])
self.assertEqual(set(db_files),expected_db_files)
# result returned when blasting against new db
self.assertEqual(\
len(blastn(self.test_seq,blast_db=blast_db,e_value=0.0)),1)
# Make sure all db_files exist
for fp in db_files:
self.assertTrue(exists(fp))
# Remove all db_files exist
remove_files(db_files)
# Make sure nothing weird happened in the remove
for fp in db_files:
self.assertFalse(exists(fp))
开发者ID:ElDeveloper,项目名称:brokit,代码行数:22,代码来源:test_formatdb.py
示例11: test_compute_seqs_per_file
def test_compute_seqs_per_file(self):
"""compute_seqs_per_file functions as expected
"""
fd, temp_fasta_fp = mkstemp(prefix='QiimeScriptUtilTests',
suffix='.fasta')
close(fd)
temp_fasta = ['>seq', 'AAACCCCAAATTGG'] * 25
open(temp_fasta_fp, 'w').write('\n'.join(temp_fasta))
actual_25 = self.pw._compute_seqs_per_file(temp_fasta_fp, 25)
actual_2 = self.pw._compute_seqs_per_file(temp_fasta_fp, 2)
actual_10 = self.pw._compute_seqs_per_file(temp_fasta_fp, 10)
actual_5 = self.pw._compute_seqs_per_file(temp_fasta_fp, 5)
actual_40 = self.pw._compute_seqs_per_file(temp_fasta_fp, 40)
remove_files([temp_fasta_fp])
self.assertEqual(actual_25, 1)
self.assertEqual(actual_2, 13)
self.assertEqual(actual_10, 3)
self.assertEqual(actual_5, 5)
self.assertEqual(actual_40, 1)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:22,代码来源:test_util.py
示例12: test_build_blast_db_from_fasta_file
def test_build_blast_db_from_fasta_file(self):
"""build_blast_db_from_fasta_file works with open files as input
"""
blast_db, db_files = \
build_blast_db_from_fasta_file(open(self.in_aln1_fp),output_dir='/tmp/')
self.assertTrue(blast_db.startswith('/tmp/BLAST_temp_db'))
self.assertTrue(blast_db.endswith('.fasta'))
expected_db_files = set([blast_db] + [blast_db + ext\
for ext in ['.nhr','.nin','.nsq','.nsd','.nsi','.log']])
self.assertEqual(set(db_files),expected_db_files)
# result returned when blasting against new db
self.assertEqual(\
len(blastn(self.test_seq,blast_db=blast_db,e_value=0.0)),1)
# Make sure all db_files exist
for fp in db_files:
self.assertTrue(exists(fp))
# Remove all db_files exist
remove_files(db_files)
# Make sure nothing weird happened in the remove
for fp in db_files:
self.assertFalse(exists(fp))
开发者ID:ElDeveloper,项目名称:brokit,代码行数:24,代码来源:test_formatdb.py
示例13: test_build_blast_db_from_fasta_path
def test_build_blast_db_from_fasta_path(self):
"""build_blast_db_from_fasta_path convenience function works as expected
"""
blast_db, db_files = \
build_blast_db_from_fasta_path(self.in_seqs1_fp)
self.assertEqual(blast_db,self.in_seqs1_fp)
expected_db_files = set([self.in_seqs1_fp + ext\
for ext in ['.nhr','.nin','.nsq','.nsd','.nsi','.log']])
self.assertEqual(set(db_files),expected_db_files)
# result returned when blasting against new db
self.assertEqual(\
len(blastn(self.test_seq,blast_db=blast_db)),1)
# Make sure all db_files exist
for fp in db_files:
self.assertTrue(exists(fp))
# Remove all db_files exist
remove_files(db_files)
# Make sure nothing weird happened in the remove
for fp in db_files:
self.assertFalse(exists(fp))
开发者ID:ElDeveloper,项目名称:brokit,代码行数:24,代码来源:test_formatdb.py
示例14: test_build_blast_db_from_seqs
def test_build_blast_db_from_seqs(self):
"""build_blast_db_from_seqs convenience function works as expected
"""
blast_db, db_files = build_blast_db_from_seqs(self.in_seqs1,output_dir='/tmp')
self.assertTrue(blast_db.startswith('/tmp/Blast_tmp_db'))
self.assertTrue(blast_db.endswith('.fasta'))
expected_db_files = set([blast_db + ext\
for ext in ['.nhr','.nin','.nsq','.nsd','.nsi','.log']])
self.assertEqual(set(db_files),expected_db_files)
# result returned when blasting against new db
self.assertEqual(\
len(blastn(self.test_seq,blast_db=blast_db)),1)
# Make sure all db_files exist
for fp in db_files:
self.assertTrue(exists(fp))
# Remove all db_files exist
remove_files(db_files)
# Make sure nothing weird happened in the remove
for fp in db_files:
self.assertFalse(exists(fp))
开发者ID:ElDeveloper,项目名称:brokit,代码行数:24,代码来源:test_formatdb.py
示例15: test_mothur_supported_version
def test_mothur_supported_version(self):
"""mothur is in path and version is supported """
acceptable_version = (1, 25, 0)
self.assertTrue(which('mothur'),
"mothur not found. This may or may not be a problem depending on " +
"which components of QIIME you plan to use.")
# mothur creates a log file in cwd, so create a tmp and cd there first
log_file = join(get_qiime_temp_dir(), 'mothur.log')
command = "mothur \"#set.logfile(name=%s)\" | grep '^mothur v'" % log_file
stdout, stderr, exit_Status = qiime_system_call(command)
# remove log file
remove_files([log_file], error_on_missing=False)
version_string = stdout.strip().split(' ')[1].strip('v.')
try:
version = tuple(map(int, version_string.split('.')))
pass_test = version == acceptable_version
except ValueError:
pass_test = False
version_string = stdout
self.assertTrue(pass_test,
"Unsupported mothur version. %s is required, but running %s."
% ('.'.join(map(str, acceptable_version)), version_string))
开发者ID:nbresnick,项目名称:qiime,代码行数:24,代码来源:print_qiime_config.py
示例16: tearDown
def tearDown(self):
for dir in self.dirs_to_remove:
if exists(dir):
rmdir(dir)
remove_files(self.files_to_remove)
开发者ID:AhmedAbdelfattah,项目名称:qiime,代码行数:5,代码来源:test_sort.py
示例17: tearDown
def tearDown(self):
remove_files(self.files_to_remove)
if self._dirs_to_remove:
for i in self._dirs_to_remove:
rmtree(i)
开发者ID:ajaykshatriya,项目名称:qiime,代码行数:6,代码来源:test_plot_rank_abundance_graph.py
示例18: usearch61_chimera_check
#.........这里部分代码省略.........
sensitivity to very low-divergence chimeras.
usearch61_mindiv: Minimum divergence, i.e. 100% - identity between the
query and closest reference database sequence. Expressed as a percentage,
so the default is 0.8%, which allows chimeras that are up to 99.2% similar
to a reference sequence.
usearch61_abundance_skew: abundance skew for de novo chimera comparisons.
percent_id_usearch61: identity to cluster sequences at
minlen: minimum sequence length for use with usearch61
word_length: length of nucleotide 'words' for usearch61
max_accepts: max number of accepts for hits with usearch61
max_rejects: max number of rejects for usearch61, increasing allows more
sensitivity at a cost of speed
threads: Specify number of threads used per core per CPU
HALT_EXEC=application controller option to halt execution and print command
"""
"""
Need to cluster sequences de novo first to get 1. abundance information
and 2 consensus sequence for each cluster. Using dereplication followed
by clustering does not appear to automatically update complete cluster
size, will directly cluster raw seqs with the small_mem clustering option.
This means without additional parsing steps to recalculate
actual cluster sizes, the sizeorder option can't be used for de novo
clustering and downstream chimera detection."""
files_to_remove = []
# Get absolute paths to avoid issues with calling usearch
input_seqs_fp = abspath(input_seqs_fp)
output_dir = abspath(output_dir)
if reference_seqs_fp:
reference_seqs_fp = abspath(reference_seqs_fp)
log_fp = join(output_dir, "identify_chimeric_seqs.log")
chimeras_fp = join(output_dir, "chimeras.txt")
non_chimeras_fp = join(output_dir, "non_chimeras.txt")
non_chimeras = []
chimeras = []
log_lines = {'denovo_chimeras': 0,
'denovo_non_chimeras': 0,
'ref_chimeras': 0,
'ref_non_chimeras': 0}
if split_by_sampleid:
if verbose:
print "Splitting fasta according to SampleID..."
full_seqs = open(input_seqs_fp, "U")
sep_fastas =\
split_fasta_on_sample_ids_to_files(parse_fasta(full_seqs),
output_dir)
full_seqs.close()
if suppress_usearch61_intermediates:
files_to_remove += sep_fastas
for curr_fasta in sep_fastas:
curr_chimeras, curr_non_chimeras, files_to_remove, log_lines =\
identify_chimeras_usearch61(curr_fasta, output_dir,
reference_seqs_fp, suppress_usearch61_intermediates,
suppress_usearch61_ref, suppress_usearch61_denovo,
non_chimeras_retention, usearch61_minh, usearch61_xn,
usearch61_dn, usearch61_mindiffs, usearch61_mindiv,
usearch61_abundance_skew, percent_id_usearch61, minlen,
word_length, max_accepts, max_rejects, files_to_remove, HALT_EXEC,
log_lines, verbose, threads)
chimeras += curr_chimeras
non_chimeras += curr_non_chimeras
else:
chimeras, non_chimeras, files_to_remove, log_lines =\
identify_chimeras_usearch61(input_seqs_fp, output_dir,
reference_seqs_fp, suppress_usearch61_intermediates,
suppress_usearch61_ref, suppress_usearch61_denovo,
non_chimeras_retention, usearch61_minh, usearch61_xn,
usearch61_dn, usearch61_mindiffs, usearch61_mindiv,
usearch61_abundance_skew, percent_id_usearch61, minlen,
word_length, max_accepts, max_rejects, files_to_remove, HALT_EXEC,
log_lines, verbose, threads)
# write log, non chimeras, chimeras.
write_usearch61_log(log_fp, input_seqs_fp, output_dir,
reference_seqs_fp, suppress_usearch61_intermediates,
suppress_usearch61_ref, suppress_usearch61_denovo,
split_by_sampleid, non_chimeras_retention, usearch61_minh,
usearch61_xn, usearch61_dn, usearch61_mindiffs, usearch61_mindiv,
usearch61_abundance_skew, percent_id_usearch61, minlen,
word_length, max_accepts, max_rejects, HALT_EXEC, log_lines)
chimeras_f = open(chimeras_fp, "w")
non_chimeras_f = open(non_chimeras_fp, "w")
for curr_chimera in chimeras:
chimeras_f.write("%s\n" % curr_chimera)
for curr_non_chimera in non_chimeras:
non_chimeras_f.write("%s\n" % curr_non_chimera)
chimeras_f.close()
non_chimeras_f.close()
remove_files(files_to_remove)
开发者ID:ajaykshatriya,项目名称:qiime,代码行数:101,代码来源:identify_chimeric_seqs.py
示例19: get_chimeras_from_Nast_aligned
def get_chimeras_from_Nast_aligned(seqs_fp, ref_db_aligned_fp=None,
ref_db_fasta_fp=None,
HALT_EXEC=False, min_div_ratio=None,
keep_intermediates=False):
"""remove chimeras from seqs_fp using chimeraSlayer.
seqs_fp: a filepath with the seqs to check in the file
ref_db_aligned_fp: fp to (pynast) aligned reference sequences
ref_db_fasta_fp: same seqs as above, just unaligned. Will be computed on the fly if not provided,
HALT_EXEC: stop execution if true
min_div_ratio: passed to ChimeraSlayer App
"""
files_to_remove = []
# might come in as FilePath object with quotes
seqs_fp = str(seqs_fp)
seqs_fp = seqs_fp.rstrip('"')
seqs_fp = seqs_fp.lstrip('"')
seqs_dir, new_seqs_fp = split(seqs_fp)
# if fp is in current dir, we fake a dir change
if seqs_dir == "":
seqs_dir = "./"
# Chimera Slayer puts some temp files in current dir and some in dir of input file
# use exe_dir to change to dir of input file, so to have all tmp files in
# one place
params = {'--query_NAST': new_seqs_fp,
'--exec_dir': seqs_dir}
if ref_db_aligned_fp is None and ref_db_fasta_fp is None:
# use default db, whose relative position to the
# ChimeraSlayer binary is hardcoded
pass
else:
if not ref_db_fasta_fp:
# make degapped reference file
ref_db_fasta_fp = write_degapped_fasta_to_file(parse_fasta(
open(ref_db_aligned_fp)))
files_to_remove.append(ref_db_fasta_fp)
# use user db
params.update({'--db_NAST': abspath(ref_db_aligned_fp),
'--db_FASTA': abspath(ref_db_fasta_fp)})
if min_div_ratio is not None:
params.update({'-R': min_div_ratio})
app = ChimeraSlayer(params=params, HALT_EXEC=HALT_EXEC)
app_results = app()
# this is a FilePath object in case of success.
# How can we test for failure here?
# if not exists(app_results['CPS']):
# raise ApplicationError, "ChimeraSlayer failed. No output file."
chimeras = parse_CPS_file((app_results['CPS']))
if not keep_intermediates:
app.remove_intermediate_files()
remove_files(files_to_remove)
return chimeras
开发者ID:ajaykshatriya,项目名称:qiime,代码行数:63,代码来源:identify_chimeric_seqs.py
示例20: cleanUp
def cleanUp(self):
""" Remove temporary blast database files, if applicable
"""
remove_files(self._db_files_to_remove, error_on_missing=False)
开发者ID:ajaykshatriya,项目名称:qiime,代码行数:4,代码来源:identify_chimeric_seqs.py
注:本文中的skbio.util.misc.remove_files函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论