本文整理汇总了Python中skbio.util.remove_files函数的典型用法代码示例。如果您正苦于以下问题:Python remove_files函数的具体用法?Python remove_files怎么用?Python remove_files使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了remove_files函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: tearDown
def tearDown(self):
    """Delete the temporary files and directories registered by the test."""
    # Passing a set means a path that was registered twice is only handed
    # to remove_files once.
    remove_files(set(self.files_to_remove))

    # Directories are handled after the files, so files living inside
    # them have already been dealt with by the call above.
    for tmp_dir in self.dirs_to_remove:
        if not exists(tmp_dir):
            continue
        rmtree(tmp_dir)
开发者ID:Springbudder,项目名称:qiime,代码行数:7,代码来源:test_supervised_learning.py
示例2: tearDown
def tearDown(self):
    """Remove the registered files, then the output and input directories."""
    if self._files_to_remove:
        remove_files(self._files_to_remove)

    # Same order as before: output directory first, input directory second.
    for directory in (self.output_dir, self.input_dir):
        if exists(directory):
            rmtree(directory)
开发者ID:ElDeveloper,项目名称:qiime,代码行数:7,代码来源:test_add_qiime_labels.py
示例3: test_mothur_supported_version
def test_mothur_supported_version(self):
    """mothur is in path and version is supported """
    acceptable_version = (1, 25, 0)
    self.assertTrue(
        which("mothur"),
        "mothur not found. This may or may not be a problem depending on "
        + "which components of QIIME you plan to use.",
    )
    # Point mothur's log file (set.logfile) at the QIIME temp dir so it can
    # be cleaned up right after the call.
    log_file = join(get_qiime_temp_dir(), "mothur.log")
    command = "mothur \"#set.logfile(name=%s)\" | grep '^mothur v'" % log_file
    stdout, stderr, exit_Status = qiime_system_call(command)
    # remove log file
    remove_files([log_file], error_on_missing=False)
    try:
        # Token extraction lives inside the try: empty or unexpected output
        # (no second space-separated token) must fail the assertion below
        # with a readable message rather than abort with an IndexError.
        version_string = stdout.strip().split(" ")[1].strip("v.")
        version = tuple(map(int, version_string.split(".")))
        pass_test = version == acceptable_version
    except (IndexError, ValueError):
        pass_test = False
        # Report the raw output, since no version could be parsed from it.
        version_string = stdout
    self.assertTrue(
        pass_test,
        "Unsupported mothur version. %s is required, but running %s."
        % (".".join(map(str, acceptable_version)), version_string),
    )
开发者ID:johnchase,项目名称:qiime,代码行数:28,代码来源:print_qiime_config.py
示例4: tearDown
def tearDown(self):
    """Remove the temporary files and directories created by the test."""
    remove_files(self.files_to_remove)

    # Directories are processed after the files so that files located
    # inside them have already been handled.
    for tmp_dir in self.dirs_to_remove:
        if not exists(tmp_dir):
            continue
        rmtree(tmp_dir)
开发者ID:Kleptobismol,项目名称:qiime,代码行数:9,代码来源:test_compare_categories.py
示例5: tearDown
def tearDown(self):
    """Call disable_timeout(), then remove temporary files and directories."""
    disable_timeout()

    # Files that are already gone are tolerated here.
    remove_files(self.files_to_remove, error_on_missing=False)

    # Directories are processed after the files so that files located
    # inside them have already been handled.
    for tmp_dir in self.dirs_to_remove:
        if not exists(tmp_dir):
            continue
        rmtree(tmp_dir)
开发者ID:ElDeveloper,项目名称:qiime,代码行数:9,代码来源:test_map_reads_to_reference.py
示例6: tearDown
def tearDown(self):
    """Remove temporary files, the temp directory and the flowgram file."""
    # NOTE(review): the positional False is presumably error_on_missing
    # (other call sites pass it by keyword) -- confirm against skbio.
    remove_files(self.files_to_remove, False)

    if self.tmpdir:
        rmtree(self.tmpdir)

    # The attribute only exists when init_flowgram_file created a file.
    if not hasattr(self, "tmp_filename"):
        return
    if exists(self.tmp_filename):
        remove(self.tmp_filename)
