本文整理汇总了Python中sibispy.sibislogger.info函数的典型用法代码示例。如果您正苦于以下问题:Python info函数的具体用法?Python info怎么用?Python info使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了info函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_all_gradients
def get_all_gradients(session_label, dti_stack, decimals=None):
"""
Parses a list of dti sidecar files for subject.
Returns
=======
list of np.array
"""
gradients_per_frame = list()
gradients_as_array = np.asanyarray([])
error_xml_path_list=[]
error_msg=""
for xml_path in dti_stack:
xml_sidecar = read_xml_sidecar(xml_path)
try:
gradients_per_frame.append(get_gradient_table(xml_sidecar,
decimals=decimals))
gradients_as_array = np.asanyarray(gradients_per_frame)
except Exception as e:
error_xml_path_list.append(xml_path)
error_msg=str(e)
if error_xml_path_list != [] :
slog.info(session_label,
'ERROR: Could not get gradient table from xml sidecar',
script='xnat/check_gradient_tables.py',
sidecar=str(xml_sidecar),
error_xml_path_list=str(error_xml_path_list),
error_msg=error_msg)
return gradients_as_array
开发者ID:weiweichusri,项目名称:ncanda-data-integration,代码行数:32,代码来源:check_gradient_tables.py
示例2: schedule_cluster_job
def schedule_cluster_job(self,job_script, job_title,submit_log=None, job_log=None, verbose=False):
qsub_cmd= '/opt/sge/bin/lx-amd64/qsub'
if not os.path.exists(qsub_cmd):
slog.info(job_title + "-" +hashlib.sha1(str(job_script).encode('utf-8')).hexdigest()[0:6],"ERROR: Failed to schedule job as '" + qsub_cmd + "' cannot be found!", job_script = str(job_script))
return False
sge_env = os.environ.copy()
sge_env['SGE_ROOT'] = '/opt/sge'
sge_param = self.__sibis_defs['cluster_parameters'].split(',')
if job_log :
sge_param += ['-o', job_log]
else :
sge_param += ['-o','/dev/null']
qsub_args= [ qsub_cmd ] + sge_param + ['-N', '%s' % (job_title) ]
#stderr=subprocess.STDOUT
qsub_process = subprocess.Popen( qsub_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr= subprocess.PIPE, env=sge_env)
(stdoutdata, stderrdata) = qsub_process.communicate(str(job_script).encode('utf-8'))
cmd_str='echo "%s" | %s\n' % (job_script," ".join(qsub_args))
if stderrdata :
slog.info(job_title + "-" + hashlib.sha1(str(stderrdata).encode('utf-8')).hexdigest()[0:6],"ERROR: Failed to schedule job !", cmd = cmd_str, err_msg = str(stderrdata))
return False
if verbose:
print(cmd_str)
if stdoutdata:
print(stdoutdata.decode('utf-8'))
if submit_log:
with open(submit_log, "a") as myfile:
myfile.write(cmd_str)
myfile.write(stdoutdata.decode('utf-8'))
return True
开发者ID:sibis-platform,项目名称:sibis,代码行数:35,代码来源:redcap_to_casesdir.py
示例3: connect_server
def connect_server(self,api_type, timeFlag=False):
"""
Connect to servers, setting each property.
"""
if api_type not in self.api :
slog.info('session.connect_server','api type ' + api_type + ' not defined !',
api_types = str(list(self.api.keys())))
return None
if timeFlag :
slog.startTimer2()
if api_type == 'xnat' :
connectionPtr = self.__connect_xnat__()
elif api_type == 'xnat_http' :
connectionPtr = self.__connect_xnat_http__()
elif api_type == 'browser_penncnp' :
connectionPtr = self.__connect_penncnp__()
elif api_type == 'svn_laptop' :
connectionPtr = self.__connect_svn_laptop__()
elif api_type == 'redcap_mysql_db' :
connectionPtr = self.__connect_redcap_mysql__()
else :
connectionPtr = self.__connect_redcap_project__(api_type)
if timeFlag :
slog.takeTimer2('connect_' + api_type)
return connectionPtr
开发者ID:sibis-platform,项目名称:sibis,代码行数:29,代码来源:session.py
示例4: mail_user
def mail_user( self, uid, msglist ):
# Get user full name and email address
try:
user_firstname = self._interface.manage.users.firstname( uid )
user_lastname = self._interface.manage.users.lastname( uid )
user_email = self._interface.manage.users.email( uid )
except:
slog.info('xnat_email',"ERROR: failed to get detail information for user" + str(uid))
return
problem_list = [ '<ol>' ]
for m in msglist:
problem_list.append( '<li>%s</li>' % m )
problem_list.append( '</ol>' )
# Create the body of the message (a plain-text and an HTML version).
html = '<html>\n\
<head></head>\n\
<body>\n\
<p>Dear %s %s:<br><br>\n\
We have detected the following problem(s) with data you uploaded to the <a href="%s">%s XNAT image repository</a>:</br>\n\
%s\n\
Please address these issues as soon as possible (direct links to the respective data items are provided above for your convenience).\n\
You may want to consult the <a href="http://www.nitrc.org/docman/view.php/672/1206/N-CANDA%%20MRI%%20and%%20Image%%20Management%%20Manual">NCANDA MRI and Image Management Manual</a> for instructions.<br></br>\n\
If you have further questions, feel free to contact the <a href="mailto:%s">NCANDA support</a>\n\
</p>\n\
</body>\n\
</html>' % (user_firstname, user_lastname, self._site_url, self._site_name, '\n'.join( problem_list ), self._xnat_admin_email)
self.send( "NCANDA XNAT: problems with your uploaded data", self._xnat_admin_email, [ user_email ], html )
开发者ID:sbenito,项目名称:ncanda-data-integration,代码行数:30,代码来源:xnat_email.py
示例5: redcap_import_record_to_api
def redcap_import_record_to_api(self, records, api_type, error_label, time_label = None):
if len(records) == 0 :
return None
if api_type == None :
api_type = self.__active_redcap_project__
if api_type in self.api :
red_api = self.api[api_type]
else :
return None
if not red_api:
return None
if time_label:
slog.startTimer2()
try:
import_response = red_api.import_records(records, overwrite='overwrite')
except requests.exceptions.RequestException as e:
error = 'session:redcap_import_record:Failed to import into REDCap'
err_list = ast.literal_eval(str(e))['error'].split('","')
error_label += '-' + hashlib.sha1(str(e).encode('utf-8')).hexdigest()[0:6]
slog.info(error_label, error,
requestError=str(e),
red_api = api_type)
return None
if time_label:
slog.takeTimer2("redcap_import_" + time_label, str(import_response))
return import_response
开发者ID:sibis-platform,项目名称:sibis,代码行数:34,代码来源:session.py
示例6: check_for_stroop
def check_for_stroop( xnat, xnat_eid_list, verbose=False ):
stroop_files = []
if verbose :
print "check_for_stroop: " + str(xnat_eid_list)
for xnat_eid in xnat_eid_list:
experiment = xnat.select.experiment( xnat_eid )
# Get list of resource files that match the Stroop file name pattern
for resource in experiment.resources().get():
resource_files = xnat._get_json( '/data/experiments/%s/resources/%s/files?format=json' % ( xnat_eid, resource ) );
stroop_files += [ (xnat_eid, resource, re.sub( '.*\/files\/', '', file['URI']) ) for file in resource_files if re.match( '^NCANDAStroopMtS_3cycles_7m53stask_.*.txt$', file['Name'] ) ]
# No matching files - nothing to do
if len( stroop_files ) == 0:
if verbose :
print "check_for_stroop: no stroop"
return (None, None, None)
# Get first file from list, warn if more files
if len( stroop_files ) > 1:
error = "ERROR: experiment have/has more than one Stroop .txt file. Please make sure there is exactly one per session."
for xnat_eid in xnat_eid_list:
slog.info(xnat_eid,error)
return (None, None, None)
if verbose :
print "check_for_stroop: Stroop File: " + str(stroop_files[0])
return stroop_files[0]
开发者ID:weiweichusri,项目名称:ncanda-data-integration,代码行数:29,代码来源:import_mr_sessions_stroop.py
示例7: redcap_export_records_from_api
def redcap_export_records_from_api(self, time_label, api_type, **selectStmt):
if api_type == None :
red_api = self.__get_active_redcap_api__()
else :
if api_type in self.api :
red_api = self.api[api_type]
else :
return None
if not red_api:
return None
if time_label:
slog.startTimer2()
try:
with warnings.catch_warnings(record=True) as w:
redcap_data = red_api.export_records(**selectStmt)
if len(w):
w_str = str(w[-1])
if "Specify dtype option on import or set low_memory=False" not in w_str :
slog.info("session.redcap_export_records","Waring: exporting data from REDCap caused warning at {}".format(time.asctime()),
warning_msg = w_msg,
**selectStmt)
except Exception as err_msg:
slog.info("session.redcap_export_records","ERROR: exporting data from REDCap failed at {}".format(time.asctime()),
err_msg = str(err_msg),
**selectStmt)
return None
if time_label:
slog.takeTimer2("redcap_export_" + time_label)
return redcap_data
开发者ID:sibis-platform,项目名称:sibis,代码行数:34,代码来源:session.py
示例8: xnat_export_general
def xnat_export_general(self,form, fields, conditions, time_label = None):
xnat_api = self.__get_xnat_api__()
if not xnat_api:
return None
if time_label:
slog.startTimer2()
try:
# python if one cannot connect to server then
with Capturing() as xnat_output:
xnat_data = list(xnat_api.search(form, fields).where(conditions).items())
except Exception as err_msg:
if xnat_output :
slog.info("session.xnat_export_general","ERROR: querying XNAT failed most likely due disconnect to server ({})".format(time.asctime()),
xnat_api_output = str(xnat_output),
form = str(form),
fields = str(fields),
conditions = str(conditions),
err_msg = str(err_msg))
else :
slog.info("session.xnat_export_general","ERROR: querying XNAT failed at {}".format(time.asctime()),
form = str(form),
fields = str(fields),
conditions = str(conditions),
err_msg = str(err_msg))
return None
if time_label:
slog.takeTimer2("xnat_export_" + time_label)
return xnat_data
开发者ID:sibis-platform,项目名称:sibis,代码行数:32,代码来源:session.py
示例9: __check_all_forms__
def __check_all_forms__(self):
# Filter each form
text_list = list()
non_redcap_list = list()
for export_name in list(self.__export_forms.keys()):
(form_text_list, form_non_redcap_list) = self.__check_form__(export_name)
if form_text_list :
text_list += form_text_list
if form_non_redcap_list:
non_redcap_list += form_non_redcap_list
if text_list:
slog.info('redcap_to_casesdir.__check_all_forms__.' + hashlib.sha1(str(text_list).encode()).hexdigest()[0:6], "ERROR: The txt file(s) in '" + str(self.__forms_dir) + "' list non-numeric redcap variable names!",
form_variable_list = str(text_list),
info = "Remove it from form file or modify definition in REDCap")
if non_redcap_list :
slog.info('redcap_to_casesdir.__check_all_forms__.' + hashlib.sha1(str(text_list).encode()).hexdigest()[0:6], "ERROR: The txt file(s) in '" + str(self.__forms_dir) + "' list variables that do not exist in redcap!",
form_variable_list = str(non_redcap_list),
info = "Remove it from form or modify definition REDCap")
if non_redcap_list or text_list:
return False
return True
开发者ID:sibis-platform,项目名称:sibis,代码行数:25,代码来源:redcap_to_casesdir.py
示例10: __get_analysis_dir
def __get_analysis_dir(self) :
analysis_dir = self.__config_usr_data.get_value('analysis_dir')
if analysis_dir == None :
slog.info("session.__get_analysis_dir-" + hashlib.sha1(str(self.__config_usr_data.get_config_file()).encode('utf-8')).hexdigest()[0:6],"ERROR: 'analysis_dir' is not defined in config file !",
config_file = self.__config_usr_data.get_config_file())
return analysis_dir
开发者ID:sibis-platform,项目名称:sibis,代码行数:7,代码来源:session.py
示例11: check_xml_file
def check_xml_file( xml_file, project, session, label ):
xml = open( xml_file, 'r' )
warnings = []
try:
for line in xml:
# Check fallbacks triggered
if 'fallbackOrientationCNR' in line:
warnings.append( "CNR spheres used for orientation - problem detecting 15mm spheres?" )
if 'fallbackCentroidCNR' in line:
match = re.match( '^.*distance="([0-9]+\.[0-9]+)".*$', line )
distance = float( match.group(1) )
if distance > 3.0:
warnings.append( "CNR spheres used for centroid location (distance to SNR center = %f mm) - problem with the SNR sphere?" % distance )
# Check number of landmarks
match = re.match( '<landmarkList.*count="([0-9]+)">', line )
if match:
count = int( match.group(1) )
if ( count < 165 ):
warnings.append( "Landmark count=%d" % (project,session,count) )
# Check SNR
match = re.match( '<snr>([0-9]*\.[0-9]*)</snr>', line )
if match:
snr = float( match.group(1) )
if ( snr < 50 ):
warnings.append( "Low SNR=%f" % (project,session,snr) )
# Check scale
match = re.match( '<scale>([0-9]*\.[0-9]*)\s+([0-9]*\.[0-9]*)\s+([0-9]*\.[0-9]*)</scale>', line )
if match:
for idx in [0,1,2]:
scale = float( match.group( idx+1 ) )
if ( (scale < 0.99) or (scale > 1.01) ):
warnings.append( "Non-unit scale[%d]=%f" % (project,session,idx,scale) )
# Check nonlinearity
match = re.match( '<nonlinear>([0-9]*\.[0-9]*)\s+([0-9]*\.[0-9]*)\s+([0-9]*\.[0-9]*)</nonlinear>', line )
if match:
for idx in [0,1,2]:
nonlinear = float( match.group( idx+1 ) )
if ( (nonlinear > 0.5) ):
warnings.append( "Nonlinearity[%d]=%f" % (project,session,idx,nonlinear) )
except:
error='Could not open XML file for experiment.'
slog.info(session,error,
project_id=project)
finally:
xml.close()
# Print warnings if there were any
if len( warnings ) > 0:
warning = " ".join(warnings)
slog.info(label, warning,
session_id=session,
project=project,
script='t1_qa_functions')
开发者ID:sbenito,项目名称:ncanda-data-integration,代码行数:60,代码来源:t1_qa_functions.py
示例12: process_phantom_session
def process_phantom_session( interface, project, subject, session, label, force_updates=False ):
# Get the experiment object
experiment = interface.select.experiment( session )
# First, see if the QA files are already there
files = experiment.resources().files().get()
if force_updates or not (('t1.nii.gz' in files) and ('phantom.xml' in files) and ('phantom.nii.gz' in files)):
dicom_path=''
# Get list of all scans in the session
scans = experiment.scans().get()
for scan in scans:
# Check only 'usable' scans with the proper name
[scan_type,quality] = experiment.scan( scan ).attrs.mget( ['type', 'quality'] )
if ('mprage' in scan_type) or ('t1spgr' in scan_type):
# Extract the DICOM file directory from the XML representation
match = re.match( '.*(/fs/storage/XNAT/.*)scan_.*_catalog.xml.*', experiment.scan( scan ).get(), re.DOTALL )
if match:
dicom_path = match.group(1)
if dicom_path:
# If we found a matching scan, run the QA
run_phantom_qa( interface, project, subject, session, label, dicom_path )
else:
# If there was no matching scan in the session, print a warning
warning = "WARNING: ADNI phantom session: {}, experiment: {}, subject: {} does not have \
a usable T1-weighted scan".format(session, experiment, subject)
slog.info(hashlib.sha1('t1_qa_functions').hexdigest()[0:6], warning,
script='t1_qa_functions')
开发者ID:sbenito,项目名称:ncanda-data-integration,代码行数:28,代码来源:t1_qa_functions.py
示例13: xnat_get_classes
def xnat_get_classes(self):
xnat_api = self.__get_xnat_api__()
if not xnat_api:
error_msg = "XNAT API is not defined! Cannot retrieve classes!",
slog.info(eid,error_msg,
function = "session.xnat_get_classes")
return None
return xnat_api.client.classes
开发者ID:sibis-platform,项目名称:sibis,代码行数:8,代码来源:session.py
示例14: svn_client
def svn_client(self):
svn_laptop = self.api['svn_laptop']
if not svn_laptop:
slog.info('session.svn_client',"ERROR: svn api is not defined")
return None
client = svn_laptop['client']
return client
开发者ID:sibis-platform,项目名称:sibis,代码行数:8,代码来源:session.py
示例15: translate_subject_and_event
def translate_subject_and_event( self, subject_code, event_label):
if event_label in list(self.__event_dict.keys()):
(arm_code,visit_code) = self.__event_dict[event_label]
else:
slog.info(str(subject_code),"ERROR: Cannot determine study Arm and Visit from event %s" % event_label )
pipeline_workdir_rel = os.path.join( subject_code, arm_code, visit_code )
return (arm_code,visit_code,pipeline_workdir_rel)
开发者ID:sibis-platform,项目名称:sibis,代码行数:8,代码来源:redcap_to_casesdir.py
示例16: create_datadict
def create_datadict(self, export_name, datadict_dir):
if export_name not in self.__export_forms.keys() :
slog.info('redcap_to_casesdir.create_datadict',"ERROR: could not create data dictionary for form " + export_name)
return None
export_form_entry_list = self.__export_forms[export_name]
size_entry_list = len(export_form_entry_list)
export_form_list = [export_name] * size_entry_list
return self.__create_datadicts_general__(datadict_dir, export_name, export_form_list,export_form_entry_list)
开发者ID:sibis-platform,项目名称:sibis,代码行数:9,代码来源:redcap_to_casesdir.py
示例17: delete_workdir
def delete_workdir(workdir,redcap_visit_id,verbose=False):
if os.path.exists(workdir):
if verbose :
print "Deleting " + workdir
try :
shutil.rmtree(workdir)
except Exception as err_msg:
slog.info(redcap_visit_id,"Error: Could not delete directory " + workdir,
err_msg = str(err_msg))
开发者ID:weiweichusri,项目名称:ncanda-data-integration,代码行数:10,代码来源:export_mr_sessions_pipeline.py
示例18: __load_ground_truth_gradients
def __load_ground_truth_gradients(self,gt_dti_path):
dti_stack = glob.glob(gt_dti_path)
if not len(dti_stack):
slog.info("__load_ground_truth_gradients","Error: Cannot find " + gt_dti_path)
return []
dti_stack.sort()
# Parse the xml files to get scanner specific gradients per frame
(gradients, errorFlag) = self.__get_all_gradients(gt_dti_path, "", "",dti_stack)
return np.array(gradients)
开发者ID:sibis-platform,项目名称:sibis,代码行数:10,代码来源:check_dti_gradients.py
示例19: run_phantom_qa
def run_phantom_qa( interface, project, subject, session, label, dicom_path ):
# Make a temporary directory
temp_dir = tempfile.mkdtemp()
# Switch to temp directory
original_wd = os.getcwd()
os.chdir( temp_dir )
# Create NIFTI file from the DICOM files
nii_file = 't1.nii.gz'
subprocess.call( 'cmtk dcm2image --tolerance 1e-3 -rO %s %s >& /dev/null' % ( nii_file, dicom_path ), shell=True )
if not os.path.exists( nii_file ):
error = "ERROR: NIFTI file was not created from DICOM files experiment"
slog.info('{}/{}'.format(project,session),error,
session = session,
project = project,
nii_file = nii_file,
dicom_path = dicom_path)
return
# Upload NIFTI file
try:
file = interface.select.project( project ).subject( subject ).experiment( session ).resource('QA').file( nii_file )
file.insert( nii_file, format='nifti_gz', tags='qa,adni,nifti_gz', content='ADNI Phantom QA File', overwrite=True )
except:
print "Something bad happened uploading file %s to Experiment %s/%s/%s" % (nii_file,project,session,label)
# Run the PERL QA script and capture its output
xml_file = 'phantom.xml'
lbl_file = 'phantom.nii.gz'
subprocess.call( 'cmtk detect_adni_phantom --tolerant --refine-xform --erode-snr 15 --write-labels %s %s %s' % ( lbl_file, nii_file, xml_file ), shell=True )
if not os.path.exists( xml_file ) or not os.path.exists( lbl_file ):
error = "ERROR: mandatory output file (either xml or label image) was not created from file %s, experiment %s/%s/%s" % ( nii_file,project,session,label )
slog.info('{}/{}/{}'.format(project,session,label),error,
nii_file=nii_file,
project = project,
session = session,
label= label)
return
# Upload phantom files to XNAT
for (fname,fmt) in [ (xml_file, 'xml'), (lbl_file, 'nifti_gz') ]:
try:
file = interface.select.project( project ).subject( subject ).experiment( session ).resource('QA').file( fname )
file.insert( fname, format=fmt, tags='qa,adni,%s' % fmt, content='ADNI Phantom QA File', overwrite=True )
except:
print "Something bad happened uploading file %s to Experiment %s/%s" % (fname,project,session)
# Read and evaluate phantom XML file
check_xml_file( xml_file, project, session, label )
# Clean up - remove temp directory
os.chdir( original_wd )
shutil.rmtree( temp_dir )
开发者ID:sbenito,项目名称:ncanda-data-integration,代码行数:54,代码来源:t1_qa_functions.py
示例20: __get_active_redcap_api__
def __get_active_redcap_api__(self):
project = self.__active_redcap_project__
if not project :
slog.info('__get_active_redcap_api__','Error: an active redcap project is currently not defined ! Most likely redcap api was not initialized correctly')
return None
if not self.api[project]:
slog.info('__get_active_redcap_api__','Error: ' + str(project) + ' api not defined')
return None
return self.api[project]
开发者ID:sibis-platform,项目名称:sibis,代码行数:11,代码来源:session.py
注:本文中的sibispy.sibislogger.info函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论