def summary():
    """Print, per site without free disk, the remaining data volume by reason.

    Sites whose ``siteInfo().disk`` entry is truthy are skipped.  For every
    other site the free/quota figures are printed, followed by one line per
    reason tag giving the summed ``info['size']`` (divided by 1024) of the
    datasets whose ``info['reasons']`` contain that tag.

    Returns nothing; all output goes to stdout.
    """
    ## not used anymore IMO
    RDI = remainingDatasetInfo()
    si = siteInfo()
    # Loop-invariant, so built once instead of once per site.
    # NOTE(review): 'pilup' looks like a typo for 'pileup', but it has to match
    # whatever is stored in info['reasons'] -- confirm before renaming.
    tags = ['pilup', 'input', 'output', 'lock', 'unlock', 'tape', 'stuck-tape', 'missing-tape']
    for site in RDI.sites():
        load = RDI.get(site)
        # only sites with no free disk left are reported
        if si.disk[site]:
            continue
        # single-argument print(...) behaves the same under Python 2 and 3
        print("%s %s [TB] free %s [TB] quota" % (site, si.disk[site], si.quota[site]))
        if not load:
            continue
        for tag in tags:
            v = sum(info['size'] for _ds, info in load.items() if tag in info['reasons']) / 1024.
            print("\t %10f [TB] remaining because of %s" % (v, tag))
def collector(url, specific, options):
up = componentInfo(mcm=False, soft=['mcm'])
if not up.check(): return
SI = siteInfo()
dss = DSS()
#NL = newLockInfo()
mcm = McMClient(dev=False)
fetch_in_campaigns = ['RunIISummer15GS']
mcm_statuses=['new']#,'validation','defined','approved']
will_be_used = defaultdict(list)
secondary_used = defaultdict(list)
for campaign,status in itertools.product( fetch_in_campaigns, mcm_statuses):
queries=[]
if campaign:
print "getting for",campaign
queries.append('member_of_campaign=%s'%campaign)
if status:
print "getting for",status
queries.append('status=%s'%status)
rs = mcm.getA('requests', query='&'.join(queries))
for r in rs:
#if r['type'] != 'Prod': continue
dataset = r['input_dataset']
if dataset:
#print r['prepid'],dataset
will_be_used[dataset].append( r )
pileup = r['pileup_dataset_name']
if pileup:
secondary_used['pileup'].append( r )
all_transfers = defaultdict(list)
print len(will_be_used),"datasets that can be pre-fetched"
## for secondary we really need to have the campaign right
print len(secondary_used),"pileup will be used"
datasets = will_be_used.keys()
if options.limit:
print "Restricting to randomly picked",options.limit
random.shuffle( datasets )
datasets = datasets[:options.limit]
for dataset in datasets:
print "\tlooking at",dataset
#presence = getDatasetPresence(url, dataset)#, within_sites=['T2_CH_CERN'])
## lock all those, and pre-fecth them
#NL.lock( dataset )
## we could get the reqmgr dictionnary from McM if it was implemented and use standard workflowInfo !!!
for request in will_be_used[dataset]:
print "will be used by",request['prepid']
campaign = request['member_of_campaign']
## based on the campaign, pre-fetch a site list
sites_allowed = SI.sites_T1s + SI.sites_with_goodIO
if options.spread:
## pick up the number of copies from campaign
copies_needed = 1 ## hard coded for now
else:
copies_needed = 1 ## hard coded for now
print "Will look for",copies_needed,"of",dataset
## figure out where it is and going
destinations, all_block_names = getDatasetDestinations(url, dataset, within_sites = [SI.CE_to_SE(site) for site in sites_allowed])
print json.dumps( destinations, indent=2)
prim_location = [site for (site,info) in destinations.items() if info['completion']==100 and info['data_fraction']==1]
prim_destination = [site for site in destinations.keys() if not site in prim_location]
prim_destination = [site for site in prim_destination if not any([osite.startswith(site) for osite in SI.sites_veto_transfer])]
copies_needed = max(0,copies_needed - len(prim_location))
copies_being_made = [ sum([info['blocks'].keys().count(block) for site,info in destinations.items() if site in prim_destination]) for block in all_block_names]
prim_to_distribute = [site for site in sites_allowed if not SI.CE_to_SE(site) in prim_location]
prim_to_distribute = [site for site in prim_to_distribute if not SI.CE_to_SE(site) in prim_destination]
## take out the ones that cannot receive transfers
prim_to_distribute = [site for site in prim_to_distribute if not any([osite.startswith(site) for osite in SI.sites_veto_transfer])]
copies_needed = max(0,copies_needed - min(copies_being_made))
spreading = {}
if copies_needed:
print "needing",copies_needed
chops,sizes = getDatasetChops(dataset, chop_threshold = options.chopsize)
spreading = distributeToSites( chops, prim_to_distribute, n_copies = copies_needed, weights=SI.cpu_pledges, sizes=sizes)
else:
print "no copy needed for",dataset
for (site,items) in spreading.items():
all_transfers[site].extend( items )
print "accumulated transfers"
print json.dumps(all_transfers, indent=2)
if not options.test:
sendEmail('dataset to be fetched',
'the following datasets and location were figured from mcm up-coming requests\n%s'%( json.dumps(all_transfers, indent=2) ),
destination=['[email protected]'])
## now collect and make transfer request
for (site,items_to_transfer) in all_transfers.iteritems():
print "Directing at",site
items_to_transfer = list(set(items_to_transfer))
site_se = SI.CE_to_SE(site)
blocks = [it for it in items_to_transfer if '#' in it]
datasets = [it for it in items_to_transfer if not '#' in it]
#......... part of the code is omitted here .........
def main():
url = 'cmsweb.cern.ch'
url_tb = 'cmsweb-testbed.cern.ch'
# Example: python assign.py -w amaltaro_RVZTT_120404_163607_6269
# -t testbed-relval -s T1_US_FNAL -e CMSSW_6_0_0_pre1_FS_TEST_WMA -p v1 -a
# relval -l /store/backfill/1
usage = "usage: %prog [options] [WORKFLOW]"
parser = optparse.OptionParser(usage=usage)
parser.add_option('-t', '--team', help='Type of Requests', dest='team')
parser.add_option('-s', '--sites', help=' "t1" for Tier-1\'s and "t2" for Tier-2\'s', dest='sites')
parser.add_option('--special', help='Use it for special workflows. You also have to change the code according to the type of WF', dest='special')
parser.add_option('-r', '--replica', action='store_true', dest='replica', default=False, help='Adds a _Disk Non-Custodial Replica parameter')
parser.add_option('-p', '--procversion', help='Processing Version, if empty it will leave the processing version that comes by default in the request', dest='procversion')
parser.add_option('-a', '--activity', help='Dashboard Activity (reprocessing, production or test), if empty will set reprocessing as default', dest='activity')
parser.add_option('-x', '--xrootd', help='Assign with trustSiteLocation=True (allows xrootd capabilities)',
action='store_true', default=False, dest='xrootd')
parser.add_option('-l', '--lfn', help='Merged LFN base', dest='lfn')
parser.add_option('-v', '--verbose', help='Verbose', action='store_true', default=False, dest='verbose')
parser.add_option('--testbed', help='Assign in testbed', action='store_true', default=False, dest='testbed')
parser.add_option('--test', action="store_true",help='Nothing is injected, only print infomation about workflow and Era', dest='test')
parser.add_option('-f', '--file', help='Text file with a list of wokflows. If this option is used, the same settings will be applied to all workflows', dest='file')
parser.add_option('-w', '--workflow', help='Workflow Name', dest='workflow')
parser.add_option('-e', '--era', help='Acquistion era', dest='era')
parser.add_option("--procstr", dest="procstring", help="Overrides Processing String with a single string")
(options, args) = parser.parse_args()
if options.testbed:
url = url_tb
# parse input workflows and files. If both -w and -f options are used, then only the -w inputs are considered.
if not options.workflow:
if args:
wfs = args
elif options.file:
wfs = [l.strip() for l in open(options.file) if l.strip()]
else:
parser.error("Input a workflow name or a file to read them")
sys.exit(0)
else:
wfs = [options.workflow]
#Default values
era = {}
procversion = 1
procstring = {}
replica = False
sites = ALL_SITES
specialStr = ''
taskchain = False
team = 'production'
trust_site = False
SI = siteInfo()
# Handling the parameters given in the command line
# parse site list
if options.sites:
if options.sites == "t1":
sites = SI.sites_T1s
elif options.sites == "t2":
sites = SI.sites_T2s
else:
sites = [site for site in options.sites.split(',')]
else:
sites = SI.sites_T1s + SI.sites_T2s
if options.team:
team = options.team
if options.xrootd:
trust_site = True
if options.replica:
replica = True
for wf in wfs:
# Getting the original dictionary
schema = getRequestDict(url, wf)
wf = reqMgr.Workflow(wf, url=url)
# WF must be in assignment-approved in order to be assigned
if (schema["RequestStatus"] != "assignment-approved"):
print("The workflow '" + wf.name + "' you are trying to assign is not in assignment-approved")
sys.exit(1)
#Check to see if the workflow is a task chain or an ACDC of a taskchain
taskchain = (schema["RequestType"] == "TaskChain") or ((schema["RequestType"] == "Resubmission") and "task" in schema["InitialTaskPath"].split("/")[1])
#Dealing with era and proc string
if taskchain:
# Setting the Era and ProcStr values per Task
for key, value in schema.items():
if type(value) is dict and key.startswith("Task"):
try:
if 'ProcessingString' in value:
procstring[value['TaskName']] = value['ProcessingString']
else:
procstring[value['TaskName']] = schema['ProcessingString']
if 'AcquisitionEra' in value:
#......... part of the code is omitted here .........
# check if there is a custodial
# check = findCustodialLocation(url, dataset)
check = getDatasetPresence(url, dataset,complete=None)
if len(check):
print "OK for dataset at",check
else:
print "need to pick a site and transfer"
get_those.append( dataset )
print get_those
#res= makeReplicaRequest(url, get_those_to, get_those, "restaging because of Redigi Move custodial screw up")
print res
"""
sys.exit(1)
SI = siteInfo()
#items = getDatasetChops(dataset)
items = [['block'] for i in range(100)]
siteblacklist = ['T2_TH_CUNSTDA','T1_TW_ASGC','T2_TW_Taiwan']
sites = [s for s in json.loads(open('/afs/cern.ch/user/c/cmst2/www/mc/whitelist.json').read()) if s not in siteblacklist]
random.shuffle(sites)
sites = sites[:10]
#weights = { }
#for (i,site) in enumerate(sites):
#weights[site]= random.random()
# weights[site] = i
SI.cpu_pledges
spreading = distributeToSites( items, sites, n_per_site = 2 , weights=SI.cpu_pledges)
def spawn_harvesting(url, wfi , in_full):
    """Create, approve and assign a DQMHarvest request for each DQM output.

    Nothing is submitted unless ``wfi.request`` carries a truthy
    'EnableHarvesting' or 'DQMConfigCacheID'.

    url     -- forwarded to workflowInfo and to the reqMgrClient calls
    wfi     -- workflow info object; its ``request`` dictionary, ``sendLog``,
               ``acquisitionEra`` and ``processingString`` are used
    in_full -- mapping: output dataset -> iterable of storage elements with a
               full copy (each element is converted with ``SI.SE_to_CE``)

    The early exits return ``(all_OK, requests)`` where ``all_OK`` maps a
    dataset (or the key 'fake') to False and ``requests`` lists the harvesting
    requests created so far.
    """
    SI = siteInfo()
    all_OK = {}
    requests = []
    outputs = wfi.request['OutputDatasets']
    if ('EnableHarvesting' in wfi.request and wfi.request['EnableHarvesting']) or ('DQMConfigCacheID' in wfi.request and wfi.request['DQMConfigCacheID']):
        if 'MergedLFNBase' not in wfi.request:
            # single-argument print(...) behaves the same under Python 2 and 3
            print("fucked up")
            sendEmail('screwed up wl cache','%s wl cache is bad'%(wfi.request['RequestName']))
            all_OK['fake'] = False
            return all_OK,requests

        # reload the workflow information before building the schemas
        wfi = workflowInfo(url, wfi.request['RequestName'])
        # The original messages referenced `wfo.name`, but no `wfo` exists in
        # this function (NameError on every failure path); use the request name.
        request_name = wfi.request['RequestName']

        dqms = [out for out in outputs if '/DQM' in out]
        if not all([in_full[dqm_input] for dqm_input in dqms]):
            wfi.sendLog('closor',"will not be able to assign the harvesting: holding up")
            for dqm_input in dqms:
                all_OK[dqm_input] = False
            return all_OK,requests

        # keys copied verbatim from the parent request
        # ('CMSSWVersion' was listed twice in the original; once is enough)
        copy_over = ['ProcessingString',
                     'DQMUploadUrl',
                     'CMSSWVersion',
                     'CouchDBName',
                     'CouchWorkloadDBName',
                     'CouchURL',
                     'DbsUrl',
                     'inputMode',
                     'DQMConfigCacheID',
                     'OpenRunningTimeout',
                     'ScramArch',
                     'Campaign',
                     'Memory', #dummy
                     'SizePerEvent', #dummy
                     'GlobalTag', #dummy
                     ]

        for dqm_input in dqms:
            ## handle it properly
            harvesting_schema = {
                'Requestor': os.getenv('USER'),
                'RequestType' : 'DQMHarvest',
                'Group' : 'DATAOPS'
                }
            for item in copy_over:
                harvesting_schema[item] = copy.deepcopy(wfi.request[item])
            harvesting_schema['InputDataset'] = dqm_input
            harvesting_schema['TimePerEvent'] = 1
            harvesting_schema['PrepID'] = 'Harvest-'+wfi.request['PrepID']
            harvesting_schema['RequestString'] = 'HARVEST-'+wfi.request['RequestString']
            harvesting_schema['DQMHarvestUnit'] = 'byRun'
            harvesting_schema['ConfigCacheUrl'] = harvesting_schema['CouchURL'] ## uhm, how stupid is that ?
            harvesting_schema['RequestPriority'] = wfi.request['RequestPriority']*10

            # submit, and retry exactly once on failure
            harvest_request = None
            for failure_note in ("Error in making harvesting for", "Error twice in harvesting for"):
                harvest_request = reqMgrClient.submitWorkflow(url, harvesting_schema)
                if harvest_request:
                    break
                print("%s %s" % (failure_note, request_name))
                print("schema")
                print(json.dumps( harvesting_schema, indent = 2))

            if harvest_request:
                requests.append( harvest_request )
                ## should we protect for setting approved ? no, it's notified below, assignment will fail, likely
                data = reqMgrClient.setWorkflowApproved(url, harvest_request)
                print("created %s for harvesting of %s" % (harvest_request, dqm_input))
                wfi.sendLog('closor',"created %s for harvesting of %s"%( harvest_request, dqm_input))
                ## assign it directly
                team = wfi.request['Teams'][0]
                parameters={
                    'SiteWhitelist' : [SI.SE_to_CE(se) for se in wfi.request['NonCustodialSites']],
                    'AcquisitionEra' : wfi.acquisitionEra(),
                    'ProcessingString' : wfi.processingString(),
                    'MergedLFNBase' : wfi.request['MergedLFNBase'],
                    'ProcessingVersion' : wfi.request['ProcessingVersion'],
                    'execute' : True
                    }
                if in_full[dqm_input]:
                    # run where a full copy of the DQM input already sits
                    print("using full copy at %s" % (in_full[dqm_input],))
                    parameters['SiteWhitelist'] = [SI.SE_to_CE(se) for se in in_full[dqm_input]]
                else:
                    print("cannot do anything if not having a full copy somewhere")
                    all_OK[dqm_input]=False
                    continue

                result = reqMgrClient.assignWorkflow(url, harvest_request, team, parameters)
                if not result:
                    sendEmail('harvesting request created','%s was created at announcement of %s in %s, failed to assign'%(harvest_request, dqm_input, wfi.request['RequestName']), destination=[wfi.request['Requestor']+'@cern.ch'])
                else:
                    sendEmail('harvesting request assigned','%s was created at announcement of %s in %s, and assigned'%(harvest_request, dqm_input, wfi.request['RequestName']), destination=[wfi.request['Requestor']+'@cern.ch'])
                    wfi.sendLog('closor','%s was created at announcement of %s in %s, and assigned'%(harvest_request, dqm_input, wfi.request['RequestName']))
            else:
                print("could not make the harvesting for %s not announcing" % request_name)
                wfi.sendLog('closor',"could not make the harvesting request")
#......... part of the code is omitted here .........
def recoveror(url,specific,options=None):
    """Inspect workflows in a recovery status and tabulate their error codes.

    url      -- forwarded to workflowInfo
    specific -- optional substring of a workflow name; when given, workflows
                in 'assistance-manual' are considered too and only matching
                names are processed
    options  -- not used in the portion of the function shown here

    Returns None on the early exits (user lock set, component check failed).
    """
    if userLock('recoveror'): return

    up = componentInfo(mcm=False, soft=['mcm'])
    if not up.check(): return

    CI = campaignInfo()
    SI = siteInfo()
    UC = unifiedConfiguration()

    def make_int_keys( d ):
        # Convert every key of d to int, in place.  Iterate over a snapshot of
        # the keys: popping/inserting while iterating the dict itself can skip
        # or revisit entries.
        for code in list(d):
            d[int(code)] = d.pop(code)

    error_codes_to_recover = UC.get('error_codes_to_recover')
    error_codes_to_block = UC.get('error_codes_to_block')
    error_codes_to_notify = UC.get('error_codes_to_notify')
    make_int_keys( error_codes_to_recover )
    make_int_keys( error_codes_to_block )
    make_int_keys( error_codes_to_notify )

    #wfs = session.query(Workflow).filter(Workflow.status == 'assistance-recovery').all()
    wfs = session.query(Workflow).filter(Workflow.status.contains('recovery')).all()
    if specific:
        wfs.extend( session.query(Workflow).filter(Workflow.status == 'assistance-manual').all() )

    for wfo in wfs:
        if specific and specific not in wfo.name: continue
        if not specific and 'manual' in wfo.status: continue

        wfi = workflowInfo(url, wfo.name)

        ## need a way to verify that this is the first round of ACDC, since the second round will have to be on the ACDC themselves

        all_errors = {}
        try:
            wfi.getSummary()
            all_errors = wfi.summary['errors']
        except Exception as err:
            # still best effort (no errors => no recovery below), but say why
            # single-argument print(...) behaves the same under Python 2 and 3
            print("\tcould not get the error summary of %s: %s" % (wfo.name, err))

        print('-'*100)
        print("Looking at %s for recovery options" % wfo.name)

        recover = True

        if 'MergedLFNBase' not in wfi.request:
            print("fucked up")
            sendEmail('missing lfn','%s wl cache is screwed up'%wfo.name)
            recover = False

        if not len(all_errors):
            print("\tno error for %s" % wfo.name)
            recover = False

        task_to_recover = defaultdict(list)
        message_to_ops = ""
        message_to_user = ""

        # The original tested the misspelt key 'LheInputFilese', so this guard
        # could never fire.
        if 'LheInputFiles' in wfi.request and wfi.request['LheInputFiles']:
            ## we do not try to recover pLHE
            recover = False

        if wfi.request['RequestType'] in ['MonteCarlo','ReReco']:
            recover = False

        if 'Campaign' in wfi.request:
            c = wfi.request['Campaign']
            # the campaign configuration has the last word
            if c in CI.campaigns and 'recover' in CI.campaigns[c]:
                recover=CI.campaigns[c]['recover']

        for task,errors in all_errors.items():
            print("\tTask %s" % (task,))
            ## collect all error codes and #jobs regardless of step at which it occured
            all_codes = []
            for name, codes in errors.items():
                # plain integers sit next to the per-code dictionaries; skip them
                if isinstance(codes, int): continue
                for code, info in codes.items():
                    all_codes.append( (int(code),
                                       info['jobs'],
                                       name,
                                       list(set([e['type'] for e in info['errors']])),
                                       list(set([e['details'] for e in info['errors']])) ) )

            # most frequent failure first
            all_codes.sort(key=lambda i:i[1], reverse=True)
            sum_failed = sum([l[1] for l in all_codes])

            for errorCode,njobs,name,types,details in all_codes:
                # guard against every entry reporting zero jobs
                rate = 100*njobs/float(sum_failed) if sum_failed else 0.
                #print ("\t\t %10d (%6s%%) failures with error code %10d (%"+str(max_legend)+"s) at stage %s")%(njobs, "%4.2f"%rate, errorCode, legend, name)
                print(("\t\t %10d (%6s%%) failures with error code %10d (%30s) at stage %s")%(njobs, "%4.2f"%rate, errorCode, ','.join(types), name))

                added_in_recover=False

                #if options.go:
                # force the recovery of any task with error ?

                if errorCode in error_codes_to_recover:
                    ## the error code is registered
                    for case in error_codes_to_recover[errorCode]:
                        match = case['details']
                        # a case without 'details' matches unconditionally
                        matched= (match is None)
                        if not matched:
                            matched=False
#......... part of the code is omitted here .........
#......... part of the code is omitted here .........
html_doc.write("<li>%s : last %s, avg %s</li>\n"%( m, display_time(lasttime), display_time(avg)))
html_doc.write("</ul>")
html_doc.write("Last running <pre>%s</pre><br>"%( os.popen("tac %s/logs/running | head -5"%monitor_dir).read() ))
html_doc.write("Order in cycle <pre>%s</pre><br>"%( '\n'.join(map(lambda l : l.split('/')[-1].replace('.py',''), filter(lambda l : not l.startswith('#') and 'Unified' in l and 'py' in l.split('/')[-1], open('%s/WmAgentScripts/cycle.sh'%base_dir).read().split('\n')))) ))
html_doc.write("</div>\n")
lap ( 'done with jobs' )
text=""
count=0
for (c,info) in campaignInfo().campaigns.items():
#if 'go' in info and info['go']:
text+="<li>%s <br> <pre>%s</pre> </li>"%( c, json.dumps( info, indent=2))
count+=1
html_doc.write("""Campaign configuration
<a href="javascript:showhide('campaign')">[Click to show/hide]</a>
<br>
<div id="campaign" style="display:none;">
<br>
<ul>
%s
</ul></div>
"""%(text))
text=""
count=0
n_column = 4
SI = siteInfo()
date1 = time.strftime('%Y-%m-%d+%H:%M', time.gmtime(time.mktime(time.gmtime())-(15*24*60*60)) ) ## 15 days
date2 = time.strftime('%Y-%m-%d+%H:%M', time.gmtime())
for t in SI.types():
text+="<li>%s<table border=1>"%t
c=0
for site in getattr(SI,t):
cpu = SI.cpu_pledges[site] if site in SI.cpu_pledges else 'N/A'
disk = SI.disk[SI.CE_to_SE(site)] if SI.CE_to_SE(site) in SI.disk else 'N/A'
if c==0:
text+="<tr>"
if not disk:
ht_disk = '<font color=red>Disk available: %s</font>'%disk
else:
ht_disk = 'Disk available: %s'%disk
text+='<td><a href=http://dashb-ssb.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site=%s>%s</a><br><a href="http://cms-gwmsmon.cern.ch/prodview/%s" target="_blank"><img src="http://cms-gwmsmon.cern.ch/prodview/graphs/%s/daily" style="height:50px"></a><br><a href="http://dashb-cms-job.cern.ch/dashboard/templates/web-job2/#user=&refresh=0&table=Jobs&p=1&records=25&activemenu=1&site=%s&submissiontool=wmagent&check=submitted&sortby=activity&scale=linear&bars=20&data1=%s&date2=%s">dashb</a><br>CPU pledge: %s<br>%s</td>'%(site,site,site,site,site,date1,date2,cpu,ht_disk)
if c==n_column:
c=0
else:
c+=1
text+="</table></li>"
text += "<li> Sites in auto-approved transfer<ul>"
for site in sorted(SI.sites_auto_approve):
text+="<li>%s"% site
text += "</ul></li>"
text += "<li> Sites with vetoe transfer<ul>"
for site in sorted(SI.sites_veto_transfer):
text+="<li>%s"% site
text += "</ul></li>"
def assignor(url ,specific = None, talk=True, options=None):
if userLock(): return
if duplicateLock(): return
#if notRunningBefore( 'stagor' ): return
if not componentInfo().check(): return
CI = campaignInfo()
SI = siteInfo()
LI = lockInfo()
NLI = newLockInfo()
n_assigned = 0
n_stalled = 0
wfos=[]
if specific:
wfos = session.query(Workflow).filter(Workflow.name==specific).all()
if not wfos:
if specific:
wfos = session.query(Workflow).filter(Workflow.status=='considered').all()
wfos.extend( session.query(Workflow).filter(Workflow.status=='staging').all())
wfos.extend(session.query(Workflow).filter(Workflow.status=='staged').all())
for wfo in wfos:
if specific:
if not any(map(lambda sp: sp in wfo.name,specific.split(','))): continue
#if not specific in wfo.name: continue
print "\n\n",wfo.name,"\n\tto be assigned"
wfh = workflowInfo( url, wfo.name)
## check if by configuration we gave it a GO
if not CI.go( wfh.request['Campaign'] ) and not options.go:
print "No go for",wfh.request['Campaign']
n_stalled+=1
continue
## check on current status for by-passed assignment
if wfh.request['RequestStatus'] !='assignment-approved':
if not options.test:
print wfo.name,wfh.request['RequestStatus'],"setting away and skipping"
## the module picking up from away will do what is necessary of it
wfo.wm_status = wfh.request['RequestStatus']
wfo.status = 'away'
session.commit()
continue
else:
print wfo.name,wfh.request['RequestStatus']
## retrieve from the schema, dbs and reqMgr what should be the next version
version=wfh.getNextVersion()
if not version:
if options and options.ProcessingVersion:
version = options.ProcessingVersion
else:
print "cannot decide on version number"
n_stalled+=1
continue
(lheinput,primary,parent,secondary) = wfh.getIO()
sites_allowed = getSiteWhiteList( (lheinput,primary,parent,secondary) )
print "Site white list",sorted(sites_allowed)
if 'SiteWhitelist' in CI.parameters(wfh.request['Campaign']):
sites_allowed = CI.parameters(wfh.request['Campaign'])['SiteWhitelist']
if 'SiteBlacklist' in CI.parameters(wfh.request['Campaign']):
print "Reducing the whitelist due to black list in campaign configuration"
print "Removing",CI.parameters(wfh.request['Campaign'])['SiteBlacklist']
sites_allowed = list(set(sites_allowed) - set(CI.parameters(wfh.request['Campaign'])['SiteBlacklist']))
blocks = []
if 'BlockWhitelist' in wfh.request:
blocks = wfh.request['BlockWhitelist']
memory_allowed = SI.sitesByMemory( wfh.request['Memory'] )
if memory_allowed!=None:
print "sites allowing", wfh.request['Memory'],"are",sorted(memory_allowed)
sites_allowed = list(set(sites_allowed) & set(memory_allowed))
print "Allowed",sorted(sites_allowed)
secondary_locations=None
for sec in list(secondary):
presence = getDatasetPresence( url, sec )
print sec
print json.dumps(presence, indent=2)
#one_secondary_locations = [site for (site,(there,frac)) in presence.items() if frac>90.]
one_secondary_locations = [site for (site,(there,frac)) in presence.items() if there]
if secondary_locations==None:
secondary_locations = one_secondary_locations
else:
secondary_locations = list(set(secondary_locations) & set(one_secondary_locations))
## reduce the site white list to site with secondary only
#sites_allowed = [site for site in sites_allowed if any([osite.startswith(site) for osite in one_secondary_locations])]
sites_allowed = [site for site in sites_allowed if SI.CE_to_SE(site) in one_secondary_locations]
print "From secondary requirement, now Allowed",sorted(sites_allowed)
sites_all_data = copy.deepcopy( sites_allowed )
sites_with_data = copy.deepcopy( sites_allowed )
#......... part of the code is omitted here .........
def equalizor(url , specific = None, options=None):
up = componentInfo(mcm=False, soft=['mcm'])
if not up.check(): return
if not specific:
workflows = getWorkflows(url, status='running-closed', details=True)
workflows.extend(getWorkflows(url, status='running-open', details=True))
## start from scratch
modifications = defaultdict(dict)
## define regionality site => fallback allowed. feed on an ssb metric ??
mapping = defaultdict(list)
reversed_mapping = defaultdict(list)
regions = defaultdict(list)
SI = siteInfo()
CI = campaignInfo()
UC = unifiedConfiguration()
for site in SI.sites_ready:
region = site.split('_')[1]
if not region in ['US','DE','IT']: continue
regions[region] = [region]
def site_in_depletion(s):
return True
if s in SI.sites_pressure:
(m, r, pressure) = SI.sites_pressure[s]
if float(m) < float(r):
print s,m,r,"lacking pressure"
return True
else:
print s,m,r,"pressure"
pass
return False
for site in SI.sites_ready:
region = site.split('_')[1]
## fallback to the region, to site with on-going low pressure
mapping[site] = [fb for fb in SI.sites_ready if any([('_%s_'%(reg) in fb and fb!=site and site_in_depletion(fb))for reg in regions[region]]) ]
use_T0 = ('T0_CH_CERN' in UC.get("site_for_overflow"))
if options.t0: use_T0 = True
#if options.augment : use_T0 = True
use_HLT = ('T2_CH_CERN_HLT' in UC.get("site_for_overflow"))
if options.hlt: use_HLT = True
#if options.augment : use_HLT=True
if use_HLT:
mapping['T2_CH_CERN'].append('T2_CH_CERN_HLT')
if use_T0:
mapping['T2_CH_CERN'].append('T0_CH_CERN')
#mapping['T1_FR_CCIN2P3'].append('T0_CH_CERN')
#mapping['T2_IT_Legnaro'].append('T1_IT_CNAF')
for reg in ['IT','DE','UK']:
mapping['T2_CH_CERN'].extend([fb for fb in SI.sites_ready if '_%s_'%reg in fb])
## make them appear as OK to use
force_sites = []
## overflow CERN to underutilized T1s
upcoming = json.loads( open('%s/GQ.json'%monitor_dir).read())
for possible in SI.sites_T1s:
if not possible in upcoming:
mapping['T2_CH_CERN'].append(possible)
## remove add-hoc sites from overflow mapping
prevent_sites = ['T2_US_Purdue']
for prevent in prevent_sites:
if prevent in mapping: mapping.pop( prevent )
for src in mapping:
for prevent in prevent_sites:
if prevent in mapping[src]:
mapping[src].remove( prevent )
## create the reverse mapping for the condor module
for site,fallbacks in mapping.items():
for fb in fallbacks:
reversed_mapping[fb].append(site)
## this is the fallback mapping
print "Direct mapping : site => overflow"
print json.dumps( mapping, indent=2)
print "Reverse mapping : dest <= from origin"
print json.dumps( reversed_mapping, indent=2)
altered_tasks = set()
def running_idle( wfi , task_name):
gmon = wfi.getGlideMon()
#print gmon
if not gmon: return (0,0)
if not task_name in gmon: return (0,0)
return (gmon[task_name]['Running'], gmon[task_name]['Idle'])
def needs_action( wfi, task, min_idled = 100, pressure = 0.2):
#......... part of the code is omitted here .........
def transferor(url ,specific = None, talk=True, options=None):
if options and options.test:
execute = False
else:
execute = True
SI = siteInfo()
CI = campaignInfo()
all_transfers=defaultdict(list)
workflow_dependencies = defaultdict(set) ## list of wf.id per input dataset
data_to_wf = {}
for wfo in session.query(Workflow).filter(Workflow.status=='considered').all():
if specific and not specific in wfo.name: continue
print wfo.name,"to be transfered"
wfh = workflowInfo( url, wfo.name)
#injection_time = time.mktime(time.strptime('.'.join(map(str,wfh.request['RequestDate'])),"%Y.%m.%d.%H.%M.%S")) / (60.*60.)
#now = time.mktime(time.gmtime()) / (60.*60.)
#if float(now - injection_time) < 4.:
# print "It is too soon to transfer", now, injection_time
# continue
(lheinput,primary,parent,secondary) = wfh.getIO()
if options and options.tosites:
sites_allowed = options.tosites.split(',')
else:
sites_allowed = getSiteWhiteList( (lheinput,primary,parent,secondary) )
if 'SiteWhitelist' in CI.parameters(wfh.request['Campaign']):
sites_allowed = CI.parameters(wfh.request['Campaign'])['SiteWhitelist']
blocks = []
if 'BlockWhitelist' in wfh.request and wfh.request['BlockWhitelist']:
blocks = wfh.request['BlockWhitelist']
can_go = True
if primary:
if talk:
print wfo.name,'reads',', '.join(primary),'in primary'
## chope the primary dataset
for prim in primary:
workflow_dependencies[prim].add( wfo.id )
presence = getDatasetPresence( url, prim )
prim_location = [site for site,pres in presence.items() if pres[0]==True]
subscriptions = listSubscriptions( url , prim )
prim_destination = [site for site in subscriptions]
prim_to_distribute = [site for site in sites_allowed if not any([osite.startswith(site) for osite in prim_location])]
prim_to_distribute = [site for site in prim_to_distribute if not any([osite.startswith(site) for osite in prim_destination])]
if len(prim_to_distribute)>0: ## maybe that a parameter we can play with to limit the
if not options or options.chop:
spreading = distributeToSites( [[prim]]+getDatasetChops(prim), prim_to_distribute, n_copies = 3, weights=SI.cpu_pledges)
else:
spreading = {}
for site in prim_to_distribute: spreading[site]=[prim]
can_go = False
for (site,items) in spreading.items():
all_transfers[site].extend( items )
if secondary:
if talk:
print wfo.name,'reads',', '.join(secondary),'in secondary'
for sec in secondary:
workflow_dependencies[sec].add( wfo.id )
presence = getDatasetPresence( url, sec )
sec_location = [site for site,pres in presence.items() if pres[1]>90.] ## more than 90% of the minbias at sites
subscriptions = listSubscriptions( url ,sec )
sec_destination = [site for site in subscriptions]
sec_to_distribute = [site for site in sites_allowed if not any([osite.startswith(site) for osite in sec_location])]
sec_to_distribute = [site for site in sec_to_distribute if not any([osite.startswith(site) for osite in sec_destination])]
if len( sec_to_distribute )>0:
for site in sec_to_distribute:
all_transfers[site].append( sec )
can_go = False
## is that possible to do something more
if can_go:
print wfo.name,"should just be assigned NOW to",sites_allowed
wfo.status = 'staged'
session.commit()
continue
else:
print wfo.name,"needs a transfer"
#print json.dumps(all_transfers)
fake_id=-1
wf_id_in_prestaging=set()
for (site,items_to_transfer) in all_transfers.iteritems():
items_to_transfer = list(set(items_to_transfer))
## convert to storage element
site_se = SI.CE_to_SE(site)
## massage a bit the items
blocks = [it for it in items_to_transfer if '#' in it]
datasets = [it for it in items_to_transfer if not '#' in it]
print "Making a replica to",site,"(CE)",site_se,"(SE) for"
#......... part of the code is omitted here .........
请发表评论