本文整理汇总了Python中qiita_db.util.get_count函数的典型用法代码示例。如果您正苦于以下问题：Python get_count函数的具体用法？Python get_count怎么用？Python get_count使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_count函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_load_processed_data_from_cmd
def test_load_processed_data_from_cmd(self):
    """Check the DB row counts after loading processed data via the command.

    One new ``qiita.processed_data`` row and two new rows in each of
    ``qiita.processed_filepath`` and ``qiita.filepath`` are expected. A
    ``ValueError`` is expected when there are fewer filepath types than
    filepaths.
    """
    fps = [self.otu_table_fp, self.otu_table_2_fp]
    fp_types = ['biom', 'biom']

    # Snapshot the table sizes before anything is loaded
    pd_count_before = get_count('qiita.processed_data')
    pd_fp_count_before = get_count('qiita.processed_filepath')
    fp_count_before = get_count('qiita.filepath')

    new = load_processed_data_cmd(
        fps, fp_types, 'processed_params_uclust', 1, 1, None)
    processed_data_id = new.id

    # Schedule "<id>_<basename>" of each OTU table for removal
    for otu_fp in (self.otu_table_fp, self.otu_table_2_fp):
        target = join(self.db_test_processed_data_dir,
                      '%d_%s' % (processed_data_id, basename(otu_fp)))
        self.files_to_remove.append(target)

    expected_counts = [
        ('qiita.processed_data', pd_count_before + 1),
        ('qiita.processed_filepath', pd_fp_count_before + 2),
        ('qiita.filepath', fp_count_before + 2),
    ]
    for table, expected in expected_counts:
        self.assertTrue(check_count(table, expected))

    # Ensure that the ValueError is raised when a filepath_type is not
    # provided for each and every filepath
    with self.assertRaises(ValueError):
        load_processed_data_cmd(
            fps, fp_types[:-1], 'processed_params_uclust', 1, 1, None)
开发者ID:Jorge-C,项目名称:qiita,代码行数:31,代码来源:test_commands.py
示例2: _common_purge_filpeaths_test
def _common_purge_filpeaths_test(self):
    """Shared check that ``purge_filepaths`` removes exactly two filepaths.

    Two files are created on the raw_data mountpoint and registered in
    ``qiita.filepath``; after the purge both the rows and the files must be
    gone while every other file is still present.
    """
    # Get all the filepaths so we can test if they've been removed or not
    rows = self.conn_handler.execute_fetchall(
        "SELECT filepath, data_directory_id FROM qiita.filepath")
    fps = [join(get_mountpoint_path_by_id(dd_id), fp) for fp, dd_id in rows]

    # Make sure that the files exist - specially for travis
    for fp in fps:
        if exists(fp):
            continue
        with open(fp, "w") as f:
            f.write("\n")
        self.files_to_remove.append(fp)

    _, raw_data_mp = get_mountpoint("raw_data")[0]
    removed_fps = [
        join(raw_data_mp, "2_sequences_barcodes.fastq.gz"),
        join(raw_data_mp, "2_sequences.fastq.gz"),
    ]
    for fp in removed_fps:
        with open(fp, "w") as f:
            f.write("\n")

    # Register the two new files in the DB and keep their generated ids
    insert_sql = (
        "INSERT INTO qiita.filepath\n"
        "(filepath, filepath_type_id, checksum,\n"
        "checksum_algorithm_id, data_directory_id)\n"
        "VALUES ('2_sequences_barcodes.fastq.gz', 3, '852952723', 1, 5),\n"
        "('2_sequences.fastq.gz', 1, '852952723', 1, 5)\n"
        "RETURNING filepath_id")
    fp_ids = self.conn_handler.execute_fetchall(insert_sql)

    fps = set(fps).difference(removed_fps)

    # Check that the files exist
    for fp in list(fps) + removed_fps:
        self.assertTrue(exists(fp))

    exp_count = get_count("qiita.filepath") - 2
    purge_filepaths()
    obs_count = get_count("qiita.filepath")

    # Check that only 2 rows have been removed
    self.assertEqual(obs_count, exp_count)

    # Check that the 2 rows that have been removed are the correct ones
    exists_sql = (
        "SELECT EXISTS(\n"
        "SELECT * FROM qiita.filepath WHERE filepath_id = %s)")
    for position in (0, 1):
        obs = self.conn_handler.execute_fetchone(
            exists_sql, (fp_ids[position][0],))[0]
        self.assertFalse(obs)

    # Check that the files have been successfully removed
    for fp in removed_fps:
        self.assertFalse(exists(fp))

    # Check that all the other files still exist
    for fp in fps:
        self.assertTrue(exists(fp))
开发者ID:DarcyMyers,项目名称:qiita,代码行数:60,代码来源:test_util.py
示例3: test_delete
def test_delete(self):
    """Deleting analysis 1 removes one row; deleting an unknown id raises."""
    count_before = get_count("qiita.analysis")

    # successful delete
    Analysis.delete(1)
    count_after = get_count("qiita.analysis")
    self.assertEqual(count_before - 1, count_after)

    # not possible to delete: the id is one past the initial row count
    missing_id = count_before + 1
    with self.assertRaises(QiitaDBUnknownIDError):
        Analysis.delete(missing_id)
开发者ID:MarkBruns,项目名称:qiita,代码行数:9,代码来源:test_analysis.py
示例4: test_exists
def test_exists(self):
    """Check ``Analysis.exists`` under the 'QIITA' and 'EMP' portals.

    Analysis 1 must be reported as existing under 'QIITA' and as missing
    under 'EMP'; an id one past the current row count must be reported as
    missing under both.
    """
    # qiita_config is shared state: remember the active portal so the
    # switches below cannot leak into whatever test runs next, even when an
    # assertion fails.
    original_portal = qiita_config.portal
    try:
        qiita_config.portal = 'QIITA'
        self.assertTrue(Analysis.exists(1))
        new_id = get_count("qiita.analysis") + 1
        self.assertFalse(Analysis.exists(new_id))

        qiita_config.portal = 'EMP'
        self.assertFalse(Analysis.exists(1))
        new_id = get_count("qiita.analysis") + 1
        self.assertFalse(Analysis.exists(new_id))
    finally:
        qiita_config.portal = original_portal
开发者ID:MarkBruns,项目名称:qiita,代码行数:9,代码来源:test_analysis.py
示例5: test_artifact_post_req
def test_artifact_post_req(self):
    """Exercise ``artifact_post_req`` for an upload and for an import.

    Each scenario creates a fresh prep template, submits the request, waits
    until the job recorded in redis is no longer 'Running' and then
    instantiates the resulting artifact so its files get cleaned up.
    """
    def _new_prep_template():
        # Create new prep template to attach artifact to
        template = npt.assert_warns(
            QiitaDBWarning, PrepTemplate.create,
            pd.DataFrame({'new_col': {'1.SKD6.640190': 1}}), Study(1),
            '16S')
        self._files_to_remove.extend(
            [fp for _, fp in template.get_filepaths()])
        return template

    def _check_response_and_wait(response, template):
        # The request itself must report success
        self.assertEqual(response, {'status': 'success',
                                    'message': ''})
        job_entry = r_client.get('prep_template_%d' % template.id)
        self.assertIsNotNone(job_entry)
        # Poll redis until the job leaves the 'Running' state
        job_info = loads(r_client.get(loads(job_entry)['job_id']))
        while job_info['status_msg'] == 'Running':
            sleep(0.05)
            job_info = loads(r_client.get(loads(job_entry)['job_id']))

    def _instantiate_and_track(artifact_id):
        # Instantiate the artifact to make sure it was made and
        # to clean the environment
        artifact = Artifact(artifact_id)
        self._files_to_remove.extend(
            [fp for _, fp, _ in artifact.filepaths])

    # Test uploading files as a new artifact
    pt = _new_prep_template()
    new_artifact_id = get_count('qiita.artifact') + 1
    filepaths = {'raw_forward_seqs': 'uploaded_file.txt',
                 'raw_barcodes': 'update.txt'}
    obs = artifact_post_req(
        '[email protected]', filepaths, 'FASTQ', 'New Test Artifact', pt.id)
    _check_response_and_wait(obs, pt)
    _instantiate_and_track(new_artifact_id)

    # Test importing an artifact
    pt = _new_prep_template()
    new_artifact_id_2 = get_count('qiita.artifact') + 1
    obs = artifact_post_req(
        '[email protected]', {}, 'FASTQ', 'New Test Artifact 2', pt.id,
        new_artifact_id)
    _check_response_and_wait(obs, pt)
    _instantiate_and_track(new_artifact_id_2)
