Compare commits

..

No commits in common. "c091c117c7c1a5165c137e3d367adc8eea2662f4" and "d8b430654bc5da463d178311345d08c870a7b9e4" have entirely different histories.

1 changed files with 61 additions and 92 deletions

View File

@ -1,12 +1,10 @@
const logplease = require('logplease');
const logger = logplease.create('job');
const logger = require('logplease').create('job');
const { v4: uuidv4 } = require('uuid');
const cp = require('child_process');
const path = require('path');
const config = require('./config');
const globals = require('./globals');
const fs = require('fs/promises');
const fss = require('fs');
const wait_pid = require('waitpid');
const job_states = {
@ -31,9 +29,6 @@ setInterval(() => {
class Job {
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
this.uuid = uuidv4();
this.logger = logplease.create(`job/${this.uuid}`);
this.runtime = runtime;
this.files = files.map((file, i) => ({
name: file.name || `file${i}.code`,
@ -58,8 +53,6 @@ class Job {
uid %= config.runner_uid_max - config.runner_uid_min + 1;
gid %= config.runner_gid_max - config.runner_gid_min + 1;
this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`);
this.state = job_states.READY;
this.dir = path.join(
config.data_directory,
@ -70,17 +63,17 @@ class Job {
async prime() {
if (remaining_job_spaces < 1) {
this.logger.info(`Awaiting job slot`);
logger.info(`Awaiting job slot uuid=${this.uuid}`);
await new Promise(resolve => {
jobQueue.push(resolve);
});
}
this.logger.info(`Priming job`);
logger.info(`Priming job uuid=${this.uuid}`);
remaining_job_spaces--;
this.logger.debug('Writing files to job cache');
logger.debug('Writing files to job cache');
this.logger.debug(`Transfering ownership`);
logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`);
await fs.mkdir(this.dir, { mode: 0o700 });
await fs.chown(this.dir, this.uid, this.gid);
@ -107,7 +100,7 @@ class Job {
this.state = job_states.PRIMED;
this.logger.debug('Primed job');
logger.debug('Primed job');
}
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
@ -125,14 +118,7 @@ class Job {
prlimit.push('--as=' + memory_limit);
}
const proc_call = [
'nice',
...prlimit,
...nonetwork,
'bash',
file,
...args,
];
const proc_call = [...prlimit, ...nonetwork, 'bash', file, ...args];
var stdout = '';
var stderr = '';
@ -167,7 +153,9 @@ class Job {
const kill_timeout =
(timeout >= 0 &&
set_timeout(async _ => {
this.logger.info(`Timeout exceeded timeout=${timeout}`);
logger.info(
`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`
);
process.kill(proc.pid, 'SIGKILL');
}, timeout)) ||
null;
@ -176,7 +164,7 @@ class Job {
if (eventBus !== null) {
eventBus.emit('stderr', data);
} else if (stderr.length > this.runtime.output_max_size) {
this.logger.info(`stderr length exceeded`);
logger.info(`stderr length exceeded uuid=${this.uuid}`);
process.kill(proc.pid, 'SIGKILL');
} else {
stderr += data;
@ -188,7 +176,7 @@ class Job {
if (eventBus !== null) {
eventBus.emit('stdout', data);
} else if (stdout.length > this.runtime.output_max_size) {
this.logger.info(`stdout length exceeded`);
logger.info(`stdout length exceeded uuid=${this.uuid}`);
process.kill(proc.pid, 'SIGKILL');
} else {
stdout += data;
@ -196,24 +184,24 @@ class Job {
}
});
const exit_cleanup = () => {
const exit_cleanup = async () => {
clear_timeout(kill_timeout);
proc.stderr.destroy();
proc.stdout.destroy();
this.cleanup_processes();
this.logger.debug(`Finished exit cleanup`);
await this.cleanup_processes();
logger.debug(`Finished exit cleanup uuid=${this.uuid}`);
};
proc.on('exit', (code, signal) => {
exit_cleanup();
proc.on('exit', async (code, signal) => {
await exit_cleanup();
resolve({ stdout, stderr, code, signal, output });
});
proc.on('error', err => {
exit_cleanup();
proc.on('error', async err => {
await exit_cleanup();
reject({ error: err, stdout, stderr, output });
});
@ -228,13 +216,17 @@ class Job {
);
}
this.logger.info(`Executing job runtime=${this.runtime.toString()}`);
logger.info(
`Executing job uuid=${this.uuid} uid=${this.uid} gid=${
this.gid
} runtime=${this.runtime.toString()}`
);
const code_files =
(this.runtime.language === 'file' && this.files) ||
this.files.filter(file => file.encoding == 'utf8');
this.logger.debug('Compiling');
logger.debug('Compiling');
let compile;
@ -247,7 +239,7 @@ class Job {
);
}
this.logger.debug('Running');
logger.debug('Running');
const run = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'),
@ -274,8 +266,10 @@ class Job {
);
}
this.logger.info(
`Interactively executing job runtime=${this.runtime.toString()}`
logger.info(
`Interactively executing job uuid=${this.uuid} uid=${
this.uid
} gid=${this.gid} runtime=${this.runtime.toString()}`
);
const code_files =
@ -295,7 +289,7 @@ class Job {
eventBus.emit('exit', 'compile', { error, code, signal });
}
this.logger.debug('Running');
logger.debug('Running');
eventBus.emit('stage', 'run');
const { error, code, signal } = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'),
@ -310,62 +304,52 @@ class Job {
this.state = job_states.EXECUTED;
}
cleanup_processes(dont_wait = []) {
async cleanup_processes(dont_wait = []) {
let processes = [1];
const to_wait = [];
this.logger.debug(`Cleaning up processes`);
logger.debug(`Cleaning up processes uuid=${this.uuid}`);
while (processes.length > 0) {
processes = [];
const proc_ids = fss.readdir_sync('/proc');
const proc_ids = await fs.readdir('/proc');
processes = proc_ids.map(proc_id => {
if (isNaN(proc_id)) return -1;
try {
const proc_status = fss.read_file_sync(
path.join('/proc', proc_id, 'status')
);
const proc_lines = proc_status.to_string().split('\n');
const state_line = proc_lines.find(line =>
line.starts_with('State:')
);
const uid_line = proc_lines.find(line =>
line.starts_with('Uid:')
);
const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/);
processes = await Promise.all(
proc_ids.map(async proc_id => {
if (isNaN(proc_id)) return -1;
try {
const proc_status = await fs.read_file(
path.join('/proc', proc_id, 'status')
);
const proc_lines = proc_status.to_string().split('\n');
const uid_line = proc_lines.find(line =>
line.starts_with('Uid:')
);
const [_, ruid, euid, suid, fuid] =
uid_line.split(/\s+/);
const [_1, state, user_friendly] = state_line.split(/\s+/);
if (state == 'Z')
// Zombie process, just needs to be waited
if (ruid == this.uid || euid == this.uid)
return parse_int(proc_id);
} catch {
return -1;
// We should kill in all other state (Sleep, Stopped & Running)
}
if (ruid == this.uid || euid == this.uid)
return parse_int(proc_id);
} catch {
return -1;
}
return -1;
});
})
);
processes = processes.filter(p => p > 0);
if (processes.length > 0)
this.logger.debug(`Got processes to kill: ${processes}`);
logger.debug(
`Got processes to kill: ${processes} uuid=${this.uuid}`
);
for (const proc of processes) {
// First stop the processes, but keep their resources allocated so they cant re-fork
try {
process.kill(proc, 'SIGSTOP');
} catch (e) {
} catch {
// Could already be dead
this.logger.debug(
`Got error while SIGSTOPping process ${proc}:`,
e
);
}
}
@ -375,27 +359,13 @@ class Job {
process.kill(proc, 'SIGKILL');
} catch {
// Could already be dead and just needs to be waited on
this.logger.debug(
`Got error while SIGKILLing process ${proc}:`,
e
);
}
to_wait.push(proc);
if (!dont_wait.includes(proc)) wait_pid(proc);
}
}
this.logger.debug(
`Finished kill-loop, calling wait_pid to end any zombie processes`
);
for (const proc of to_wait) {
if (dont_wait.includes(proc)) continue;
wait_pid(proc);
}
this.logger.debug(`Cleaned up processes`);
logger.debug(`Cleaned up processes uuid=${this.uuid}`);
}
async cleanup_filesystem() {
@ -416,7 +386,7 @@ class Job {
}
} catch (e) {
// File was somehow deleted in the time that we read the dir to when we checked the file
this.logger.warn(`Error removing file ${file_path}: ${e}`);
logger.warn(`Error removing file ${file_path}: ${e}`);
}
}
}
@ -425,9 +395,8 @@ class Job {
}
async cleanup() {
this.logger.info(`Cleaning up job`);
logger.info(`Cleaning up job uuid=${this.uuid}`);
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
await this.cleanup_filesystem();
remaining_job_spaces++;