开发者ID:Springbudder,项目名称:qiime,代码行数:9,代码来源:test_utils.py
示例7: tearDown
def tearDown(self):
    """Close the open fixture handles and delete all fixture files."""
    open_handles = (
        self.mapping_fp,
        self.fasta_file_no_consensus,
        self.fasta_file_for_consensus_tie_G_C,
        self.fasta_file_for_consensus_unequal_length,
    )
    for handle in open_handles:
        handle.close()

    # Paths of the four handles closed above plus the two read files.
    fixture_paths = [
        self.mapping_fp_name,
        self.fasta_file_no_consensus_name,
        self.fasta_file_for_consensus_tie_G_C_name,
        self.fasta_file_for_consensus_unequal_length_name,
        self.fwd_read_fh_name,
        self.rev_read_fh_name,
    ]
    remove_files(fixture_paths)
开发者ID:ElDeveloper,项目名称:qiime,代码行数:11,代码来源:test_split_libraries_lea_seq.py
示例8: swarm_denovo_cluster
def swarm_denovo_cluster(seq_path,
                         d=1,
                         threads=1,
                         HALT_EXEC=False):
    """Launch the Swarm de novo OTU picker.

    Parameters
    ----------
    seq_path : str
        Filepath to the reads; must exist.
    d : int, optional
        Resolution, passed to Swarm as ``-d``; must be positive.
    threads : int, optional
        Number of threads, passed to Swarm as ``-t``; must be positive.
    HALT_EXEC : bool, optional
        Forwarded unchanged to the ``Swarm`` application controller.

    Returns
    -------
    clusters
        Whatever calling the ``Swarm`` controller on ``seq_path`` returns
        (documented by the original author as a list of lists).

    Raises
    ------
    ValueError
        If ``seq_path`` does not exist, or ``d`` / ``threads`` is not
        positive.
    """
    # Validate every argument before building the controller or creating
    # any temporary file, so bad input leaves nothing behind.
    if not exists(seq_path):
        raise ValueError("%s does not exist" % seq_path)
    if d <= 0:
        raise ValueError("Resolution -d must be a positive integer.")
    if threads <= 0:
        raise ValueError("Number of threads must be a positive integer.")

    swarm = Swarm(HALT_EXEC=HALT_EXEC)
    swarm.Parameters['-d'].on(d)
    swarm.Parameters['-t'].on(threads)

    # Temporary file for the Swarm OTU map. Only the path is needed, so
    # the descriptor is closed right away.
    f, tmp_swarm_otumap = mkstemp(prefix='temp_otumap_',
                                  suffix='.swarm')
    close(f)
    # The OTU map is an intermediate; the clusters are what is returned.
    swarm.files_to_remove.append(tmp_swarm_otumap)

    try:
        swarm.Parameters['-o'].on(tmp_swarm_otumap)
        # The read filepath is passed as the final argument of the command.
        clusters = swarm(seq_path)
    finally:
        # Clean up even when Swarm fails; previously the temp file leaked.
        remove_files(swarm.files_to_remove, error_on_missing=False)

    return clusters
开发者ID:biocore,项目名称:burrito-fillings,代码行数:53,代码来源:swarm_v127.py
示例9: tearDown
def tearDown(self):
    """Call disable_timeout(), restore sys.stderr and remove temp paths."""
    disable_timeout()

    # Put back the stream that was saved in self.saved_stderr.
    sys.stderr = self.saved_stderr

    remove_files(self.files_to_remove)

    # Directories are processed after the files so that files located
    # inside them have already been handled.
    for tmp_dir in self.dirs_to_remove:
        if not exists(tmp_dir):
            continue
        rmtree(tmp_dir)
开发者ID:Kleptobismol,项目名称:qiime,代码行数:13,代码来源:test_downstream.py
示例10: remove_intermediate_files
def remove_intermediate_files(self):
    """Remove all intermediate files."""
    def unquoted(param):
        # Parameter values may be wrapped in double quotes; drop them.
        return str(param.Value).strip('"')

    # tmp files are written in the current dir,
    # app controller always jumps into dir specified via exec_dir
    # Note: blast intermediates are not removed
    exec_dir = unquoted(self.Parameters['--exec_dir'])
    inp_file_name = unquoted(self.Parameters['--query_NAST'])

    prefix = exec_dir + '/' + inp_file_name
    cs_tmp_files = [
        prefix + suffix
        for suffix in (".CPS", ".CPS.CPC", ".CPS_RENAST",
                       ".CPS_RENAST.cidx", ".CPS.CPC.wTaxons", ".cidx")
    ]
    remove_files(cs_tmp_files, error_on_missing=False)

    nast_db = self.Parameters['--db_NAST']
    if nast_db.isOn():
        # The name is computed but the index file is deliberately left in
        # place: other ChimeraSlayer instances running on the same ref set
        # might still use it. The calling function should delete it.
        nast_db_name = unquoted(nast_db)

    fasta_db = self.Parameters['--db_FASTA']
    if fasta_db.isOn():
        fasta_name = unquoted(fasta_db)
        blast_db_files = [
            fasta_name + ext for ext in (".nsq", ".nin", ".nhr", ".cidx")
        ]
        remove_files(blast_db_files, error_on_missing=False)
开发者ID:Springbudder,项目名称:qiime,代码行数:50,代码来源:identify_chimeric_seqs.py
示例11: test_seq_path
def test_seq_path(self):
    """swarm_denovo_cluster raises ValueError for a missing sequence file."""
    handle, missing_fp = mkstemp(prefix='temp_reads_', suffix='.fasta')
    close(handle)
    # Delete the file again so the path no longer exists when it is
    # handed to swarm_denovo_cluster.
    remove_files([missing_fp])

    with self.assertRaises(ValueError):
        swarm_denovo_cluster(seq_path=missing_fp, d=1, threads=1)
开发者ID:biocore,项目名称:burrito-fillings,代码行数:15,代码来源:test_swarm_v127.py
示例12: tearDown
def tearDown(self):
    """Cancel the alarm, remove temp files and tear down socket and tmp dir."""
    # A zero argument cancels any pending alarm.
    signal.alarm(0)

    remove_files(self.files_to_remove, False)

    if self.server_socket:
        self.server_socket.close()

    # Give clients time to clean up before touching the directory.
    sleep(1)

    if not exists(self.tmp_dir):
        return
    try:
        rmdir(self.tmp_dir)
    except OSError:
        # Give clients some more time; a second failure propagates.
        sleep(5)
        rmdir(self.tmp_dir)
开发者ID:Kleptobismol,项目名称:qiime,代码行数:18,代码来源:test_cluster_util.py
示例13: test_remove_files
def test_remove_files(self):
    """remove_files deletes paths and honours error_on_missing."""
    # Create five temp files. Each handle is closed immediately (only the
    # path is needed); previously the open handles were leaked.
    test_filepaths = []
    for _ in range(5):
        with NamedTemporaryFile(delete=False) as tmp:
            test_filepaths.append(tmp.name)

    # should work just fine
    remove_files(test_filepaths)

    # check that an error is raised on trying to remove the files again
    self.assertRaises(OSError, remove_files, test_filepaths)

    # create one more file so that exactly one of the paths exists
    with NamedTemporaryFile(delete=False) as tmp:
        extra_file = tmp.name
    test_filepaths.append(extra_file)

    # no error is raised on trying to remove the files
    # (although 5 don't exist)...
    remove_files(test_filepaths, error_on_missing=False)
    # ... and the existing file was removed
    self.assertFalse(exists(extra_file))

    # now every path is missing, so the default mode raises an OSError
    self.assertRaises(OSError, remove_files, test_filepaths)

    # and again no error when error_on_missing=False
    remove_files(test_filepaths, error_on_missing=False)
开发者ID:ebolyen,项目名称:scikit-bio,代码行数:27,代码来源:test_misc.py
示例14: test_compute_seqs_per_file
def test_compute_seqs_per_file(self):
    """compute_seqs_per_file functions as expected
    """
    # (number of jobs requested, expected result) for a 25-sequence file
    cases = [(25, 1), (2, 13), (10, 3), (5, 5), (40, 1)]

    fd, temp_fasta_fp = mkstemp(prefix='QiimeScriptUtilTests',
                                suffix='.fasta')
    close(fd)
    try:
        temp_fasta = ['>seq', 'AAACCCCAAATTGG'] * 25
        # A context manager makes sure the data is flushed and the handle
        # closed before the file is read back.
        with open(temp_fasta_fp, 'w') as fasta_f:
            fasta_f.write('\n'.join(temp_fasta))
        actual = [self.pw._compute_seqs_per_file(temp_fasta_fp, num)
                  for num, _ in cases]
    finally:
        # Remove the temp file even when the method under test raises.
        remove_files([temp_fasta_fp])

    for (_, expected), observed in zip(cases, actual):
        self.assertEqual(observed, expected)
开发者ID:ElDeveloper,项目名称:qiime,代码行数:22,代码来源:test_util.py
示例15: test_build_blast_db_from_fasta_path_aln
def test_build_blast_db_from_fasta_path_aln(self):
    """build_blast_db_from_fasta_path accepts an alignment as input."""
    blast_db, db_files = build_blast_db_from_fasta_path(self.in_aln1_fp)
    self.assertEqual(blast_db, self.in_aln1_fp)

    extensions = ['.nhr', '.nin', '.nsq', '.nsd', '.nsi', '.log']
    expected_db_files = set(blast_db + ext for ext in extensions)
    self.assertEqual(set(db_files), expected_db_files)

    # A search against the new database yields exactly one result.
    hits = blastn(self.test_seq, blast_db=blast_db, e_value=0.0)
    self.assertEqual(len(hits), 1)

    # Every reported database file exists on disk ...
    for db_fp in db_files:
        self.assertTrue(exists(db_fp))

    remove_files(db_files)

    # ... and none of them is left after removal.
    for db_fp in db_files:
        self.assertFalse(exists(db_fp))
开发者ID:biocore,项目名称:burrito-fillings,代码行数:22,代码来源:test_formatdb.py
示例16: test_build_blast_db_from_fasta_path
def test_build_blast_db_from_fasta_path(self):
    """build_blast_db_from_fasta_path builds a usable database from a path."""
    blast_db, db_files = build_blast_db_from_fasta_path(self.in_seqs1_fp)
    self.assertEqual(blast_db, self.in_seqs1_fp)

    extensions = ['.nhr', '.nin', '.nsq', '.nsd', '.nsi', '.log']
    expected_db_files = set(self.in_seqs1_fp + ext for ext in extensions)
    self.assertEqual(set(db_files), expected_db_files)

    # A search against the new database yields exactly one result.
    hits = blastn(self.test_seq, blast_db=blast_db)
    self.assertEqual(len(hits), 1)

    # Every reported database file exists on disk ...
    for db_fp in db_files:
        self.assertTrue(exists(db_fp))

    remove_files(db_files)

    # ... and none of them is left after removal.
    for db_fp in db_files:
        self.assertFalse(exists(db_fp))
开发者ID:biocore,项目名称:burrito-fillings,代码行数:24,代码来源:test_formatdb.py
示例17: select_unique_rand_bcs
def select_unique_rand_bcs(rand_bcs, unique_threshold):
    """
    Attempts to select true barcodes from set of barcodes
    i.e. removes barcodes that might be artifacts
    due to sequencing errors.
    Uses uclust to remove barcodes that are more similar than
    threshold.

    Parameters
    ----------
    rand_bcs: list
        random barcodes; duplicates are dropped before clustering.
    unique_threshold: float
        passed to get_clusters_from_fasta_filepath as percent_ID.

    Returns
    ----------
    unique_rand_bcs: set
        set of unique random barcodes.
    """
    # Local import keeps this function self-contained.
    import os

    temp_dir = get_qiime_temp_dir()
    fasta_fd, fasta_tempfile_name = mkstemp(
        dir=temp_dir, prefix='tmp', suffix='.fas')
    try:
        # Wrap the descriptor from mkstemp instead of opening the path a
        # second time; previously that descriptor was never closed.
        with os.fdopen(fasta_fd, 'w') as fasta_tempfile:
            # Each barcode serves as both the record id and the sequence.
            for rand_bc in set(rand_bcs):
                fasta_tempfile.write(">{}\n{}\n".format(rand_bc, rand_bc))

        _, _, unique_rand_bcs = get_clusters_from_fasta_filepath(
            fasta_tempfile_name,
            original_fasta_path=None,
            percent_ID=unique_threshold,
            save_uc_files=False,
            output_dir=temp_dir)
    finally:
        # Remove the temp file even when clustering fails.
        remove_files([fasta_tempfile_name])

    return set(unique_rand_bcs)
开发者ID:Springbudder,项目名称:qiime,代码行数:36,代码来源:split_libraries_lea_seq.py
示例18: test_build_blast_db_from_seqs
def test_build_blast_db_from_seqs(self):
    """build_blast_db_from_seqs builds a usable database under /tmp."""
    blast_db, db_files = build_blast_db_from_seqs(
        self.in_seqs1, output_dir='/tmp')
    self.assertTrue(blast_db.startswith('/tmp/Blast_tmp_db'))
    self.assertTrue(blast_db.endswith('.fasta'))

    extensions = ['.nhr', '.nin', '.nsq', '.nsd', '.nsi', '.log']
    expected_db_files = set(blast_db + ext for ext in extensions)
    self.assertEqual(set(db_files), expected_db_files)

    # A search against the new database yields exactly one result.
    hits = blastn(self.test_seq, blast_db=blast_db)
    self.assertEqual(len(hits), 1)

    # Every reported database file exists on disk ...
    for db_fp in db_files:
        self.assertTrue(exists(db_fp))

    remove_files(db_files)

    # ... and none of them is left after removal.
    for db_fp in db_files:
        self.assertFalse(exists(db_fp))