开发者ID:carlyboyd,项目名称:qiita,代码行数:53,代码来源:test_artifact.py
示例6: _common_purge_filpeaths_test
def _common_purge_filpeaths_test(self):
    """Shared check that ``purge_filepaths`` removes exactly two filepaths.

    The two raw_data files named below must be present before the purge;
    afterwards those files and the rows with filepath_id 3 and 4 must be
    gone while every other file is still present.
    """
    # Get all the filepaths so we can test if they've been removed or not
    rows = self.conn_handler.execute_fetchall(
        "SELECT filepath, data_directory_id FROM qiita.filepath")
    fps = [join(get_mountpoint_path_by_id(dd_id), fp) for fp, dd_id in rows]

    # Make sure that the files exist - specially for travis
    for fp in fps:
        if exists(fp):
            continue
        with open(fp, 'w') as f:
            f.write('\n')
        self.files_to_remove.append(fp)

    _, raw_data_mp = get_mountpoint('raw_data')[0]
    removed_fps = [join(raw_data_mp, name)
                   for name in ('2_sequences_barcodes.fastq.gz',
                                '2_sequences.fastq.gz')]

    fps = set(fps).difference(removed_fps)

    # Check that the files exist
    for fp in list(fps) + removed_fps:
        self.assertTrue(exists(fp))

    exp_count = get_count("qiita.filepath") - 2
    purge_filepaths(self.conn_handler)
    obs_count = get_count("qiita.filepath")

    # Check that only 2 rows have been removed
    self.assertEqual(obs_count, exp_count)

    # Check that the 2 rows that have been removed are the correct ones
    exists_sql = (
        "SELECT EXISTS(\n"
        "SELECT * FROM qiita.filepath WHERE filepath_id = %s)")
    for filepath_id in (3, 4):
        obs = self.conn_handler.execute_fetchone(
            exists_sql, (filepath_id,))[0]
        self.assertFalse(obs)

    # Check that the files have been successfully removed
    for fp in removed_fps:
        self.assertFalse(exists(fp))

    # Check that all the other files still exist
    for fp in fps:
        self.assertTrue(exists(fp))
开发者ID:jwdebelius,项目名称:qiita,代码行数:51,代码来源:test_util.py
示例7: test_create_templates_from_qiime_mapping_file_reverse_linker
def test_create_templates_from_qiime_mapping_file_reverse_linker(self):
    """Create both templates from a mapping file with a reverse linker primer.

    Checks the ids of the returned sample and prep templates and the set of
    categories that each one reports.
    """
    expected_pt_id = get_count('qiita.prep_template') + 1
    mapping_file = StringIO(QIIME_MAP_WITH_REVERSE_LINKER_PRIMER)
    obs_st, obs_pt = create_templates_from_qiime_mapping_file(
        mapping_file, self.new_study, "16S")

    # Be green: clean the environment
    for template in (obs_st, obs_pt):
        self._clean_up_files.extend(
            fp for _, fp in template.get_filepaths())

    self.assertEqual(obs_st.id, self.new_study.id)
    self.assertEqual(obs_pt.id, expected_pt_id)

    # Check that each template has the correct columns
    sample_categories = {
        "physical_specimen_location", "physical_specimen_remaining",
        "dna_extracted", "sample_type", "host_subject_id", "latitude",
        "longitude", "taxon_id", "scientific_name",
        "collection_timestamp", "description"}
    self.assertEqual(set(obs_st.categories()), sample_categories)

    prep_categories = {
        "barcode", "primer", "center_name", "run_prefix", "platform",
        "library_construction_protocol",
        "experiment_design_description", "reverselinkerprimer"}
    self.assertEqual(set(obs_pt.categories()), prep_categories)
开发者ID:MarkBruns,项目名称:qiita,代码行数:25,代码来源:test_metadata_pipeline.py
示例8: test_create
def test_create(self):
    """Reference.create adds the reference row and its three filepath rows."""
    fp_count = get_count('qiita.filepath')

    # Check that the returned object has the correct id
    reference = Reference.create(self.name, self.version, self.seqs_fp,
                                 self.tax_fp, self.tree_fp)
    self.assertEqual(reference.id, 2)

    # The three new filepaths take the next three ids after the old count
    seqs_id, tax_id, tree_id = [fp_count + offset for offset in (1, 2, 3)]

    # Check that the information on the database is correct
    obs = self.conn_handler.execute_fetchall(
        "SELECT * FROM qiita.reference WHERE reference_id=2")
    self.assertEqual(
        obs, [[2, self.name, self.version, seqs_id, tax_id, tree_id]])

    # Check that the filepaths have been correctly added to the DB
    obs = self.conn_handler.execute_fetchall(
        "SELECT * FROM qiita.filepath WHERE filepath_id=%s or "
        "filepath_id=%s or filepath_id=%s", (seqs_id, tax_id, tree_id))

    def _stored_name(fp):
        # Expected stored name: "<name>_<version>_<basename>"
        return "%s_%s_%s" % (self.name, self.version, basename(fp))

    exp = [[fp_id, _stored_name(fp), fp_type_id, '0', 1, 6]
           for fp_id, fp, fp_type_id in ((seqs_id, self.seqs_fp, 10),
                                         (tax_id, self.tax_fp, 11),
                                         (tree_id, self.tree_fp, 12))]
    self.assertEqual(obs, exp)
开发者ID:aashish24,项目名称:qiita,代码行数:32,代码来源:test_reference.py
示例9: test_build_biom_tables
def test_build_biom_tables(self):
    """Build the BIOM tables for three samples and verify the side effects.

    Checks the table path reported by the analysis, the sample ids and
    metadata stored in the table, that a new ``qiita.filepath`` row exists,
    and that the row is linked in ``qiita.analysis_filepath``.
    """
    # Id the new filepath row is expected to get
    new_id = get_count('qiita.filepath') + 1
    samples = {1: ['1.SKB8.640193', '1.SKD8.640184', '1.SKB7.640196']}
    self.analysis._build_biom_tables(samples, 100)

    obs = self.analysis.biom_tables
    self.assertEqual(obs, {'18S': self.biom_fp})

    table = load_table(self.biom_fp)
    obs = set(table.ids(axis='sample'))
    exp = {'1.SKB8.640193', '1.SKD8.640184', '1.SKB7.640196'}
    self.assertEqual(obs, exp)

    obs = table.metadata('1.SKB8.640193')
    exp = {'Study':
           'Identification of the Microbiomes for Cannabis Soils',
           'Processed_id': 1}
    self.assertEqual(obs, exp)

    sql = ("SELECT EXISTS(SELECT * FROM qiita.filepath\n"
           "WHERE filepath_id=%s)")
    obs = self.conn_handler.execute_fetchone(sql, (new_id,))[0]
    self.assertTrue(obs)

    sql = ("SELECT * FROM qiita.analysis_filepath\n"
           "WHERE analysis_id=%s ORDER BY filepath_id")
    obs = self.conn_handler.execute_fetchall(sql, (self.analysis.id,))
    # Plain ints compare equal to Python 2 longs, so the ``L`` suffixes
    # (a syntax error on Python 3) are not needed.
    exp = [[1, 14, 2], [1, 15, None], [1, new_id, None]]
    # The expected rows were previously built but never compared.
    self.assertEqual(obs, exp)
开发者ID:MarkBruns,项目名称:qiita,代码行数:29,代码来源:test_analysis.py
示例10: test_new_person_created
def test_new_person_created(self):
    """Posting a study with new people adds one ``qiita.study_person`` row.

    The new row is looked up by the id ``count_before + 1`` and its fields
    are compared with the second entry of each ``new_people_*`` list.
    """
    person_count_before = get_count('qiita.study_person')

    # NOTE(review): the '[email protected]' literals look like scrape-time
    # e-mail obfuscation; confirm the real addresses against upstream.
    post_data = {'new_people_names': ['Adam', 'Ethan'],
                 'new_people_emails': ['[email protected]', '[email protected]'],
                 'new_people_affiliations': ['CU Boulder', 'NYU'],
                 'new_people_addresses': ['Some St., Boulder, CO 80305',
                                          ''],
                 'new_people_phones': ['', ''],
                 'study_title': 'dummy title',
                 'study_alias': 'dummy alias',
                 'pubmed_id': 'dummy pmid',
                 'investigation_type': 'eukaryote',
                 'environmental_packages': 'air',
                 'is_timeseries': 'y',
                 'study_abstract': "dummy abstract",
                 'study_description': 'dummy description',
                 'principal_investigator': '-2',
                 'lab_person': '1'}

    self.post('/study/create/', post_data)

    # Check that the new person was created
    expected_id = person_count_before + 1
    self.assertTrue(check_count('qiita.study_person', expected_id))

    # assertEqual / assertIsNone report the offending value on failure,
    # unlike assertTrue(a == b), which only reports "False is not true".
    new_person = StudyPerson(expected_id)
    self.assertEqual(new_person.name, 'Ethan')
    self.assertEqual(new_person.email, '[email protected]')
    self.assertEqual(new_person.affiliation, 'NYU')
    self.assertIsNone(new_person.address)
    self.assertIsNone(new_person.phone)
开发者ID:Jorge-C,项目名称:qiita,代码行数:32,代码来源:test_study_handlers.py
示例11: test_build_mapping_file
def test_build_mapping_file(self):
    """Build the analysis mapping file and verify the file and the DB rows.

    Compares the generated mapping file with ``1_analysis_mapping_exp.txt``
    from the analysis mountpoint, then checks the ``qiita.filepath`` rows
    for the mapping file and the ``qiita.analysis_filepath`` links.
    """
    # Id the new filepath row is expected to get
    new_id = get_count('qiita.filepath') + 1
    samples = {1: ['1.SKB8.640193', '1.SKD8.640184', '1.SKB7.640196']}
    self.analysis._build_mapping_file(samples)

    obs = self.analysis.mapping_file
    self.assertEqual(obs, self.map_fp)

    base_dir = get_mountpoint('analysis')[0][1]
    obs = pd.read_csv(obs, sep='\t', infer_datetime_format=True,
                      parse_dates=True, index_col=False, comment='\t')
    exp = pd.read_csv(join(base_dir, '1_analysis_mapping_exp.txt'),
                      sep='\t', infer_datetime_format=True,
                      parse_dates=True, index_col=False, comment='\t')
    assert_frame_equal(obs, exp)

    sql = ("SELECT * FROM qiita.filepath\n"
           "WHERE filepath=%s ORDER BY filepath_id")
    obs = self.conn_handler.execute_fetchall(
        sql, ("%d_analysis_mapping.txt" % self.analysis.id,))
    exp = [[13, '1_analysis_mapping.txt', 9, '852952723', 1, 1],
           [new_id, '1_analysis_mapping.txt', 9, '1606265094', 1, 1]]
    self.assertEqual(obs, exp)

    sql = ("SELECT * FROM qiita.analysis_filepath\n"
           "WHERE analysis_id=%s ORDER BY filepath_id")
    obs = self.conn_handler.execute_fetchall(sql, (self.analysis.id,))
    # Plain ints compare equal to Python 2 longs, so the ``L`` suffixes
    # (a syntax error on Python 3) are not needed.
    exp = [[1, 14, 2], [1, 15, None], [1, new_id, None]]
    # The expected rows were previously built but never compared.
    self.assertEqual(obs, exp)
