Merge branch 'master' into sqlite

This commit is contained in:
Thomas Hobson 2021-10-02 00:10:02 +13:00 committed by GitHub
commit 874cc815e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 35 deletions

View File

@ -114,6 +114,13 @@ const options = [
'https://github.com/engineer-man/piston/releases/download/pkgs/index', 'https://github.com/engineer-man/piston/releases/download/pkgs/index',
validators: [], validators: [],
}, },
{
key: 'max_concurrent_jobs',
desc: 'Maximum number of concurrent jobs to run at one time',
default: 64,
parser: parse_int,
validators: [(x) => x > 0 || `${x} cannot be negative`]
}
]; ];
logger.info(`Loading Configuration from environment`); logger.info(`Loading Configuration from environment`);

View File

@ -16,6 +16,19 @@ const job_states = {
let uid = 0; let uid = 0;
let gid = 0; let gid = 0;
let remainingJobSpaces = config.max_concurrent_jobs;
let jobQueue = [];
setInterval(()=>{
// Every 10ms try resolve a new job, if there is an available slot
if(jobQueue.length > 0 && remainingJobSpaces > 0){
jobQueue.shift()()
}
}, 10)
class Job { class Job {
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) { constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
this.uuid = uuidv4(); this.uuid = uuidv4();
@ -48,8 +61,15 @@ class Job {
} }
async prime() { async prime() {
logger.info(`Priming job uuid=${this.uuid}`); if(remainingJobSpaces < 1){
logger.info(`Awaiting job slot uuid=${this.uuid}`)
await new Promise((resolve)=>{
jobQueue.push(resolve)
})
}
logger.info(`Priming job uuid=${this.uuid}`);
remainingJobSpaces--;
logger.debug('Writing files to job cache'); logger.debug('Writing files to job cache');
logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`); logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`);
@ -126,47 +146,55 @@ class Job {
const kill_timeout = set_timeout( const kill_timeout = set_timeout(
_ => proc.kill('SIGKILL'), async _ => {
logger.info(`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`)
process.kill(proc.pid, 'SIGKILL')
},
timeout timeout
); );
proc.stderr.on('data', data => { proc.stderr.on('data', async data => {
if(eventBus !== null) { if(eventBus !== null) {
eventBus.emit("stderr", data); eventBus.emit("stderr", data);
} else if (stderr.length > config.output_max_size) { } else if (stderr.length > config.output_max_size) {
proc.kill('SIGKILL'); logger.info(`stderr length exceeded uuid=${this.uuid}`)
process.kill(proc.pid, 'SIGKILL')
} else { } else {
stderr += data; stderr += data;
output += data; output += data;
} }
}); });
proc.stdout.on('data', data => { proc.stdout.on('data', async data => {
if(eventBus !== null){ if(eventBus !== null){
eventBus.emit("stdout", data); eventBus.emit("stdout", data);
} else if (stdout.length > config.output_max_size) { } else if (stdout.length > config.output_max_size) {
proc.kill('SIGKILL'); logger.info(`stdout length exceeded uuid=${this.uuid}`)
process.kill(proc.pid, 'SIGKILL')
} else { } else {
stdout += data; stdout += data;
output += data; output += data;
} }
}); });
const exit_cleanup = () => { const exit_cleanup = async () => {
clear_timeout(kill_timeout); clear_timeout(kill_timeout);
proc.stderr.destroy(); proc.stderr.destroy();
proc.stdout.destroy(); proc.stdout.destroy();
await this.cleanup_processes()
logger.debug(`Finished exit cleanup uuid=${this.uuid}`)
}; };
proc.on('exit', (code, signal) => { proc.on('exit', async (code, signal) => {
exit_cleanup(); await exit_cleanup();
resolve({stdout, stderr, code, signal, output }); resolve({stdout, stderr, code, signal, output });
}); });
proc.on('error', err => { proc.on('error', async err => {
exit_cleanup(); await exit_cleanup();
reject({ error: err, stdout, stderr, output }); reject({ error: err, stdout, stderr, output });
}); });
@ -262,36 +290,47 @@ class Job {
this.state = job_states.EXECUTED; this.state = job_states.EXECUTED;
} }
async cleanup_processes() { async cleanup_processes(dont_wait = []) {
let processes = [1]; let processes = [1];
logger.debug(`Cleaning up processes uuid=${this.uuid}`)
while (processes.length > 0) { while (processes.length > 0) {
processes = await new Promise((resolve, reject) => processes = []
cp.execFile('ps', ['awwxo', 'pid,ruid'], (err, stdout) => {
if (err === null) {
const lines = stdout.split('\n').slice(1); //Remove header with slice
const procs = lines.map(line => {
const [pid, ruid] = line
.trim()
.split(/\s+/)
.map(n => parseInt(n));
return { pid, ruid };
});
resolve(procs); const proc_ids = await fs.readdir("/proc");
} else {
reject(error);
processes = await Promise.all(proc_ids.map(async (proc_id) => {
if(isNaN(proc_id)) return -1;
try{
const proc_status = await fs.read_file(path.join("/proc",proc_id,"status"));
const proc_lines = proc_status.to_string().split("\n")
const uid_line = proc_lines.find(line=>line.starts_with("Uid:"))
const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/);
if(ruid == this.uid || euid == this.uid)
return parse_int(proc_id)
}catch{
return -1
} }
})
);
processes = processes.filter(proc => proc.ruid === this.uid); return -1
}))
processes = processes.filter(p => p > 0)
if(processes.length > 0)
logger.debug(`Got processes to kill: ${processes} uuid=${this.uuid}`)
for (const proc of processes) { for (const proc of processes) {
// First stop the processes, but keep their resources allocated so they cant re-fork // First stop the processes, but keep their resources allocated so they cant re-fork
try { try {
process.kill(proc.pid, 'SIGSTOP'); process.kill(proc, 'SIGSTOP');
} catch { } catch {
// Could already be dead // Could already be dead
} }
@ -300,14 +339,17 @@ class Job {
for (const proc of processes) { for (const proc of processes) {
// Then clear them out of the process tree // Then clear them out of the process tree
try { try {
process.kill(proc.pid, 'SIGKILL'); process.kill(proc, 'SIGKILL');
} catch { } catch {
// Could already be dead and just needs to be waited on // Could already be dead and just needs to be waited on
} }
wait_pid(proc.pid); if(!dont_wait.includes(proc))
wait_pid(proc);
} }
} }
logger.debug(`Cleaned up processes uuid=${this.uuid}`)
} }
async cleanup_filesystem() { async cleanup_filesystem() {
@ -339,11 +381,13 @@ class Job {
async cleanup() { async cleanup() {
logger.info(`Cleaning up job uuid=${this.uuid}`); logger.info(`Cleaning up job uuid=${this.uuid}`);
await this.cleanup_processes();
await this.cleanup_filesystem(); await this.cleanup_filesystem();
remainingJobSpaces++;
} }
} }
module.exports = { module.exports = {
Job, Job,
}; };

View File

@ -145,3 +145,12 @@ default: https://github.com/engineer-man/piston/releases/download/pkgs/index
``` ```
URL for repository index, where packages will be downloaded from. URL for repository index, where packages will be downloaded from.
## Maximum Concurrent Jobs
```yaml
key: PISTON_MAX_CONCURRENT_JOBS
default: 64
```
Maximum number of jobs to run concurrently.

View File

@ -1,4 +1,4 @@
Copyright (c) 2018-2021 Brian Seymour, EMKC Contributors Copyright (c) 2018-2021 Brian Seymour, Thomas Hobson, EMKC Contributors
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal