Compare commits
2 Commits
d8b430654b
...
c091c117c7
Author | SHA1 | Date |
---|---|---|
Thomas Hobson | c091c117c7 | |
Thomas Hobson | c7efa5372a |
153
api/src/job.js
153
api/src/job.js
|
@ -1,10 +1,12 @@
|
|||
const logger = require('logplease').create('job');
|
||||
const logplease = require('logplease');
|
||||
const logger = logplease.create('job');
|
||||
const { v4: uuidv4 } = require('uuid');
|
||||
const cp = require('child_process');
|
||||
const path = require('path');
|
||||
const config = require('./config');
|
||||
const globals = require('./globals');
|
||||
const fs = require('fs/promises');
|
||||
const fss = require('fs');
|
||||
const wait_pid = require('waitpid');
|
||||
|
||||
const job_states = {
|
||||
|
@ -29,6 +31,9 @@ setInterval(() => {
|
|||
class Job {
|
||||
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
|
||||
this.uuid = uuidv4();
|
||||
|
||||
this.logger = logplease.create(`job/${this.uuid}`);
|
||||
|
||||
this.runtime = runtime;
|
||||
this.files = files.map((file, i) => ({
|
||||
name: file.name || `file${i}.code`,
|
||||
|
@ -53,6 +58,8 @@ class Job {
|
|||
uid %= config.runner_uid_max - config.runner_uid_min + 1;
|
||||
gid %= config.runner_gid_max - config.runner_gid_min + 1;
|
||||
|
||||
this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`);
|
||||
|
||||
this.state = job_states.READY;
|
||||
this.dir = path.join(
|
||||
config.data_directory,
|
||||
|
@ -63,17 +70,17 @@ class Job {
|
|||
|
||||
async prime() {
|
||||
if (remaining_job_spaces < 1) {
|
||||
logger.info(`Awaiting job slot uuid=${this.uuid}`);
|
||||
this.logger.info(`Awaiting job slot`);
|
||||
await new Promise(resolve => {
|
||||
jobQueue.push(resolve);
|
||||
});
|
||||
}
|
||||
|
||||
logger.info(`Priming job uuid=${this.uuid}`);
|
||||
this.logger.info(`Priming job`);
|
||||
remaining_job_spaces--;
|
||||
logger.debug('Writing files to job cache');
|
||||
this.logger.debug('Writing files to job cache');
|
||||
|
||||
logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`);
|
||||
this.logger.debug(`Transfering ownership`);
|
||||
|
||||
await fs.mkdir(this.dir, { mode: 0o700 });
|
||||
await fs.chown(this.dir, this.uid, this.gid);
|
||||
|
@ -100,7 +107,7 @@ class Job {
|
|||
|
||||
this.state = job_states.PRIMED;
|
||||
|
||||
logger.debug('Primed job');
|
||||
this.logger.debug('Primed job');
|
||||
}
|
||||
|
||||
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
|
||||
|
@ -118,7 +125,14 @@ class Job {
|
|||
prlimit.push('--as=' + memory_limit);
|
||||
}
|
||||
|
||||
const proc_call = [...prlimit, ...nonetwork, 'bash', file, ...args];
|
||||
const proc_call = [
|
||||
'nice',
|
||||
...prlimit,
|
||||
...nonetwork,
|
||||
'bash',
|
||||
file,
|
||||
...args,
|
||||
];
|
||||
|
||||
var stdout = '';
|
||||
var stderr = '';
|
||||
|
@ -153,9 +167,7 @@ class Job {
|
|||
const kill_timeout =
|
||||
(timeout >= 0 &&
|
||||
set_timeout(async _ => {
|
||||
logger.info(
|
||||
`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`
|
||||
);
|
||||
this.logger.info(`Timeout exceeded timeout=${timeout}`);
|
||||
process.kill(proc.pid, 'SIGKILL');
|
||||
}, timeout)) ||
|
||||
null;
|
||||
|
@ -164,7 +176,7 @@ class Job {
|
|||
if (eventBus !== null) {
|
||||
eventBus.emit('stderr', data);
|
||||
} else if (stderr.length > this.runtime.output_max_size) {
|
||||
logger.info(`stderr length exceeded uuid=${this.uuid}`);
|
||||
this.logger.info(`stderr length exceeded`);
|
||||
process.kill(proc.pid, 'SIGKILL');
|
||||
} else {
|
||||
stderr += data;
|
||||
|
@ -176,7 +188,7 @@ class Job {
|
|||
if (eventBus !== null) {
|
||||
eventBus.emit('stdout', data);
|
||||
} else if (stdout.length > this.runtime.output_max_size) {
|
||||
logger.info(`stdout length exceeded uuid=${this.uuid}`);
|
||||
this.logger.info(`stdout length exceeded`);
|
||||
process.kill(proc.pid, 'SIGKILL');
|
||||
} else {
|
||||
stdout += data;
|
||||
|
@ -184,24 +196,24 @@ class Job {
|
|||
}
|
||||
});
|
||||
|
||||
const exit_cleanup = async () => {
|
||||
const exit_cleanup = () => {
|
||||
clear_timeout(kill_timeout);
|
||||
|
||||
proc.stderr.destroy();
|
||||
proc.stdout.destroy();
|
||||
|
||||
await this.cleanup_processes();
|
||||
logger.debug(`Finished exit cleanup uuid=${this.uuid}`);
|
||||
this.cleanup_processes();
|
||||
this.logger.debug(`Finished exit cleanup`);
|
||||
};
|
||||
|
||||
proc.on('exit', async (code, signal) => {
|
||||
await exit_cleanup();
|
||||
proc.on('exit', (code, signal) => {
|
||||
exit_cleanup();
|
||||
|
||||
resolve({ stdout, stderr, code, signal, output });
|
||||
});
|
||||
|
||||
proc.on('error', async err => {
|
||||
await exit_cleanup();
|
||||
proc.on('error', err => {
|
||||
exit_cleanup();
|
||||
|
||||
reject({ error: err, stdout, stderr, output });
|
||||
});
|
||||
|
@ -216,17 +228,13 @@ class Job {
|
|||
);
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Executing job uuid=${this.uuid} uid=${this.uid} gid=${
|
||||
this.gid
|
||||
} runtime=${this.runtime.toString()}`
|
||||
);
|
||||
this.logger.info(`Executing job runtime=${this.runtime.toString()}`);
|
||||
|
||||
const code_files =
|
||||
(this.runtime.language === 'file' && this.files) ||
|
||||
this.files.filter(file => file.encoding == 'utf8');
|
||||
|
||||
logger.debug('Compiling');
|
||||
this.logger.debug('Compiling');
|
||||
|
||||
let compile;
|
||||
|
||||
|
@ -239,7 +247,7 @@ class Job {
|
|||
);
|
||||
}
|
||||
|
||||
logger.debug('Running');
|
||||
this.logger.debug('Running');
|
||||
|
||||
const run = await this.safe_call(
|
||||
path.join(this.runtime.pkgdir, 'run'),
|
||||
|
@ -266,10 +274,8 @@ class Job {
|
|||
);
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Interactively executing job uuid=${this.uuid} uid=${
|
||||
this.uid
|
||||
} gid=${this.gid} runtime=${this.runtime.toString()}`
|
||||
this.logger.info(
|
||||
`Interactively executing job runtime=${this.runtime.toString()}`
|
||||
);
|
||||
|
||||
const code_files =
|
||||
|
@ -289,7 +295,7 @@ class Job {
|
|||
eventBus.emit('exit', 'compile', { error, code, signal });
|
||||
}
|
||||
|
||||
logger.debug('Running');
|
||||
this.logger.debug('Running');
|
||||
eventBus.emit('stage', 'run');
|
||||
const { error, code, signal } = await this.safe_call(
|
||||
path.join(this.runtime.pkgdir, 'run'),
|
||||
|
@ -304,52 +310,62 @@ class Job {
|
|||
this.state = job_states.EXECUTED;
|
||||
}
|
||||
|
||||
async cleanup_processes(dont_wait = []) {
|
||||
cleanup_processes(dont_wait = []) {
|
||||
let processes = [1];
|
||||
logger.debug(`Cleaning up processes uuid=${this.uuid}`);
|
||||
const to_wait = [];
|
||||
this.logger.debug(`Cleaning up processes`);
|
||||
|
||||
while (processes.length > 0) {
|
||||
processes = [];
|
||||
|
||||
const proc_ids = await fs.readdir('/proc');
|
||||
const proc_ids = fss.readdir_sync('/proc');
|
||||
|
||||
processes = await Promise.all(
|
||||
proc_ids.map(async proc_id => {
|
||||
if (isNaN(proc_id)) return -1;
|
||||
try {
|
||||
const proc_status = await fs.read_file(
|
||||
path.join('/proc', proc_id, 'status')
|
||||
);
|
||||
const proc_lines = proc_status.to_string().split('\n');
|
||||
const uid_line = proc_lines.find(line =>
|
||||
line.starts_with('Uid:')
|
||||
);
|
||||
const [_, ruid, euid, suid, fuid] =
|
||||
uid_line.split(/\s+/);
|
||||
processes = proc_ids.map(proc_id => {
|
||||
if (isNaN(proc_id)) return -1;
|
||||
try {
|
||||
const proc_status = fss.read_file_sync(
|
||||
path.join('/proc', proc_id, 'status')
|
||||
);
|
||||
const proc_lines = proc_status.to_string().split('\n');
|
||||
const state_line = proc_lines.find(line =>
|
||||
line.starts_with('State:')
|
||||
);
|
||||
const uid_line = proc_lines.find(line =>
|
||||
line.starts_with('Uid:')
|
||||
);
|
||||
const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/);
|
||||
|
||||
if (ruid == this.uid || euid == this.uid)
|
||||
return parse_int(proc_id);
|
||||
} catch {
|
||||
const [_1, state, user_friendly] = state_line.split(/\s+/);
|
||||
|
||||
if (state == 'Z')
|
||||
// Zombie process, just needs to be waited
|
||||
return -1;
|
||||
}
|
||||
// We should kill in all other state (Sleep, Stopped & Running)
|
||||
|
||||
if (ruid == this.uid || euid == this.uid)
|
||||
return parse_int(proc_id);
|
||||
} catch {
|
||||
return -1;
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
return -1;
|
||||
});
|
||||
|
||||
processes = processes.filter(p => p > 0);
|
||||
|
||||
if (processes.length > 0)
|
||||
logger.debug(
|
||||
`Got processes to kill: ${processes} uuid=${this.uuid}`
|
||||
);
|
||||
this.logger.debug(`Got processes to kill: ${processes}`);
|
||||
|
||||
for (const proc of processes) {
|
||||
// First stop the processes, but keep their resources allocated so they cant re-fork
|
||||
try {
|
||||
process.kill(proc, 'SIGSTOP');
|
||||
} catch {
|
||||
} catch (e) {
|
||||
// Could already be dead
|
||||
this.logger.debug(
|
||||
`Got error while SIGSTOPping process ${proc}:`,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -359,13 +375,27 @@ class Job {
|
|||
process.kill(proc, 'SIGKILL');
|
||||
} catch {
|
||||
// Could already be dead and just needs to be waited on
|
||||
this.logger.debug(
|
||||
`Got error while SIGKILLing process ${proc}:`,
|
||||
e
|
||||
);
|
||||
}
|
||||
|
||||
if (!dont_wait.includes(proc)) wait_pid(proc);
|
||||
to_wait.push(proc);
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug(`Cleaned up processes uuid=${this.uuid}`);
|
||||
this.logger.debug(
|
||||
`Finished kill-loop, calling wait_pid to end any zombie processes`
|
||||
);
|
||||
|
||||
for (const proc of to_wait) {
|
||||
if (dont_wait.includes(proc)) continue;
|
||||
|
||||
wait_pid(proc);
|
||||
}
|
||||
|
||||
this.logger.debug(`Cleaned up processes`);
|
||||
}
|
||||
|
||||
async cleanup_filesystem() {
|
||||
|
@ -386,7 +416,7 @@ class Job {
|
|||
}
|
||||
} catch (e) {
|
||||
// File was somehow deleted in the time that we read the dir to when we checked the file
|
||||
logger.warn(`Error removing file ${file_path}: ${e}`);
|
||||
this.logger.warn(`Error removing file ${file_path}: ${e}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -395,8 +425,9 @@ class Job {
|
|||
}
|
||||
|
||||
async cleanup() {
|
||||
logger.info(`Cleaning up job uuid=${this.uuid}`);
|
||||
this.logger.info(`Cleaning up job`);
|
||||
|
||||
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
|
||||
await this.cleanup_filesystem();
|
||||
|
||||
remaining_job_spaces++;
|
||||
|
|
Loading…
Reference in New Issue