Compare commits

...

2 Commits

Author SHA1 Message Date
Thomas Hobson c091c117c7
api(job): Decrease safe_call CPU time
By increasing the niceness value of child processes, the scheduler gives them less CPU time.
This allows the node process to get more CPU time, and can work through the event queue faster.
2021-11-11 21:34:30 +13:00
Thomas Hobson c7efa5372a
api(job): Switch process cleanup to sync
The system used to use async.
This would result in execution being handed off to other processes.
In the case where a forkbomb was used, it could circumvent the janitor: as the forkbomb consumed more CPU time, it would prevent the process janitor from reading in the process information.
2021-11-11 19:27:54 +13:00
1 changed file with 92 additions and 61 deletions

View File

@ -1,10 +1,12 @@
const logger = require('logplease').create('job');
const logplease = require('logplease');
const logger = logplease.create('job');
const { v4: uuidv4 } = require('uuid');
const cp = require('child_process');
const path = require('path');
const config = require('./config');
const globals = require('./globals');
const fs = require('fs/promises');
const fss = require('fs');
const wait_pid = require('waitpid');
const job_states = {
@ -29,6 +31,9 @@ setInterval(() => {
class Job {
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
this.uuid = uuidv4();
this.logger = logplease.create(`job/${this.uuid}`);
this.runtime = runtime;
this.files = files.map((file, i) => ({
name: file.name || `file${i}.code`,
@ -53,6 +58,8 @@ class Job {
uid %= config.runner_uid_max - config.runner_uid_min + 1;
gid %= config.runner_gid_max - config.runner_gid_min + 1;
this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`);
this.state = job_states.READY;
this.dir = path.join(
config.data_directory,
@ -63,17 +70,17 @@ class Job {
async prime() {
if (remaining_job_spaces < 1) {
logger.info(`Awaiting job slot uuid=${this.uuid}`);
this.logger.info(`Awaiting job slot`);
await new Promise(resolve => {
jobQueue.push(resolve);
});
}
logger.info(`Priming job uuid=${this.uuid}`);
this.logger.info(`Priming job`);
remaining_job_spaces--;
logger.debug('Writing files to job cache');
this.logger.debug('Writing files to job cache');
logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`);
this.logger.debug(`Transfering ownership`);
await fs.mkdir(this.dir, { mode: 0o700 });
await fs.chown(this.dir, this.uid, this.gid);
@ -100,7 +107,7 @@ class Job {
this.state = job_states.PRIMED;
logger.debug('Primed job');
this.logger.debug('Primed job');
}
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
@ -118,7 +125,14 @@ class Job {
prlimit.push('--as=' + memory_limit);
}
const proc_call = [...prlimit, ...nonetwork, 'bash', file, ...args];
const proc_call = [
'nice',
...prlimit,
...nonetwork,
'bash',
file,
...args,
];
var stdout = '';
var stderr = '';
@ -153,9 +167,7 @@ class Job {
const kill_timeout =
(timeout >= 0 &&
set_timeout(async _ => {
logger.info(
`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`
);
this.logger.info(`Timeout exceeded timeout=${timeout}`);
process.kill(proc.pid, 'SIGKILL');
}, timeout)) ||
null;
@ -164,7 +176,7 @@ class Job {
if (eventBus !== null) {
eventBus.emit('stderr', data);
} else if (stderr.length > this.runtime.output_max_size) {
logger.info(`stderr length exceeded uuid=${this.uuid}`);
this.logger.info(`stderr length exceeded`);
process.kill(proc.pid, 'SIGKILL');
} else {
stderr += data;
@ -176,7 +188,7 @@ class Job {
if (eventBus !== null) {
eventBus.emit('stdout', data);
} else if (stdout.length > this.runtime.output_max_size) {
logger.info(`stdout length exceeded uuid=${this.uuid}`);
this.logger.info(`stdout length exceeded`);
process.kill(proc.pid, 'SIGKILL');
} else {
stdout += data;
@ -184,24 +196,24 @@ class Job {
}
});
const exit_cleanup = async () => {
const exit_cleanup = () => {
clear_timeout(kill_timeout);
proc.stderr.destroy();
proc.stdout.destroy();
await this.cleanup_processes();
logger.debug(`Finished exit cleanup uuid=${this.uuid}`);
this.cleanup_processes();
this.logger.debug(`Finished exit cleanup`);
};
proc.on('exit', async (code, signal) => {
await exit_cleanup();
proc.on('exit', (code, signal) => {
exit_cleanup();
resolve({ stdout, stderr, code, signal, output });
});
proc.on('error', async err => {
await exit_cleanup();
proc.on('error', err => {
exit_cleanup();
reject({ error: err, stdout, stderr, output });
});
@ -216,17 +228,13 @@ class Job {
);
}
logger.info(
`Executing job uuid=${this.uuid} uid=${this.uid} gid=${
this.gid
} runtime=${this.runtime.toString()}`
);
this.logger.info(`Executing job runtime=${this.runtime.toString()}`);
const code_files =
(this.runtime.language === 'file' && this.files) ||
this.files.filter(file => file.encoding == 'utf8');
logger.debug('Compiling');
this.logger.debug('Compiling');
let compile;
@ -239,7 +247,7 @@ class Job {
);
}
logger.debug('Running');
this.logger.debug('Running');
const run = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'),
@ -266,10 +274,8 @@ class Job {
);
}
logger.info(
`Interactively executing job uuid=${this.uuid} uid=${
this.uid
} gid=${this.gid} runtime=${this.runtime.toString()}`
this.logger.info(
`Interactively executing job runtime=${this.runtime.toString()}`
);
const code_files =
@ -289,7 +295,7 @@ class Job {
eventBus.emit('exit', 'compile', { error, code, signal });
}
logger.debug('Running');
this.logger.debug('Running');
eventBus.emit('stage', 'run');
const { error, code, signal } = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'),
@ -304,52 +310,62 @@ class Job {
this.state = job_states.EXECUTED;
}
async cleanup_processes(dont_wait = []) {
cleanup_processes(dont_wait = []) {
let processes = [1];
logger.debug(`Cleaning up processes uuid=${this.uuid}`);
const to_wait = [];
this.logger.debug(`Cleaning up processes`);
while (processes.length > 0) {
processes = [];
const proc_ids = await fs.readdir('/proc');
const proc_ids = fss.readdir_sync('/proc');
processes = await Promise.all(
proc_ids.map(async proc_id => {
if (isNaN(proc_id)) return -1;
try {
const proc_status = await fs.read_file(
path.join('/proc', proc_id, 'status')
);
const proc_lines = proc_status.to_string().split('\n');
const uid_line = proc_lines.find(line =>
line.starts_with('Uid:')
);
const [_, ruid, euid, suid, fuid] =
uid_line.split(/\s+/);
processes = proc_ids.map(proc_id => {
if (isNaN(proc_id)) return -1;
try {
const proc_status = fss.read_file_sync(
path.join('/proc', proc_id, 'status')
);
const proc_lines = proc_status.to_string().split('\n');
const state_line = proc_lines.find(line =>
line.starts_with('State:')
);
const uid_line = proc_lines.find(line =>
line.starts_with('Uid:')
);
const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/);
if (ruid == this.uid || euid == this.uid)
return parse_int(proc_id);
} catch {
const [_1, state, user_friendly] = state_line.split(/\s+/);
if (state == 'Z')
// Zombie process, just needs to be waited
return -1;
}
// We should kill in all other state (Sleep, Stopped & Running)
if (ruid == this.uid || euid == this.uid)
return parse_int(proc_id);
} catch {
return -1;
})
);
}
return -1;
});
processes = processes.filter(p => p > 0);
if (processes.length > 0)
logger.debug(
`Got processes to kill: ${processes} uuid=${this.uuid}`
);
this.logger.debug(`Got processes to kill: ${processes}`);
for (const proc of processes) {
// First stop the processes, but keep their resources allocated so they cant re-fork
try {
process.kill(proc, 'SIGSTOP');
} catch {
} catch (e) {
// Could already be dead
this.logger.debug(
`Got error while SIGSTOPping process ${proc}:`,
e
);
}
}
@ -359,13 +375,27 @@ class Job {
process.kill(proc, 'SIGKILL');
} catch {
// Could already be dead and just needs to be waited on
this.logger.debug(
`Got error while SIGKILLing process ${proc}:`,
e
);
}
if (!dont_wait.includes(proc)) wait_pid(proc);
to_wait.push(proc);
}
}
logger.debug(`Cleaned up processes uuid=${this.uuid}`);
this.logger.debug(
`Finished kill-loop, calling wait_pid to end any zombie processes`
);
for (const proc of to_wait) {
if (dont_wait.includes(proc)) continue;
wait_pid(proc);
}
this.logger.debug(`Cleaned up processes`);
}
async cleanup_filesystem() {
@ -386,7 +416,7 @@ class Job {
}
} catch (e) {
// File was somehow deleted in the time that we read the dir to when we checked the file
logger.warn(`Error removing file ${file_path}: ${e}`);
this.logger.warn(`Error removing file ${file_path}: ${e}`);
}
}
}
@ -395,8 +425,9 @@ class Job {
}
async cleanup() {
logger.info(`Cleaning up job uuid=${this.uuid}`);
this.logger.info(`Cleaning up job`);
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
await this.cleanup_filesystem();
remaining_job_spaces++;