开发者ID:MarkBruns,项目名称:qiita,代码行数:29,代码来源:test_analysis.py
示例12: test_post_edit
def test_post_edit(self):
    """Editing study 1 changes its title without adding a study row."""
    study_count_before = get_count('qiita.study')
    study = Study(1)
    info = study.info

    # Re-submit the current values, changing only the title
    current_dois = ','.join([doi for doi, _ in study.publications])
    post_data = {
        'new_people_names': [],
        'new_people_emails': [],
        'new_people_affiliations': [],
        'new_people_addresses': [],
        'new_people_phones': [],
        'study_title': 'dummy title',
        'study_alias': info['study_alias'],
        'publications_doi': current_dois,
        'study_abstract': info['study_abstract'],
        'study_description': info['study_description'],
        'principal_investigator': info['principal_investigator_id'],
        'lab_person': info['lab_person_id'],
    }

    self.post('/study/edit/1', post_data)

    # Check that the study was updated
    self.assertTrue(check_count('qiita.study', study_count_before))
    self.assertEqual(study.title, 'dummy title')
开发者ID:mivamo1214,项目名称:qiita,代码行数:25,代码来源:test_study_handlers.py
示例13: test_import_preprocessed_data
def test_import_preprocessed_data(self):
    """Loading preprocessed data yields id 3, one new row and two filepaths."""
    ppd_count_before = get_count('qiita.preprocessed_data')
    fp_count_before = get_count('qiita.filepath')

    ppd = load_preprocessed_data_from_cmd(
        1, 'preprocessed_sequence_illumina_params',
        self.tmpdir, 'preprocessed_sequences', 1, False, 1)

    # Schedule "<id>_<basename>" of both input files for removal
    for source_fp in (self.file1, self.file2):
        target = join(self.db_test_ppd_dir,
                      '%d_%s' % (ppd.id, basename(source_fp)))
        self.files_to_remove.append(target)

    self.assertEqual(ppd.id, 3)
    self.assertTrue(check_count('qiita.preprocessed_data',
                                ppd_count_before + 1))
    self.assertTrue(check_count('qiita.filepath', fp_count_before + 2))
开发者ID:Jorge-C,项目名称:qiita,代码行数:16,代码来源:test_commands.py
示例14: test_load_data_from_cmd
def test_load_data_from_cmd(self):
    """Check the DB row counts after loading raw data via the command.

    One new ``qiita.raw_data`` row and three new rows in each of
    ``qiita.filepath`` and ``qiita.raw_filepath`` are expected. A
    ``ValueError`` is expected when there are fewer filepath types than
    filepaths.
    """
    fps = [self.forward_fp, self.reverse_fp, self.barcodes_fp]
    fp_types = ['raw_forward_seqs', 'raw_reverse_seqs',
                'raw_barcodes']
    filetype = 'FASTQ'

    # Single-sample prep metadata used to create the prep template
    sample_metadata = {'center_name': 'ANL',
                       'primer': 'GTGCCAGCMGCCGCGGTAA',
                       'barcode': 'GTCCGCAAGTTA',
                       'run_prefix': "s_G1_L001_sequences",
                       'platform': 'ILLUMINA',
                       'instrument_model': 'Illumina MiSeq',
                       'library_construction_protocol': 'AAAA',
                       'experiment_design_description': 'BBBB'}
    metadata = pd.DataFrame.from_dict({'SKB8.640193': sample_metadata},
                                      orient='index')
    pt1 = PrepTemplate.create(metadata, Study(1), "16S")
    prep_templates = [pt1.id]

    # Snapshot the table sizes before loading the raw data
    raw_count_before = get_count('qiita.raw_data')
    fp_count_before = get_count('qiita.filepath')
    raw_fp_count_before = get_count('qiita.raw_filepath')

    new = load_raw_data_cmd(fps, fp_types, filetype, prep_templates)
    raw_data_id = new.id

    # Schedule "<id>_<basename>" of each input file for removal
    for source_fp in (self.forward_fp, self.reverse_fp, self.barcodes_fp):
        target = join(self.db_test_raw_dir,
                      '%d_%s' % (raw_data_id, basename(source_fp)))
        self.files_to_remove.append(target)

    expected_counts = [
        ('qiita.raw_data', raw_count_before + 1),
        ('qiita.filepath', fp_count_before + 3),
        ('qiita.raw_filepath', raw_fp_count_before + 3),
    ]
    for table, expected in expected_counts:
        self.assertTrue(check_count(table, expected))

    # Ensure that the ValueError is raised when a filepath_type is not
    # provided for each and every filepath
    with self.assertRaises(ValueError):
        load_raw_data_cmd(fps, fp_types[:-1], filetype, prep_templates)
开发者ID:jenwei,项目名称:qiita,代码行数:47,代码来源:test_commands.py
示例15: test_set_step
def test_set_step(self):
    """Setting ``step`` on a new analysis stores it in analysis_workflow."""
    # Id the new analysis is expected to get
    expected_id = get_count("qiita.analysis") + 1
    owner = User("[email protected]")
    analysis = Analysis.create(owner, "newAnalysis", "A New Analysis",
                               Analysis(1))
    analysis.step = 2

    rows = self.conn_handler.execute_fetchall(
        "SELECT * FROM qiita.analysis_workflow WHERE analysis_id = %s",
        [expected_id])
    self.assertEqual(rows, [[expected_id, 2]])
开发者ID:RNAer,项目名称:qiita,代码行数:8,代码来源:test_analysis.py
示例16: test_prep_template_delete_req
def test_prep_template_delete_req(self):
    """Deleting a freshly created prep template reports success."""
    metadata = pd.read_csv(self.update_fp, sep='\t', index_col=0)
    # Id the prep template created below is expected to get
    expected_id = get_count('qiita.prep_template') + 1
    npt.assert_warns(QiitaDBWarning, PrepTemplate.create,
                     metadata, Study(1), '16S')

    response = prep_template_delete_req(expected_id, '[email protected]')
    self.assertEqual(response, {'status': 'success',
                                'message': ''})
开发者ID:yimsea,项目名称:qiita,代码行数:9,代码来源:test_prep_template.py
示例17: test_post
def test_post(self):
    """POSTing to /prep_template/ returns 200 and creates a prep template."""
    # Id the new prep template is expected to get
    expected_prep_id = get_count('qiita.prep_template') + 1
    form = {'study_id': '1',
            'data-type': '16S',
            'prep-file': 'new_template.txt'}

    reply = self.post('/prep_template/', form)
    self.assertEqual(reply.code, 200)

    # Check that the new prep template has been created
    self.assertTrue(PrepTemplate.exists(expected_prep_id))
开发者ID:antgonza,项目名称:qiita,代码行数:9,代码来源:test_prep_template.py
示例18: _get_stats
def _get_stats(self, callback):
    """Collect the portal statistics and hand them to ``callback``.

    Sample latitudes and longitudes are read from two redis lists keyed by
    the current portal. When either list is empty the values come from
    ``get_lat_longs()`` and are pushed to redis with a 24-hour expiration.

    Parameters
    ----------
    callback : callable
        Called once with
        ``[num_studies, num_samples, num_users, lat_longs]``.
    """
    # check if the key exists in redis
    redis_lats_key = '%s:stats:sample_lats' % qiita_config.portal
    redis_longs_key = '%s:stats:sample_longs' % qiita_config.portal
    lats = r_client.lrange(redis_lats_key, 0, -1)
    longs = r_client.lrange(redis_longs_key, 0, -1)
    if not (lats and longs):
        # if we don't have them, then fetch them and add to the
        # redis server with a 24-hour expiration
        lat_longs = get_lat_longs()
        lats = [float(x[0]) for x in lat_longs]
        longs = [float(x[1]) for x in lat_longs]
        with r_client.pipeline() as pipe:
            for latitude, longitude in lat_longs:
                # storing as a simple data structure, hopefully this
                # doesn't burn us later
                pipe.rpush(redis_lats_key, latitude)
                pipe.rpush(redis_longs_key, longitude)

            # set the key to expire in 24 hours, so that we limit the
            # number of times we have to go to the database to a reasonable
            # amount.
            # The expirations are queued on the pipeline so they run after
            # the buffered rpush commands. Sent directly through r_client
            # they would run before the keys exist and set no TTL.
            pipe.expire(redis_lats_key, 86400)
            pipe.expire(redis_longs_key, 86400)

            pipe.execute()
    else:
        # If we do have them, put the redis results into the same structure
        # that would come back from the database
        longs = [float(x) for x in longs]
        lats = [float(x) for x in lats]
        lat_longs = zip(lats, longs)

    # Get the number of studies
    num_studies = get_count('qiita.study')

    # Get the number of samples
    num_samples = len(lats)

    # Get the number of users
    num_users = get_count('qiita.qiita_user')

    callback([num_studies, num_samples, num_users, lat_longs])
