Compare commits
2 Commits
d8b430654b
...
c091c117c7
Author | SHA1 | Date |
---|---|---|
Thomas Hobson | c091c117c7 | |
Thomas Hobson | c7efa5372a |
129
api/src/job.js
129
api/src/job.js
|
@ -1,10 +1,12 @@
|
||||||
const logger = require('logplease').create('job');
|
const logplease = require('logplease');
|
||||||
|
const logger = logplease.create('job');
|
||||||
const { v4: uuidv4 } = require('uuid');
|
const { v4: uuidv4 } = require('uuid');
|
||||||
const cp = require('child_process');
|
const cp = require('child_process');
|
||||||
const path = require('path');
|
const path = require('path');
|
||||||
const config = require('./config');
|
const config = require('./config');
|
||||||
const globals = require('./globals');
|
const globals = require('./globals');
|
||||||
const fs = require('fs/promises');
|
const fs = require('fs/promises');
|
||||||
|
const fss = require('fs');
|
||||||
const wait_pid = require('waitpid');
|
const wait_pid = require('waitpid');
|
||||||
|
|
||||||
const job_states = {
|
const job_states = {
|
||||||
|
@ -29,6 +31,9 @@ setInterval(() => {
|
||||||
class Job {
|
class Job {
|
||||||
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
|
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
|
||||||
this.uuid = uuidv4();
|
this.uuid = uuidv4();
|
||||||
|
|
||||||
|
this.logger = logplease.create(`job/${this.uuid}`);
|
||||||
|
|
||||||
this.runtime = runtime;
|
this.runtime = runtime;
|
||||||
this.files = files.map((file, i) => ({
|
this.files = files.map((file, i) => ({
|
||||||
name: file.name || `file${i}.code`,
|
name: file.name || `file${i}.code`,
|
||||||
|
@ -53,6 +58,8 @@ class Job {
|
||||||
uid %= config.runner_uid_max - config.runner_uid_min + 1;
|
uid %= config.runner_uid_max - config.runner_uid_min + 1;
|
||||||
gid %= config.runner_gid_max - config.runner_gid_min + 1;
|
gid %= config.runner_gid_max - config.runner_gid_min + 1;
|
||||||
|
|
||||||
|
this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`);
|
||||||
|
|
||||||
this.state = job_states.READY;
|
this.state = job_states.READY;
|
||||||
this.dir = path.join(
|
this.dir = path.join(
|
||||||
config.data_directory,
|
config.data_directory,
|
||||||
|
@ -63,17 +70,17 @@ class Job {
|
||||||
|
|
||||||
async prime() {
|
async prime() {
|
||||||
if (remaining_job_spaces < 1) {
|
if (remaining_job_spaces < 1) {
|
||||||
logger.info(`Awaiting job slot uuid=${this.uuid}`);
|
this.logger.info(`Awaiting job slot`);
|
||||||
await new Promise(resolve => {
|
await new Promise(resolve => {
|
||||||
jobQueue.push(resolve);
|
jobQueue.push(resolve);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(`Priming job uuid=${this.uuid}`);
|
this.logger.info(`Priming job`);
|
||||||
remaining_job_spaces--;
|
remaining_job_spaces--;
|
||||||
logger.debug('Writing files to job cache');
|
this.logger.debug('Writing files to job cache');
|
||||||
|
|
||||||
logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`);
|
this.logger.debug(`Transfering ownership`);
|
||||||
|
|
||||||
await fs.mkdir(this.dir, { mode: 0o700 });
|
await fs.mkdir(this.dir, { mode: 0o700 });
|
||||||
await fs.chown(this.dir, this.uid, this.gid);
|
await fs.chown(this.dir, this.uid, this.gid);
|
||||||
|
@ -100,7 +107,7 @@ class Job {
|
||||||
|
|
||||||
this.state = job_states.PRIMED;
|
this.state = job_states.PRIMED;
|
||||||
|
|
||||||
logger.debug('Primed job');
|
this.logger.debug('Primed job');
|
||||||
}
|
}
|
||||||
|
|
||||||
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
|
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
|
||||||
|
@ -118,7 +125,14 @@ class Job {
|
||||||
prlimit.push('--as=' + memory_limit);
|
prlimit.push('--as=' + memory_limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
const proc_call = [...prlimit, ...nonetwork, 'bash', file, ...args];
|
const proc_call = [
|
||||||
|
'nice',
|
||||||
|
...prlimit,
|
||||||
|
...nonetwork,
|
||||||
|
'bash',
|
||||||
|
file,
|
||||||
|
...args,
|
||||||
|
];
|
||||||
|
|
||||||
var stdout = '';
|
var stdout = '';
|
||||||
var stderr = '';
|
var stderr = '';
|
||||||
|
@ -153,9 +167,7 @@ class Job {
|
||||||
const kill_timeout =
|
const kill_timeout =
|
||||||
(timeout >= 0 &&
|
(timeout >= 0 &&
|
||||||
set_timeout(async _ => {
|
set_timeout(async _ => {
|
||||||
logger.info(
|
this.logger.info(`Timeout exceeded timeout=${timeout}`);
|
||||||
`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`
|
|
||||||
);
|
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
}, timeout)) ||
|
}, timeout)) ||
|
||||||
null;
|
null;
|
||||||
|
@ -164,7 +176,7 @@ class Job {
|
||||||
if (eventBus !== null) {
|
if (eventBus !== null) {
|
||||||
eventBus.emit('stderr', data);
|
eventBus.emit('stderr', data);
|
||||||
} else if (stderr.length > this.runtime.output_max_size) {
|
} else if (stderr.length > this.runtime.output_max_size) {
|
||||||
logger.info(`stderr length exceeded uuid=${this.uuid}`);
|
this.logger.info(`stderr length exceeded`);
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
} else {
|
} else {
|
||||||
stderr += data;
|
stderr += data;
|
||||||
|
@ -176,7 +188,7 @@ class Job {
|
||||||
if (eventBus !== null) {
|
if (eventBus !== null) {
|
||||||
eventBus.emit('stdout', data);
|
eventBus.emit('stdout', data);
|
||||||
} else if (stdout.length > this.runtime.output_max_size) {
|
} else if (stdout.length > this.runtime.output_max_size) {
|
||||||
logger.info(`stdout length exceeded uuid=${this.uuid}`);
|
this.logger.info(`stdout length exceeded`);
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
} else {
|
} else {
|
||||||
stdout += data;
|
stdout += data;
|
||||||
|
@ -184,24 +196,24 @@ class Job {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
const exit_cleanup = async () => {
|
const exit_cleanup = () => {
|
||||||
clear_timeout(kill_timeout);
|
clear_timeout(kill_timeout);
|
||||||
|
|
||||||
proc.stderr.destroy();
|
proc.stderr.destroy();
|
||||||
proc.stdout.destroy();
|
proc.stdout.destroy();
|
||||||
|
|
||||||
await this.cleanup_processes();
|
this.cleanup_processes();
|
||||||
logger.debug(`Finished exit cleanup uuid=${this.uuid}`);
|
this.logger.debug(`Finished exit cleanup`);
|
||||||
};
|
};
|
||||||
|
|
||||||
proc.on('exit', async (code, signal) => {
|
proc.on('exit', (code, signal) => {
|
||||||
await exit_cleanup();
|
exit_cleanup();
|
||||||
|
|
||||||
resolve({ stdout, stderr, code, signal, output });
|
resolve({ stdout, stderr, code, signal, output });
|
||||||
});
|
});
|
||||||
|
|
||||||
proc.on('error', async err => {
|
proc.on('error', err => {
|
||||||
await exit_cleanup();
|
exit_cleanup();
|
||||||
|
|
||||||
reject({ error: err, stdout, stderr, output });
|
reject({ error: err, stdout, stderr, output });
|
||||||
});
|
});
|
||||||
|
@ -216,17 +228,13 @@ class Job {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(
|
this.logger.info(`Executing job runtime=${this.runtime.toString()}`);
|
||||||
`Executing job uuid=${this.uuid} uid=${this.uid} gid=${
|
|
||||||
this.gid
|
|
||||||
} runtime=${this.runtime.toString()}`
|
|
||||||
);
|
|
||||||
|
|
||||||
const code_files =
|
const code_files =
|
||||||
(this.runtime.language === 'file' && this.files) ||
|
(this.runtime.language === 'file' && this.files) ||
|
||||||
this.files.filter(file => file.encoding == 'utf8');
|
this.files.filter(file => file.encoding == 'utf8');
|
||||||
|
|
||||||
logger.debug('Compiling');
|
this.logger.debug('Compiling');
|
||||||
|
|
||||||
let compile;
|
let compile;
|
||||||
|
|
||||||
|
@ -239,7 +247,7 @@ class Job {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug('Running');
|
this.logger.debug('Running');
|
||||||
|
|
||||||
const run = await this.safe_call(
|
const run = await this.safe_call(
|
||||||
path.join(this.runtime.pkgdir, 'run'),
|
path.join(this.runtime.pkgdir, 'run'),
|
||||||
|
@ -266,10 +274,8 @@ class Job {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(
|
this.logger.info(
|
||||||
`Interactively executing job uuid=${this.uuid} uid=${
|
`Interactively executing job runtime=${this.runtime.toString()}`
|
||||||
this.uid
|
|
||||||
} gid=${this.gid} runtime=${this.runtime.toString()}`
|
|
||||||
);
|
);
|
||||||
|
|
||||||
const code_files =
|
const code_files =
|
||||||
|
@ -289,7 +295,7 @@ class Job {
|
||||||
eventBus.emit('exit', 'compile', { error, code, signal });
|
eventBus.emit('exit', 'compile', { error, code, signal });
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug('Running');
|
this.logger.debug('Running');
|
||||||
eventBus.emit('stage', 'run');
|
eventBus.emit('stage', 'run');
|
||||||
const { error, code, signal } = await this.safe_call(
|
const { error, code, signal } = await this.safe_call(
|
||||||
path.join(this.runtime.pkgdir, 'run'),
|
path.join(this.runtime.pkgdir, 'run'),
|
||||||
|
@ -304,28 +310,37 @@ class Job {
|
||||||
this.state = job_states.EXECUTED;
|
this.state = job_states.EXECUTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
async cleanup_processes(dont_wait = []) {
|
cleanup_processes(dont_wait = []) {
|
||||||
let processes = [1];
|
let processes = [1];
|
||||||
logger.debug(`Cleaning up processes uuid=${this.uuid}`);
|
const to_wait = [];
|
||||||
|
this.logger.debug(`Cleaning up processes`);
|
||||||
|
|
||||||
while (processes.length > 0) {
|
while (processes.length > 0) {
|
||||||
processes = [];
|
processes = [];
|
||||||
|
|
||||||
const proc_ids = await fs.readdir('/proc');
|
const proc_ids = fss.readdir_sync('/proc');
|
||||||
|
|
||||||
processes = await Promise.all(
|
processes = proc_ids.map(proc_id => {
|
||||||
proc_ids.map(async proc_id => {
|
|
||||||
if (isNaN(proc_id)) return -1;
|
if (isNaN(proc_id)) return -1;
|
||||||
try {
|
try {
|
||||||
const proc_status = await fs.read_file(
|
const proc_status = fss.read_file_sync(
|
||||||
path.join('/proc', proc_id, 'status')
|
path.join('/proc', proc_id, 'status')
|
||||||
);
|
);
|
||||||
const proc_lines = proc_status.to_string().split('\n');
|
const proc_lines = proc_status.to_string().split('\n');
|
||||||
|
const state_line = proc_lines.find(line =>
|
||||||
|
line.starts_with('State:')
|
||||||
|
);
|
||||||
const uid_line = proc_lines.find(line =>
|
const uid_line = proc_lines.find(line =>
|
||||||
line.starts_with('Uid:')
|
line.starts_with('Uid:')
|
||||||
);
|
);
|
||||||
const [_, ruid, euid, suid, fuid] =
|
const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/);
|
||||||
uid_line.split(/\s+/);
|
|
||||||
|
const [_1, state, user_friendly] = state_line.split(/\s+/);
|
||||||
|
|
||||||
|
if (state == 'Z')
|
||||||
|
// Zombie process, just needs to be waited
|
||||||
|
return -1;
|
||||||
|
// We should kill in all other state (Sleep, Stopped & Running)
|
||||||
|
|
||||||
if (ruid == this.uid || euid == this.uid)
|
if (ruid == this.uid || euid == this.uid)
|
||||||
return parse_int(proc_id);
|
return parse_int(proc_id);
|
||||||
|
@ -334,22 +349,23 @@ class Job {
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
})
|
});
|
||||||
);
|
|
||||||
|
|
||||||
processes = processes.filter(p => p > 0);
|
processes = processes.filter(p => p > 0);
|
||||||
|
|
||||||
if (processes.length > 0)
|
if (processes.length > 0)
|
||||||
logger.debug(
|
this.logger.debug(`Got processes to kill: ${processes}`);
|
||||||
`Got processes to kill: ${processes} uuid=${this.uuid}`
|
|
||||||
);
|
|
||||||
|
|
||||||
for (const proc of processes) {
|
for (const proc of processes) {
|
||||||
// First stop the processes, but keep their resources allocated so they cant re-fork
|
// First stop the processes, but keep their resources allocated so they cant re-fork
|
||||||
try {
|
try {
|
||||||
process.kill(proc, 'SIGSTOP');
|
process.kill(proc, 'SIGSTOP');
|
||||||
} catch {
|
} catch (e) {
|
||||||
// Could already be dead
|
// Could already be dead
|
||||||
|
this.logger.debug(
|
||||||
|
`Got error while SIGSTOPping process ${proc}:`,
|
||||||
|
e
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -359,13 +375,27 @@ class Job {
|
||||||
process.kill(proc, 'SIGKILL');
|
process.kill(proc, 'SIGKILL');
|
||||||
} catch {
|
} catch {
|
||||||
// Could already be dead and just needs to be waited on
|
// Could already be dead and just needs to be waited on
|
||||||
|
this.logger.debug(
|
||||||
|
`Got error while SIGKILLing process ${proc}:`,
|
||||||
|
e
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!dont_wait.includes(proc)) wait_pid(proc);
|
to_wait.push(proc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug(`Cleaned up processes uuid=${this.uuid}`);
|
this.logger.debug(
|
||||||
|
`Finished kill-loop, calling wait_pid to end any zombie processes`
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const proc of to_wait) {
|
||||||
|
if (dont_wait.includes(proc)) continue;
|
||||||
|
|
||||||
|
wait_pid(proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.debug(`Cleaned up processes`);
|
||||||
}
|
}
|
||||||
|
|
||||||
async cleanup_filesystem() {
|
async cleanup_filesystem() {
|
||||||
|
@ -386,7 +416,7 @@ class Job {
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
// File was somehow deleted in the time that we read the dir to when we checked the file
|
// File was somehow deleted in the time that we read the dir to when we checked the file
|
||||||
logger.warn(`Error removing file ${file_path}: ${e}`);
|
this.logger.warn(`Error removing file ${file_path}: ${e}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -395,8 +425,9 @@ class Job {
|
||||||
}
|
}
|
||||||
|
|
||||||
async cleanup() {
|
async cleanup() {
|
||||||
logger.info(`Cleaning up job uuid=${this.uuid}`);
|
this.logger.info(`Cleaning up job`);
|
||||||
|
|
||||||
|
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
|
||||||
await this.cleanup_filesystem();
|
await this.cleanup_filesystem();
|
||||||
|
|
||||||
remaining_job_spaces++;
|
remaining_job_spaces++;
|
||||||
|
|
Loading…
Reference in New Issue