/* main - slurmctld main function, start various threads and process RPCs */
int main(int argc, char *argv[])
{
pthread_attr_t thread_attr;
char node_name[128];
void *db_conn = NULL;
assoc_init_args_t assoc_init_arg;
_init_config();
log_init(argv[0], log_opts, LOG_DAEMON, NULL);
if (read_slurmdbd_conf())
exit(1);
_parse_commandline(argc, argv);
_update_logging(true);
_update_nice();
if (slurm_auth_init(NULL) != SLURM_SUCCESS) {
fatal("Unable to initialize %s authentication plugin",
slurmdbd_conf->auth_type);
}
if (slurm_acct_storage_init(NULL) != SLURM_SUCCESS) {
fatal("Unable to initialize %s accounting storage plugin",
slurmdbd_conf->storage_type);
}
_kill_old_slurmdbd();
if (foreground == 0)
_daemonize();
/*
* Need to create pidfile here in case we setuid() below
* (init_pidfile() exits if it can't initialize pid file).
* On Linux we also need to make this setuid job explicitly
* able to write a core dump.
* This also has to happen after daemon(), which closes all fd's,
* so we keep the write lock of the pidfile.
*/
_init_pidfile();
_become_slurm_user();
if (foreground == 0)
_set_work_dir();
log_config();
#ifdef PR_SET_DUMPABLE
if (prctl(PR_SET_DUMPABLE, 1) < 0)
debug ("Unable to set dumpable to 1");
#endif /* PR_SET_DUMPABLE */
if (xsignal_block(dbd_sigarray) < 0)
error("Unable to block signals");
/* Create attached thread for signal handling */
slurm_attr_init(&thread_attr);
if (pthread_create(&signal_handler_thread, &thread_attr,
_signal_handler, NULL))
fatal("pthread_create %m");
slurm_attr_destroy(&thread_attr);
registered_clusters = list_create(NULL);
slurm_attr_init(&thread_attr);
if (pthread_create(&commit_handler_thread, &thread_attr,
_commit_handler, NULL))
fatal("pthread_create %m");
slurm_attr_destroy(&thread_attr);
memset(&assoc_init_arg, 0, sizeof(assoc_init_args_t));
/* If we are tacking wckey we need to cache
wckeys, if we aren't only cache the users, qos */
assoc_init_arg.cache_level = ASSOC_MGR_CACHE_USER | ASSOC_MGR_CACHE_QOS;
if (slurmdbd_conf->track_wckey)
assoc_init_arg.cache_level |= ASSOC_MGR_CACHE_WCKEY;
db_conn = acct_storage_g_get_connection(NULL, 0, true, NULL);
if (assoc_mgr_init(db_conn, &assoc_init_arg, errno) == SLURM_ERROR) {
error("Problem getting cache of data");
acct_storage_g_close_connection(&db_conn);
goto end_it;
}
if (gethostname_short(node_name, sizeof(node_name)))
fatal("getnodename: %m");
while (1) {
if (slurmdbd_conf->dbd_backup &&
(!strcmp(node_name, slurmdbd_conf->dbd_backup) ||
!strcmp(slurmdbd_conf->dbd_backup, "localhost"))) {
info("slurmdbd running in background mode");
have_control = false;
backup = true;
/* make sure any locks are released */
acct_storage_g_commit(db_conn, 1);
run_dbd_backup();
if (!shutdown_time)
assoc_mgr_refresh_lists(db_conn);
} else if (slurmdbd_conf->dbd_host &&
(!strcmp(slurmdbd_conf->dbd_host, node_name) ||
!strcmp(slurmdbd_conf->dbd_host, "localhost"))) {
backup = false;
have_control = true;
//......... part of the code omitted here .........
Author: lindenb, Project: slurm, Lines: 101, Source file: slurmdbd.c
Example 8: _fed_job_will_run
/*
 * _fed_job_will_run - determine the earliest time/place a pending federated
 *	job could begin, by querying every reachable cluster in the federation
 *	in parallel (one pthread per cluster).
 * IN req - job description/submit request
 * OUT will_run_resp - earliest will-run response from any cluster, or NULL;
 *	on success the caller owns it and must free it with
 *	slurm_free_will_run_response_msg()
 * IN fed - federation record holding the member cluster list
 * RET SLURM_SUCCESS if at least one cluster answered, else SLURM_FAILURE
 *	(with errno set from the last failed response)
 */
static int _fed_job_will_run(job_desc_msg_t *req,
			     will_run_response_msg_t **will_run_resp,
			     slurmdb_federation_rec_t *fed)
{
	List resp_msg_list;
	int pthread_count = 0, i;
	pthread_t *load_thread = NULL;
	load_willrun_req_struct_t *load_args;
	pthread_attr_t load_attr;
	ListIterator iter;
	will_run_response_msg_t *earliest_resp = NULL;
	load_willrun_resp_struct_t *tmp_resp;
	slurmdb_cluster_rec_t *cluster;

	xassert(req);
	xassert(will_run_resp);

	slurm_attr_init(&load_attr);
	*will_run_resp = NULL;

	/* Spawn one pthread per cluster to collect job information */
	resp_msg_list = list_create(NULL);
	/* FIXED: array elements are pthread_t; was sizeof(pthread_attr_t),
	 * which sized the array by the wrong (larger) type */
	load_thread = xmalloc(sizeof(pthread_t) *
			      list_count(fed->cluster_list));
	iter = list_iterator_create(fed->cluster_list);
	while ((cluster = (slurmdb_cluster_rec_t *)list_next(iter))) {
		int retries = 0;
		if ((cluster->control_host == NULL) ||
		    (cluster->control_host[0] == '\0'))
			continue;	/* Cluster down */
		load_args = xmalloc(sizeof(load_willrun_req_struct_t));
		load_args->cluster       = cluster;
		load_args->req           = req;
		load_args->resp_msg_list = resp_msg_list;
		/* Retry thread creation on transient resource exhaustion */
		while (pthread_create(&load_thread[pthread_count], &load_attr,
				      _load_willrun_thread, (void *)load_args)) {
			error("pthread_create error %m");
			if (++retries > MAX_RETRIES)
				fatal("Can't create pthread");
			usleep(10000);	/* sleep and retry */
		}
		pthread_count++;
	}
	list_iterator_destroy(iter);
	slurm_attr_destroy(&load_attr);

	/* Wait for all pthreads to complete */
	for (i = 0; i < pthread_count; i++)
		pthread_join(load_thread[i], NULL);
	xfree(load_thread);

	/* Scan the responses, keeping only the one with the earliest
	 * start time; every other response message is freed here. */
	iter = list_iterator_create(resp_msg_list);
	while ((tmp_resp = (load_willrun_resp_struct_t *)list_next(iter))) {
		if (!tmp_resp->willrun_resp_msg)
			slurm_seterrno(tmp_resp->rc);
		else if ((!earliest_resp) ||
			 (tmp_resp->willrun_resp_msg->start_time <
			  earliest_resp->start_time)) {
			/* New earliest candidate; release the previous one
			 * (slurm_free_will_run_response_msg(NULL) is a no-op) */
			slurm_free_will_run_response_msg(earliest_resp);
			earliest_resp = tmp_resp->willrun_resp_msg;
			tmp_resp->willrun_resp_msg = NULL;
		}
		slurm_free_will_run_response_msg(tmp_resp->willrun_resp_msg);
		xfree(tmp_resp);
	}
	list_iterator_destroy(iter);
	FREE_NULL_LIST(resp_msg_list);

	*will_run_resp = earliest_resp;
	if (!earliest_resp)
		return SLURM_FAILURE;
	return SLURM_SUCCESS;
}
Author: HPCNow, Project: slurm, Lines: 78, Source file: allocate.c
Example 9: free_block_list
/*
 * free_block_list - free (and optionally destroy) a list of BlueGene blocks,
 *	requeueing any jobs still running on them. Work is either done
 *	synchronously (wait == true) or handed to a detached agent thread.
 * IN job_id     - job whose action triggered the free (for logging/tracking)
 * IN track_list - blocks to free; its contents are MOVED into an internal
 *	list, so the caller's list is emptied on return
 * IN destroy    - if true, destroy the blocks rather than just free them
 * IN wait       - if true, block until all frees complete
 * RET SLURM_SUCCESS always
 * NOTE: block_state_mutex should be unlocked before calling this
 */
extern int free_block_list(uint32_t job_id, List track_list,
			   bool destroy, bool wait)
{
	bg_record_t *bg_record = NULL;
	int retries;
	ListIterator itr = NULL;
	bg_free_block_list_t *bg_free_list;
	pthread_attr_t attr_agent;
	pthread_t thread_agent;

	/* Nothing to do for a NULL or empty list */
	if (!track_list || !list_count(track_list))
		return SLURM_SUCCESS;

	/* Ownership of bg_free_list passes to _track_freeing_blocks(),
	 * which frees it when done (directly or via the agent thread). */
	bg_free_list = xmalloc(sizeof(bg_free_block_list_t));
	bg_free_list->track_list = list_create(NULL);
	bg_free_list->destroy = destroy;
	bg_free_list->job_id = job_id;

	slurm_mutex_lock(&block_state_mutex);
	/* Move (not copy) the records out of the caller's list */
	list_transfer(bg_free_list->track_list, track_list);
	itr = list_iterator_create(bg_free_list->track_list);
	while ((bg_record = list_next(itr))) {
		if (bg_record->magic != BLOCK_MAGIC) {
			error("block was already destroyed %p", bg_record);
			continue;
		}
		bg_record->free_cnt++;

		/* A job is still attached to this block; requeue it.
		 * The mutex must be dropped around bg_requeue_job() to
		 * avoid deadlock with the job-management locks. */
		if (bg_record->job_ptr
		    && !IS_JOB_FINISHED(bg_record->job_ptr)) {
			info("We are freeing a block (%s) that has job %u(%u).",
			     bg_record->bg_block_id,
			     bg_record->job_ptr->job_id,
			     bg_record->job_running);
			/* This is not thread safe if called from
			   bg_job_place.c anywhere from within
			   submit_job() or at startup. */
			slurm_mutex_unlock(&block_state_mutex);
			bg_requeue_job(bg_record->job_ptr->job_id, 0);
			slurm_mutex_lock(&block_state_mutex);
		}
		/* Block no longer runs a job; return its CPUs to the
		 * unused pool if it was on the running list */
		if (remove_from_bg_list(bg_lists->job_running, bg_record)
		    == SLURM_SUCCESS)
			num_unused_cpus += bg_record->cpu_cnt;
	}
	list_iterator_destroy(itr);
	slurm_mutex_unlock(&block_state_mutex);

	if (wait) {
		/* Track_freeing_blocks waits until the list is done
		   and frees the memory of bg_free_list.
		*/
		_track_freeing_blocks(bg_free_list);
		return SLURM_SUCCESS;
	}

	/* _track_freeing_blocks handles cleanup */
	slurm_attr_init(&attr_agent);
	if (pthread_attr_setdetachstate(&attr_agent, PTHREAD_CREATE_DETACHED))
		error("pthread_attr_setdetachstate error %m");
	retries = 0;
	/* Retry thread creation on transient resource exhaustion */
	while (pthread_create(&thread_agent, &attr_agent,
			      _track_freeing_blocks,
			      bg_free_list)) {
		error("pthread_create error %m");
		if (++retries > MAX_PTHREAD_RETRIES)
			fatal("Can't create "
			      "pthread");
		/* sleep and retry */
		usleep(1000);
	}
	slurm_attr_destroy(&attr_agent);
	return SLURM_SUCCESS;
}
static void *_agent(void *x)
{
struct agent_arg *args = (struct agent_arg *) x;
kvs_comm_set_t *kvs_set;
struct msg_arg *msg_args;
struct kvs_hosts *kvs_host_list;
int i, j, kvs_set_cnt = 0, host_cnt, pmi_fanout = 32;
int msg_sent = 0, max_forward = 0;
char *tmp, *fanout_off_host;
pthread_t msg_id;
pthread_attr_t attr;
DEF_TIMERS;
tmp = getenv("PMI_FANOUT");
if (tmp) {
pmi_fanout = atoi(tmp);
if (pmi_fanout < 1)
pmi_fanout = 32;
}
fanout_off_host = getenv("PMI_FANOUT_OFF_HOST");
/* only send one message to each host,
* build table of the ports on each host */
START_TIMER;
slurm_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
kvs_set = xmalloc(sizeof(kvs_comm_set_t) * args->barrier_xmit_cnt);
for (i=0; i<args->barrier_xmit_cnt; i++) {
if (args->barrier_xmit_ptr[i].port == 0)
continue; /* already sent message to host */
kvs_host_list = xmalloc(sizeof(struct kvs_hosts) * pmi_fanout);
host_cnt = 0;
/* This code enables key-pair forwarding between
* tasks. First task on the node gets the key-pairs
* with host/port information for all other tasks on
* that node it should forward the information to. */
for (j=(i+1); j<args->barrier_xmit_cnt; j++) {
if (args->barrier_xmit_ptr[j].port == 0)
continue; /* already sent message */
if ((fanout_off_host == NULL) &&
strcmp(args->barrier_xmit_ptr[i].hostname,
args->barrier_xmit_ptr[j].hostname))
continue; /* another host */
kvs_host_list[host_cnt].task_id = 0; /* not avail */
kvs_host_list[host_cnt].port =
args->barrier_xmit_ptr[j].port;
kvs_host_list[host_cnt].hostname =
args->barrier_xmit_ptr[j].hostname;
args->barrier_xmit_ptr[j].port = 0;/* don't reissue */
host_cnt++;
if (host_cnt >= pmi_fanout)
break;
}
msg_sent++;
max_forward = MAX(host_cnt, max_forward);
slurm_mutex_lock(&agent_mutex);
while (agent_cnt >= agent_max_cnt)
pthread_cond_wait(&agent_cond, &agent_mutex);
agent_cnt++;
slurm_mutex_unlock(&agent_mutex);
msg_args = xmalloc(sizeof(struct msg_arg));
msg_args->bar_ptr = &args->barrier_xmit_ptr[i];
msg_args->kvs_ptr = &kvs_set[kvs_set_cnt];
kvs_set[kvs_set_cnt].host_cnt = host_cnt;
kvs_set[kvs_set_cnt].kvs_host_ptr = kvs_host_list;
kvs_set[kvs_set_cnt].kvs_comm_recs = args->kvs_xmit_cnt;
kvs_set[kvs_set_cnt].kvs_comm_ptr = args->kvs_xmit_ptr;
kvs_set_cnt++;
if (agent_max_cnt == 1) {
/* TotalView slows down a great deal for
* pthread_create() calls, so just send the
* messages inline when TotalView is in use
* or for some other reason we only want
* one pthread. */
_msg_thread((void *) msg_args);
} else if (pthread_create(&msg_id, &attr, _msg_thread,
(void *) msg_args)) {
fatal("pthread_create: %m");
}
}
verbose("Sent KVS info to %d nodes, up to %d tasks per node",
msg_sent, (max_forward+1));
/* wait for completion of all outgoing message */
slurm_mutex_lock(&agent_mutex);
while (agent_cnt > 0)
pthread_cond_wait(&agent_cond, &agent_mutex);
slurm_mutex_unlock(&agent_mutex);
slurm_attr_destroy(&attr);
/* Release allocated memory */
for (i=0; i<kvs_set_cnt; i++)
xfree(kvs_set[i].kvs_host_ptr);
xfree(kvs_set);
for (i=0; i<args->barrier_xmit_cnt; i++)
//......... part of the code omitted here .........
/*
 * _xlate_before - translate a "before*" job dependency on my_job_id into the
 *	equivalent "after*" dependency stored on each referenced job, so the
 *	referenced jobs wait on this one instead.
 * IN depend     - dependency string beginning with before/beforeany/
 *	beforenotok/beforeok followed by ":jobid[:jobid...]";
 *	modified in place by strtok_r()
 * IN submit_uid - uid submitting the new job; must own each referenced job
 *	or be a super user
 * IN my_job_id  - job id of the newly submitted job
 */
static void _xlate_before(char *depend, uint32_t submit_uid, uint32_t my_job_id)
{
	uint32_t job_id;
	char *last_ptr = NULL, *new_dep = NULL, *tok, *type;
	struct job_record *job_ptr;
	pthread_attr_t attr;
	pthread_t dep_thread;

	/* Map the "before*" keyword to its inverse "after*" form */
	tok = strtok_r(depend, ":", &last_ptr);
	if (!xstrcmp(tok, "before"))
		type = "after";
	else if (!xstrcmp(tok, "beforeany"))
		type = "afterany";
	else if (!xstrcmp(tok, "beforenotok"))
		type = "afternotok";
	else if (!xstrcmp(tok, "beforeok"))
		type = "afterok";
	else {
		info("%s: discarding invalid job dependency option %s",
		     plugin_type, tok);
		return;
	}

	/* NOTE: We are updating a job record here in order to implement
	 * the depend=before option. We are doing so without the write lock
	 * on the job record, but using a local mutex to prevent multiple
	 * updates on the same job when multiple jobs satisfying the dependency
	 * are being processed at the same time (all with read locks). The
	 * job read lock will prevent anyone else from getting a job write
	 * lock and using a job write lock causes serious performance problems
	 * for slow job_submit plugins. Not an ideal solution, but the best
	 * option that we see. */
	slurm_mutex_lock(&depend_mutex);
	tok = strtok_r(NULL, ":", &last_ptr);
	while (tok) {
		job_id = atoi(tok);
		job_ptr = find_job_record(job_id);
		if (!job_ptr) {
			info("%s: discarding invalid job dependency before %s",
			     plugin_type, tok);
		} else if ((submit_uid != job_ptr->user_id) &&
			   !validate_super_user(submit_uid)) {
			error("%s: Security violation: uid %u trying to alter "
			      "job %u belonging to uid %u",
			      plugin_type, submit_uid, job_ptr->job_id,
			      job_ptr->user_id);
		} else if ((!IS_JOB_PENDING(job_ptr)) ||
			   (job_ptr->details == NULL)) {
			info("%s: discarding job before dependency on "
			     "non-pending job %u",
			     plugin_type, job_ptr->job_id);
		} else {
			/* Append "type:my_job_id" to the referenced job's
			 * dependency string */
			if (job_ptr->details->dependency) {
				xstrcat(new_dep, job_ptr->details->dependency);
				xstrcat(new_dep, ",");
			}
			xstrfmtcat(new_dep, "%s:%u", type, my_job_id);
			xfree(job_ptr->details->dependency);
			job_ptr->details->dependency = new_dep;
			new_dep = NULL;
			_decr_depend_cnt(job_ptr);

			/* Fire a detached agent to re-test the job's
			 * dependencies */
			slurm_attr_init(&attr);
			pthread_attr_setdetachstate(&attr,
						    PTHREAD_CREATE_DETACHED);
			/* FIXED: pthread_create() result was silently
			 * ignored; log failures as the rest of this file
			 * does so a lost agent thread is visible */
			if (pthread_create(&dep_thread, &attr, _dep_agent,
					   job_ptr))
				error("%s: pthread_create: %m", plugin_type);
			slurm_attr_destroy(&attr);
		}
		tok = strtok_r(NULL, ":", &last_ptr);
	}
	slurm_mutex_unlock(&depend_mutex);
}
Please leave a comment