开发者ID:anupriyatripathi,项目名称:qiita,代码行数:43,代码来源:stats.py
示例19: test_add_results
def test_add_results(self):
    """Adding a result file links a new filepath row to job 1."""
    fp_count_before = get_count('qiita.filepath')
    result_fp = join(self._job_folder, "1_job_result.txt")
    self.job.add_results([(result_fp, "plain_text")])

    # make sure files attached to job properly
    rows = self.conn_handler.execute_fetchall(
        "SELECT * FROM qiita.job_results_filepath WHERE job_id = 1")
    expected_rows = [[1, 10], [1, fp_count_before + 1]]
    self.assertEqual(rows, expected_rows)
开发者ID:adamrp,项目名称:qiita,代码行数:10,代码来源:test_job.py
示例20: test_update_preprocessed_data_from_cmd
def test_update_preprocessed_data_from_cmd(self):
    """Update preprocessed data and verify its filepaths and checksums.

    The updated object must be the first preprocessed data of study 1, its
    filepaths must be the old ones plus a new 'log' entry, and the stored
    checksums must match the values checked below.
    """
    exp_ppd = PreprocessedData(Study(1).preprocessed_data()[0])
    exp_fps = exp_ppd.get_filepaths()

    # The original paths must exist, but they're not included in the test
    # so create them here
    for _, fp, _ in exp_fps:
        with open(fp, 'w') as f:
            f.write("")

    # The log file is expected to take the next filepath id
    next_fp_id = get_count('qiita.filepath') + 1
    log_fp = join(self.db_ppd_dir,
                  "%s_split_library_log.txt" % exp_ppd.id)
    exp_fps.append((next_fp_id, log_fp, 'log'))

    ppd = update_preprocessed_data_from_cmd(self.test_slo, 1)

    # Check that the modified preprocessed data is the correct one
    self.assertEqual(ppd.id, exp_ppd.id)

    # Check that the filepaths returned are correct
    # We need to sort the list returned from the db because the ordering
    # on that list is based on db modification time, rather than id
    obs_fps = sorted(ppd.get_filepaths())
    self.assertEqual(obs_fps, sorted(exp_fps))

    # Check that the checksums have been updated
    sql = "SELECT checksum FROM qiita.filepath WHERE filepath_id=%s"

    def _stored_checksum(position):
        # Checksum stored for the filepath at ``position`` in obs_fps
        return self.conn_handler.execute_fetchone(
            sql, (obs_fps[position][0],))[0]

    # Checksum of the fasta file
    self.assertEqual(_stored_checksum(0), '3532748626')

    # Checksum of the fastq file
    self.assertEqual(_stored_checksum(1), '2958832064')

    # Checksum of the demux file
    # The checksum is generated dynamically, so the checksum changes
    # We are going to test that the checksum is not the one that was
    # before, which corresponds to an empty file
    demux_checksum = _stored_checksum(2)
    self.assertTrue(isinstance(demux_checksum, str))
    self.assertNotEqual(demux_checksum, '852952723')
    self.assertTrue(len(demux_checksum) > 0)

    # Checksum of the log file
    self.assertEqual(_stored_checksum(3), '626839734')
开发者ID:MarkBruns,项目名称:qiita,代码行数:54,代码来源:test_commands.py
注：本文中的qiita_db.util.get_count函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论