开发者ID:biocore,项目名称:burrito-fillings,代码行数:24,代码来源:test_formatdb.py
示例19: test_build_blast_db_from_fasta_file
def test_build_blast_db_from_fasta_file(self):
    """build_blast_db_from_fasta_file works with open files as input
    """
    # Open the alignment in a context manager so the handle is closed once
    # the database is built; previously it was never closed.
    with open(self.in_aln1_fp) as aln_file:
        blast_db, db_files = build_blast_db_from_fasta_file(
            aln_file, output_dir='/tmp/')
    self.assertTrue(blast_db.startswith('/tmp/BLAST_temp_db'))
    self.assertTrue(blast_db.endswith('.fasta'))

    # Here the temporary fasta file itself is part of the reported files.
    extensions = ['.nhr', '.nin', '.nsq', '.nsd', '.nsi', '.log']
    expected_db_files = set([blast_db] +
                            [blast_db + ext for ext in extensions])
    self.assertEqual(set(db_files), expected_db_files)

    # result returned when blasting against new db
    self.assertEqual(
        len(blastn(self.test_seq, blast_db=blast_db, e_value=0.0)), 1)

    # Make sure all db_files exist
    for fp in db_files:
        self.assertTrue(exists(fp))

    # Remove all db_files
    remove_files(db_files)

    # Make sure nothing weird happened in the remove
    for fp in db_files:
        self.assertFalse(exists(fp))
开发者ID:biocore,项目名称:burrito-fillings,代码行数:24,代码来源:test_formatdb.py
示例20: get_cluster_ratio
def get_cluster_ratio(fasta_seqs, min_difference_in_clusters):
    """
    Uses uclust to calculate cluster ratio
    cluster_ratio =
    num_of_seq_in_cluster_with_max_seq
    divided by
    num_of_seq_in_cluster_with_second_highest_seq

    Parameters
    ----------
    fasta_seqs: str
        FASTA-formatted text; it is written verbatim to a temporary file
        that is handed to uclust.
    min_difference_in_clusters: float
        percent identity threshold for cluster formation.
        NOTE(review): currently unused -- the uclust command below
        hard-codes ``--id 0.98``. Confirm whether that is intended.

    Returns
    ----------
    cluster_ratio: float
        cluster ratio of the sequences using uclust
        cluster_ratio =
        num_of_seq_in_cluster_with_max_seq /
        num_of_seq_in_cluster_with_second_highest_seq
        The integer 1 is returned when fewer than two clusters are found.
    """
    temp_dir = get_qiime_temp_dir()
    fd_uc, uclust_tempfile_name = mkstemp(dir=temp_dir, suffix='.uc')
    close(fd_uc)
    # NOTE(review): the '.uc' suffix on the FASTA input looks like a
    # copy-paste slip; kept unchanged since only the path matters here.
    fd_fas, fasta_tempfile_name = mkstemp(dir=temp_dir, suffix='.uc')
    close(fd_fas)

    count_lookup = defaultdict(int)
    try:
        with open(fasta_tempfile_name, 'w') as fasta_tempfile:
            fasta_tempfile.write(fasta_seqs)

        command = "uclust --usersort --input {} --uc {} --id 0.98".format(
            fasta_tempfile_name, uclust_tempfile_name)
        # In the function, I am calling uclust a large number of times.
        # Initially I was using from bfillings.get_clusters_from_fasta_filepath
        # but due to issue (biocore/bfillingss#31), I have temporarily
        # reverted to qiime_system_call.
        qiime_system_call(command)

        with open(uclust_tempfile_name, 'r') as uclust_tempfile:
            for line in uclust_tempfile:
                # Only lines starting with 'C' are counted. Tab-separated
                # field 1 is used as the key and field 2 is added to it.
                if line.startswith('C'):
                    pieces = line.split('\t')
                    count_lookup[pieces[1]] += int(pieces[2])
    finally:
        # Remove BOTH temp files (the FASTA input used to be leaked), and
        # do it even when uclust or the parsing fails.
        remove_files([uclust_tempfile_name, fasta_tempfile_name],
                     error_on_missing=False)

    # Only the sizes matter; .values() also works on Python 3, where
    # dict.iteritems() no longer exists.
    cluster_sizes = sorted(count_lookup.values(), reverse=True)
    if len(cluster_sizes) < 2:
        # No second cluster to compare against.
        return 1
    return float(cluster_sizes[0]) / float(cluster_sizes[1])
开发者ID:Springbudder,项目名称:qiime,代码行数:63,代码来源:split_libraries_lea_seq.py
注:本文中的skbio.util.remove_files函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论