Compare commits

..

No commits in common. "c091c117c7c1a5165c137e3d367adc8eea2662f4" and "d8b430654bc5da463d178311345d08c870a7b9e4" have entirely different histories.

1 changed files with 61 additions and 92 deletions

View File

@ -1,12 +1,10 @@
const logplease = require('logplease'); const logger = require('logplease').create('job');
const logger = logplease.create('job');
const { v4: uuidv4 } = require('uuid'); const { v4: uuidv4 } = require('uuid');
const cp = require('child_process'); const cp = require('child_process');
const path = require('path'); const path = require('path');
const config = require('./config'); const config = require('./config');
const globals = require('./globals'); const globals = require('./globals');
const fs = require('fs/promises'); const fs = require('fs/promises');
const fss = require('fs');
const wait_pid = require('waitpid'); const wait_pid = require('waitpid');
const job_states = { const job_states = {
@ -31,9 +29,6 @@ setInterval(() => {
class Job { class Job {
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) { constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
this.uuid = uuidv4(); this.uuid = uuidv4();
this.logger = logplease.create(`job/${this.uuid}`);
this.runtime = runtime; this.runtime = runtime;
this.files = files.map((file, i) => ({ this.files = files.map((file, i) => ({
name: file.name || `file${i}.code`, name: file.name || `file${i}.code`,
@ -58,8 +53,6 @@ class Job {
uid %= config.runner_uid_max - config.runner_uid_min + 1; uid %= config.runner_uid_max - config.runner_uid_min + 1;
gid %= config.runner_gid_max - config.runner_gid_min + 1; gid %= config.runner_gid_max - config.runner_gid_min + 1;
this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`);
this.state = job_states.READY; this.state = job_states.READY;
this.dir = path.join( this.dir = path.join(
config.data_directory, config.data_directory,
@ -70,17 +63,17 @@ class Job {
async prime() { async prime() {
if (remaining_job_spaces < 1) { if (remaining_job_spaces < 1) {
this.logger.info(`Awaiting job slot`); logger.info(`Awaiting job slot uuid=${this.uuid}`);
await new Promise(resolve => { await new Promise(resolve => {
jobQueue.push(resolve); jobQueue.push(resolve);
}); });
} }
this.logger.info(`Priming job`); logger.info(`Priming job uuid=${this.uuid}`);
remaining_job_spaces--; remaining_job_spaces--;
this.logger.debug('Writing files to job cache'); logger.debug('Writing files to job cache');
this.logger.debug(`Transfering ownership`); logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`);
await fs.mkdir(this.dir, { mode: 0o700 }); await fs.mkdir(this.dir, { mode: 0o700 });
await fs.chown(this.dir, this.uid, this.gid); await fs.chown(this.dir, this.uid, this.gid);
@ -107,7 +100,7 @@ class Job {
this.state = job_states.PRIMED; this.state = job_states.PRIMED;
this.logger.debug('Primed job'); logger.debug('Primed job');
} }
async safe_call(file, args, timeout, memory_limit, eventBus = null) { async safe_call(file, args, timeout, memory_limit, eventBus = null) {
@ -125,14 +118,7 @@ class Job {
prlimit.push('--as=' + memory_limit); prlimit.push('--as=' + memory_limit);
} }
const proc_call = [ const proc_call = [...prlimit, ...nonetwork, 'bash', file, ...args];
'nice',
...prlimit,
...nonetwork,
'bash',
file,
...args,
];
var stdout = ''; var stdout = '';
var stderr = ''; var stderr = '';
@ -167,7 +153,9 @@ class Job {
const kill_timeout = const kill_timeout =
(timeout >= 0 && (timeout >= 0 &&
set_timeout(async _ => { set_timeout(async _ => {
this.logger.info(`Timeout exceeded timeout=${timeout}`); logger.info(
`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`
);
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
}, timeout)) || }, timeout)) ||
null; null;
@ -176,7 +164,7 @@ class Job {
if (eventBus !== null) { if (eventBus !== null) {
eventBus.emit('stderr', data); eventBus.emit('stderr', data);
} else if (stderr.length > this.runtime.output_max_size) { } else if (stderr.length > this.runtime.output_max_size) {
this.logger.info(`stderr length exceeded`); logger.info(`stderr length exceeded uuid=${this.uuid}`);
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
} else { } else {
stderr += data; stderr += data;
@ -188,7 +176,7 @@ class Job {
if (eventBus !== null) { if (eventBus !== null) {
eventBus.emit('stdout', data); eventBus.emit('stdout', data);
} else if (stdout.length > this.runtime.output_max_size) { } else if (stdout.length > this.runtime.output_max_size) {
this.logger.info(`stdout length exceeded`); logger.info(`stdout length exceeded uuid=${this.uuid}`);
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
} else { } else {
stdout += data; stdout += data;
@ -196,24 +184,24 @@ class Job {
} }
}); });
const exit_cleanup = () => { const exit_cleanup = async () => {
clear_timeout(kill_timeout); clear_timeout(kill_timeout);
proc.stderr.destroy(); proc.stderr.destroy();
proc.stdout.destroy(); proc.stdout.destroy();
this.cleanup_processes(); await this.cleanup_processes();
this.logger.debug(`Finished exit cleanup`); logger.debug(`Finished exit cleanup uuid=${this.uuid}`);
}; };
proc.on('exit', (code, signal) => { proc.on('exit', async (code, signal) => {
exit_cleanup(); await exit_cleanup();
resolve({ stdout, stderr, code, signal, output }); resolve({ stdout, stderr, code, signal, output });
}); });
proc.on('error', err => { proc.on('error', async err => {
exit_cleanup(); await exit_cleanup();
reject({ error: err, stdout, stderr, output }); reject({ error: err, stdout, stderr, output });
}); });
@ -228,13 +216,17 @@ class Job {
); );
} }
this.logger.info(`Executing job runtime=${this.runtime.toString()}`); logger.info(
`Executing job uuid=${this.uuid} uid=${this.uid} gid=${
this.gid
} runtime=${this.runtime.toString()}`
);
const code_files = const code_files =
(this.runtime.language === 'file' && this.files) || (this.runtime.language === 'file' && this.files) ||
this.files.filter(file => file.encoding == 'utf8'); this.files.filter(file => file.encoding == 'utf8');
this.logger.debug('Compiling'); logger.debug('Compiling');
let compile; let compile;
@ -247,7 +239,7 @@ class Job {
); );
} }
this.logger.debug('Running'); logger.debug('Running');
const run = await this.safe_call( const run = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'), path.join(this.runtime.pkgdir, 'run'),
@ -274,8 +266,10 @@ class Job {
); );
} }
this.logger.info( logger.info(
`Interactively executing job runtime=${this.runtime.toString()}` `Interactively executing job uuid=${this.uuid} uid=${
this.uid
} gid=${this.gid} runtime=${this.runtime.toString()}`
); );
const code_files = const code_files =
@ -295,7 +289,7 @@ class Job {
eventBus.emit('exit', 'compile', { error, code, signal }); eventBus.emit('exit', 'compile', { error, code, signal });
} }
this.logger.debug('Running'); logger.debug('Running');
eventBus.emit('stage', 'run'); eventBus.emit('stage', 'run');
const { error, code, signal } = await this.safe_call( const { error, code, signal } = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'), path.join(this.runtime.pkgdir, 'run'),
@ -310,37 +304,28 @@ class Job {
this.state = job_states.EXECUTED; this.state = job_states.EXECUTED;
} }
cleanup_processes(dont_wait = []) { async cleanup_processes(dont_wait = []) {
let processes = [1]; let processes = [1];
const to_wait = []; logger.debug(`Cleaning up processes uuid=${this.uuid}`);
this.logger.debug(`Cleaning up processes`);
while (processes.length > 0) { while (processes.length > 0) {
processes = []; processes = [];
const proc_ids = fss.readdir_sync('/proc'); const proc_ids = await fs.readdir('/proc');
processes = proc_ids.map(proc_id => { processes = await Promise.all(
proc_ids.map(async proc_id => {
if (isNaN(proc_id)) return -1; if (isNaN(proc_id)) return -1;
try { try {
const proc_status = fss.read_file_sync( const proc_status = await fs.read_file(
path.join('/proc', proc_id, 'status') path.join('/proc', proc_id, 'status')
); );
const proc_lines = proc_status.to_string().split('\n'); const proc_lines = proc_status.to_string().split('\n');
const state_line = proc_lines.find(line =>
line.starts_with('State:')
);
const uid_line = proc_lines.find(line => const uid_line = proc_lines.find(line =>
line.starts_with('Uid:') line.starts_with('Uid:')
); );
const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/); const [_, ruid, euid, suid, fuid] =
uid_line.split(/\s+/);
const [_1, state, user_friendly] = state_line.split(/\s+/);
if (state == 'Z')
// Zombie process, just needs to be waited
return -1;
// We should kill in all other state (Sleep, Stopped & Running)
if (ruid == this.uid || euid == this.uid) if (ruid == this.uid || euid == this.uid)
return parse_int(proc_id); return parse_int(proc_id);
@ -349,23 +334,22 @@ class Job {
} }
return -1; return -1;
}); })
);
processes = processes.filter(p => p > 0); processes = processes.filter(p => p > 0);
if (processes.length > 0) if (processes.length > 0)
this.logger.debug(`Got processes to kill: ${processes}`); logger.debug(
`Got processes to kill: ${processes} uuid=${this.uuid}`
);
for (const proc of processes) { for (const proc of processes) {
// First stop the processes, but keep their resources allocated so they cant re-fork // First stop the processes, but keep their resources allocated so they cant re-fork
try { try {
process.kill(proc, 'SIGSTOP'); process.kill(proc, 'SIGSTOP');
} catch (e) { } catch {
// Could already be dead // Could already be dead
this.logger.debug(
`Got error while SIGSTOPping process ${proc}:`,
e
);
} }
} }
@ -375,27 +359,13 @@ class Job {
process.kill(proc, 'SIGKILL'); process.kill(proc, 'SIGKILL');
} catch { } catch {
// Could already be dead and just needs to be waited on // Could already be dead and just needs to be waited on
this.logger.debug(
`Got error while SIGKILLing process ${proc}:`,
e
);
} }
to_wait.push(proc); if (!dont_wait.includes(proc)) wait_pid(proc);
} }
} }
this.logger.debug( logger.debug(`Cleaned up processes uuid=${this.uuid}`);
`Finished kill-loop, calling wait_pid to end any zombie processes`
);
for (const proc of to_wait) {
if (dont_wait.includes(proc)) continue;
wait_pid(proc);
}
this.logger.debug(`Cleaned up processes`);
} }
async cleanup_filesystem() { async cleanup_filesystem() {
@ -416,7 +386,7 @@ class Job {
} }
} catch (e) { } catch (e) {
// File was somehow deleted in the time that we read the dir to when we checked the file // File was somehow deleted in the time that we read the dir to when we checked the file
this.logger.warn(`Error removing file ${file_path}: ${e}`); logger.warn(`Error removing file ${file_path}: ${e}`);
} }
} }
} }
@ -425,9 +395,8 @@ class Job {
} }
async cleanup() { async cleanup() {
this.logger.info(`Cleaning up job`); logger.info(`Cleaning up job uuid=${this.uuid}`);
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
await this.cleanup_filesystem(); await this.cleanup_filesystem();
remaining_job_spaces++; remaining_job_spaces++;