diff --git a/api/src/config.js b/api/src/config.js index 84270aa..bbd7ae9 100644 --- a/api/src/config.js +++ b/api/src/config.js @@ -114,6 +114,13 @@ const options = [ 'https://github.com/engineer-man/piston/releases/download/pkgs/index', validators: [], }, + { + key: 'max_concurrent_jobs', + desc: 'Maximum number of concurrent jobs to run at one time', + default: 64, + parser: parse_int, + validators: [(x) => x > 0 || `${x} cannot be negative`] + } ]; logger.info(`Loading Configuration from environment`); diff --git a/api/src/job.js b/api/src/job.js index 683cda6..ecc4ab3 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -16,6 +16,19 @@ const job_states = { let uid = 0; let gid = 0; +let remainingJobSpaces = config.max_concurrent_jobs; +let jobQueue = []; + + +setInterval(()=>{ + // Every 10ms try resolve a new job, if there is an available slot + if(jobQueue.length > 0 && remainingJobSpaces > 0){ + jobQueue.shift()() + } +}, 10) + + + class Job { constructor({ runtime, files, args, stdin, timeouts, memory_limits }) { this.uuid = uuidv4(); @@ -48,8 +61,15 @@ class Job { } async prime() { - logger.info(`Priming job uuid=${this.uuid}`); + if(remainingJobSpaces < 1){ + logger.info(`Awaiting job slot uuid=${this.uuid}`) + await new Promise((resolve)=>{ + jobQueue.push(resolve) + }) + } + logger.info(`Priming job uuid=${this.uuid}`); + remainingJobSpaces--; logger.debug('Writing files to job cache'); logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`); @@ -126,47 +146,55 @@ class Job { const kill_timeout = set_timeout( - _ => proc.kill('SIGKILL'), + async _ => { + logger.info(`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`) + process.kill(proc.pid, 'SIGKILL') + }, timeout ); - proc.stderr.on('data', data => { + proc.stderr.on('data', async data => { if(eventBus !== null) { eventBus.emit("stderr", data); } else if (stderr.length > config.output_max_size) { - proc.kill('SIGKILL'); + logger.info(`stderr length exceeded uuid=${this.uuid}`) + process.kill(proc.pid, 'SIGKILL') } else { stderr += data; output += data; } }); - proc.stdout.on('data', data => { + proc.stdout.on('data', async data => { if(eventBus !== null){ eventBus.emit("stdout", data); } else if (stdout.length > config.output_max_size) { - proc.kill('SIGKILL'); + logger.info(`stdout length exceeded uuid=${this.uuid}`) + process.kill(proc.pid, 'SIGKILL') } else { stdout += data; output += data; } }); - const exit_cleanup = () => { + const exit_cleanup = async () => { clear_timeout(kill_timeout); proc.stderr.destroy(); proc.stdout.destroy(); + + await this.cleanup_processes() + logger.debug(`Finished exit cleanup uuid=${this.uuid}`) }; - proc.on('exit', (code, signal) => { - exit_cleanup(); + proc.on('exit', async (code, signal) => { + await exit_cleanup(); resolve({stdout, stderr, code, signal, output }); }); - proc.on('error', err => { - exit_cleanup(); + proc.on('error', async err => { + await exit_cleanup(); reject({ error: err, stdout, stderr, output }); }); @@ -262,36 +290,47 @@ class Job { this.state = job_states.EXECUTED; } - async cleanup_processes() { + async cleanup_processes(dont_wait = []) { let processes = [1]; + logger.debug(`Cleaning up processes uuid=${this.uuid}`) while (processes.length > 0) { - processes = await new Promise((resolve, reject) => - cp.execFile('ps', ['awwxo', 'pid,ruid'], (err, stdout) => { - if (err === null) { - const lines = stdout.split('\n').slice(1); //Remove header with slice - const procs = lines.map(line => { - const [pid, ruid] = line - .trim() - .split(/\s+/) - .map(n => parseInt(n)); + processes = [] - return { pid, ruid }; - }); - resolve(procs); - } else { - reject(error); - } - }) - ); + const proc_ids = await fs.readdir("/proc"); + + + processes = await Promise.all(proc_ids.map(async (proc_id) => { + if(isNaN(proc_id)) return -1; + try{ + const proc_status = await fs.read_file(path.join("/proc",proc_id,"status")); + const proc_lines = proc_status.to_string().split("\n") + const uid_line = proc_lines.find(line=>line.starts_with("Uid:")) + const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/); + + + if(ruid == this.uid || euid == this.uid) + return parse_int(proc_id) + + }catch{ + return -1 + } + + return -1 + })) + + processes = processes.filter(p => p > 0) + + if(processes.length > 0) + logger.debug(`Got processes to kill: ${processes} uuid=${this.uuid}`) + - processes = processes.filter(proc => proc.ruid === this.uid); for (const proc of processes) { // First stop the processes, but keep their resources allocated so they cant re-fork try { - process.kill(proc.pid, 'SIGSTOP'); + process.kill(proc, 'SIGSTOP'); } catch { // Could already be dead } @@ -300,14 +339,17 @@ class Job { for (const proc of processes) { // Then clear them out of the process tree try { - process.kill(proc.pid, 'SIGKILL'); + process.kill(proc, 'SIGKILL'); } catch { // Could already be dead and just needs to be waited on } - wait_pid(proc.pid); + if(!dont_wait.includes(proc)) + wait_pid(proc); } } + + logger.debug(`Cleaned up processes uuid=${this.uuid}`) } async cleanup_filesystem() { @@ -339,11 +381,13 @@ class Job { async cleanup() { logger.info(`Cleaning up job uuid=${this.uuid}`); - await this.cleanup_processes(); await this.cleanup_filesystem(); + + remainingJobSpaces++; } } + module.exports = { Job, }; diff --git a/docs/configuration.md b/docs/configuration.md index 1388e9d..16a5df0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -145,3 +145,12 @@ default: https://github.com/engineer-man/piston/releases/download/pkgs/index ``` URL for repository index, where packages will be downloaded from. + +## Maximum Concurrent Jobs + +```yaml +key: PISTON_MAX_CONCURRENT_JOBS +default: 64 +``` + +Maximum number of jobs to run concurrently. diff --git a/license b/license index 4f45aea..fd203f8 100644 --- a/license +++ b/license @@ -1,4 +1,4 @@ -Copyright (c) 2018-2021 Brian Seymour, EMKC Contributors +Copyright (c) 2018-2021 Brian Seymour, Thomas Hobson, EMKC Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal