api(job): Decrease safe_call CPU time

By increasing the niceness value of child processes, the scheduler gives less CPU Time to them.
This allows the node process to get more CPU time, and can work through the event queue faster.
This commit is contained in:
Thomas Hobson 2021-11-11 21:34:30 +13:00
parent c7efa5372a
commit c091c117c7
No known key found for this signature in database
GPG Key ID: 9F1FD9D87950DB6F
1 changed files with 39 additions and 36 deletions

View File

@ -1,4 +1,5 @@
const logger = require('logplease').create('job');
const logplease = require('logplease');
const logger = logplease.create('job');
const { v4: uuidv4 } = require('uuid');
const cp = require('child_process');
const path = require('path');
@ -30,6 +31,9 @@ setInterval(() => {
class Job {
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
this.uuid = uuidv4();
this.logger = logplease.create(`job/${this.uuid}`);
this.runtime = runtime;
this.files = files.map((file, i) => ({
name: file.name || `file${i}.code`,
@ -54,6 +58,8 @@ class Job {
uid %= config.runner_uid_max - config.runner_uid_min + 1;
gid %= config.runner_gid_max - config.runner_gid_min + 1;
this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`);
this.state = job_states.READY;
this.dir = path.join(
config.data_directory,
@ -64,17 +70,17 @@ class Job {
async prime() {
if (remaining_job_spaces < 1) {
logger.info(`Awaiting job slot uuid=${this.uuid}`);
this.logger.info(`Awaiting job slot`);
await new Promise(resolve => {
jobQueue.push(resolve);
});
}
logger.info(`Priming job uuid=${this.uuid}`);
this.logger.info(`Priming job`);
remaining_job_spaces--;
logger.debug('Writing files to job cache');
this.logger.debug('Writing files to job cache');
logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`);
this.logger.debug(`Transfering ownership`);
await fs.mkdir(this.dir, { mode: 0o700 });
await fs.chown(this.dir, this.uid, this.gid);
@ -101,7 +107,7 @@ class Job {
this.state = job_states.PRIMED;
logger.debug('Primed job');
this.logger.debug('Primed job');
}
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
@ -119,7 +125,14 @@ class Job {
prlimit.push('--as=' + memory_limit);
}
const proc_call = [...prlimit, ...nonetwork, 'bash', file, ...args];
const proc_call = [
'nice',
...prlimit,
...nonetwork,
'bash',
file,
...args,
];
var stdout = '';
var stderr = '';
@ -154,9 +167,7 @@ class Job {
const kill_timeout =
(timeout >= 0 &&
set_timeout(async _ => {
logger.info(
`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`
);
this.logger.info(`Timeout exceeded timeout=${timeout}`);
process.kill(proc.pid, 'SIGKILL');
}, timeout)) ||
null;
@ -165,7 +176,7 @@ class Job {
if (eventBus !== null) {
eventBus.emit('stderr', data);
} else if (stderr.length > this.runtime.output_max_size) {
logger.info(`stderr length exceeded uuid=${this.uuid}`);
this.logger.info(`stderr length exceeded`);
process.kill(proc.pid, 'SIGKILL');
} else {
stderr += data;
@ -177,7 +188,7 @@ class Job {
if (eventBus !== null) {
eventBus.emit('stdout', data);
} else if (stdout.length > this.runtime.output_max_size) {
logger.info(`stdout length exceeded uuid=${this.uuid}`);
this.logger.info(`stdout length exceeded`);
process.kill(proc.pid, 'SIGKILL');
} else {
stdout += data;
@ -192,7 +203,7 @@ class Job {
proc.stdout.destroy();
this.cleanup_processes();
logger.debug(`Finished exit cleanup uuid=${this.uuid}`);
this.logger.debug(`Finished exit cleanup`);
};
proc.on('exit', (code, signal) => {
@ -217,17 +228,13 @@ class Job {
);
}
logger.info(
`Executing job uuid=${this.uuid} uid=${this.uid} gid=${
this.gid
} runtime=${this.runtime.toString()}`
);
this.logger.info(`Executing job runtime=${this.runtime.toString()}`);
const code_files =
(this.runtime.language === 'file' && this.files) ||
this.files.filter(file => file.encoding == 'utf8');
logger.debug('Compiling');
this.logger.debug('Compiling');
let compile;
@ -240,7 +247,7 @@ class Job {
);
}
logger.debug('Running');
this.logger.debug('Running');
const run = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'),
@ -267,10 +274,8 @@ class Job {
);
}
logger.info(
`Interactively executing job uuid=${this.uuid} uid=${
this.uid
} gid=${this.gid} runtime=${this.runtime.toString()}`
this.logger.info(
`Interactively executing job runtime=${this.runtime.toString()}`
);
const code_files =
@ -290,7 +295,7 @@ class Job {
eventBus.emit('exit', 'compile', { error, code, signal });
}
logger.debug('Running');
this.logger.debug('Running');
eventBus.emit('stage', 'run');
const { error, code, signal } = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'),
@ -308,7 +313,7 @@ class Job {
cleanup_processes(dont_wait = []) {
let processes = [1];
const to_wait = [];
logger.debug(`Cleaning up processes uuid=${this.uuid}`);
this.logger.debug(`Cleaning up processes`);
while (processes.length > 0) {
processes = [];
@ -349,9 +354,7 @@ class Job {
processes = processes.filter(p => p > 0);
if (processes.length > 0)
logger.debug(
`Got processes to kill: ${processes} uuid=${this.uuid}`
);
this.logger.debug(`Got processes to kill: ${processes}`);
for (const proc of processes) {
// First stop the processes, but keep their resources allocated so they cant re-fork
@ -359,7 +362,7 @@ class Job {
process.kill(proc, 'SIGSTOP');
} catch (e) {
// Could already be dead
logger.debug(
this.logger.debug(
`Got error while SIGSTOPping process ${proc}:`,
e
);
@ -372,7 +375,7 @@ class Job {
process.kill(proc, 'SIGKILL');
} catch {
// Could already be dead and just needs to be waited on
logger.debug(
this.logger.debug(
`Got error while SIGKILLing process ${proc}:`,
e
);
@ -382,8 +385,8 @@ class Job {
}
}
logger.debug(
`Finished kill-loop, calling wait_pid to end any zombie processes uuid=${this.uuid}`
this.logger.debug(
`Finished kill-loop, calling wait_pid to end any zombie processes`
);
for (const proc of to_wait) {
@ -392,7 +395,7 @@ class Job {
wait_pid(proc);
}
logger.debug(`Cleaned up processes uuid=${this.uuid}`);
this.logger.debug(`Cleaned up processes`);
}
async cleanup_filesystem() {
@ -413,7 +416,7 @@ class Job {
}
} catch (e) {
// File was somehow deleted in the time that we read the dir to when we checked the file
logger.warn(`Error removing file ${file_path}: ${e}`);
this.logger.warn(`Error removing file ${file_path}: ${e}`);
}
}
}
@ -422,7 +425,7 @@ class Job {
}
async cleanup() {
logger.info(`Cleaning up job uuid=${this.uuid}`);
this.logger.info(`Cleaning up job`);
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
await this.cleanup_filesystem();