Compare commits
No commits in common. "c091c117c7c1a5165c137e3d367adc8eea2662f4" and "d8b430654bc5da463d178311345d08c870a7b9e4" have entirely different histories.
c091c117c7
...
d8b430654b
129
api/src/job.js
129
api/src/job.js
|
@ -1,12 +1,10 @@
|
||||||
const logplease = require('logplease');
|
const logger = require('logplease').create('job');
|
||||||
const logger = logplease.create('job');
|
|
||||||
const { v4: uuidv4 } = require('uuid');
|
const { v4: uuidv4 } = require('uuid');
|
||||||
const cp = require('child_process');
|
const cp = require('child_process');
|
||||||
const path = require('path');
|
const path = require('path');
|
||||||
const config = require('./config');
|
const config = require('./config');
|
||||||
const globals = require('./globals');
|
const globals = require('./globals');
|
||||||
const fs = require('fs/promises');
|
const fs = require('fs/promises');
|
||||||
const fss = require('fs');
|
|
||||||
const wait_pid = require('waitpid');
|
const wait_pid = require('waitpid');
|
||||||
|
|
||||||
const job_states = {
|
const job_states = {
|
||||||
|
@ -31,9 +29,6 @@ setInterval(() => {
|
||||||
class Job {
|
class Job {
|
||||||
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
|
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
|
||||||
this.uuid = uuidv4();
|
this.uuid = uuidv4();
|
||||||
|
|
||||||
this.logger = logplease.create(`job/${this.uuid}`);
|
|
||||||
|
|
||||||
this.runtime = runtime;
|
this.runtime = runtime;
|
||||||
this.files = files.map((file, i) => ({
|
this.files = files.map((file, i) => ({
|
||||||
name: file.name || `file${i}.code`,
|
name: file.name || `file${i}.code`,
|
||||||
|
@ -58,8 +53,6 @@ class Job {
|
||||||
uid %= config.runner_uid_max - config.runner_uid_min + 1;
|
uid %= config.runner_uid_max - config.runner_uid_min + 1;
|
||||||
gid %= config.runner_gid_max - config.runner_gid_min + 1;
|
gid %= config.runner_gid_max - config.runner_gid_min + 1;
|
||||||
|
|
||||||
this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`);
|
|
||||||
|
|
||||||
this.state = job_states.READY;
|
this.state = job_states.READY;
|
||||||
this.dir = path.join(
|
this.dir = path.join(
|
||||||
config.data_directory,
|
config.data_directory,
|
||||||
|
@ -70,17 +63,17 @@ class Job {
|
||||||
|
|
||||||
async prime() {
|
async prime() {
|
||||||
if (remaining_job_spaces < 1) {
|
if (remaining_job_spaces < 1) {
|
||||||
this.logger.info(`Awaiting job slot`);
|
logger.info(`Awaiting job slot uuid=${this.uuid}`);
|
||||||
await new Promise(resolve => {
|
await new Promise(resolve => {
|
||||||
jobQueue.push(resolve);
|
jobQueue.push(resolve);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.info(`Priming job`);
|
logger.info(`Priming job uuid=${this.uuid}`);
|
||||||
remaining_job_spaces--;
|
remaining_job_spaces--;
|
||||||
this.logger.debug('Writing files to job cache');
|
logger.debug('Writing files to job cache');
|
||||||
|
|
||||||
this.logger.debug(`Transfering ownership`);
|
logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`);
|
||||||
|
|
||||||
await fs.mkdir(this.dir, { mode: 0o700 });
|
await fs.mkdir(this.dir, { mode: 0o700 });
|
||||||
await fs.chown(this.dir, this.uid, this.gid);
|
await fs.chown(this.dir, this.uid, this.gid);
|
||||||
|
@ -107,7 +100,7 @@ class Job {
|
||||||
|
|
||||||
this.state = job_states.PRIMED;
|
this.state = job_states.PRIMED;
|
||||||
|
|
||||||
this.logger.debug('Primed job');
|
logger.debug('Primed job');
|
||||||
}
|
}
|
||||||
|
|
||||||
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
|
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
|
||||||
|
@ -125,14 +118,7 @@ class Job {
|
||||||
prlimit.push('--as=' + memory_limit);
|
prlimit.push('--as=' + memory_limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
const proc_call = [
|
const proc_call = [...prlimit, ...nonetwork, 'bash', file, ...args];
|
||||||
'nice',
|
|
||||||
...prlimit,
|
|
||||||
...nonetwork,
|
|
||||||
'bash',
|
|
||||||
file,
|
|
||||||
...args,
|
|
||||||
];
|
|
||||||
|
|
||||||
var stdout = '';
|
var stdout = '';
|
||||||
var stderr = '';
|
var stderr = '';
|
||||||
|
@ -167,7 +153,9 @@ class Job {
|
||||||
const kill_timeout =
|
const kill_timeout =
|
||||||
(timeout >= 0 &&
|
(timeout >= 0 &&
|
||||||
set_timeout(async _ => {
|
set_timeout(async _ => {
|
||||||
this.logger.info(`Timeout exceeded timeout=${timeout}`);
|
logger.info(
|
||||||
|
`Timeout exceeded timeout=${timeout} uuid=${this.uuid}`
|
||||||
|
);
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
}, timeout)) ||
|
}, timeout)) ||
|
||||||
null;
|
null;
|
||||||
|
@ -176,7 +164,7 @@ class Job {
|
||||||
if (eventBus !== null) {
|
if (eventBus !== null) {
|
||||||
eventBus.emit('stderr', data);
|
eventBus.emit('stderr', data);
|
||||||
} else if (stderr.length > this.runtime.output_max_size) {
|
} else if (stderr.length > this.runtime.output_max_size) {
|
||||||
this.logger.info(`stderr length exceeded`);
|
logger.info(`stderr length exceeded uuid=${this.uuid}`);
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
} else {
|
} else {
|
||||||
stderr += data;
|
stderr += data;
|
||||||
|
@ -188,7 +176,7 @@ class Job {
|
||||||
if (eventBus !== null) {
|
if (eventBus !== null) {
|
||||||
eventBus.emit('stdout', data);
|
eventBus.emit('stdout', data);
|
||||||
} else if (stdout.length > this.runtime.output_max_size) {
|
} else if (stdout.length > this.runtime.output_max_size) {
|
||||||
this.logger.info(`stdout length exceeded`);
|
logger.info(`stdout length exceeded uuid=${this.uuid}`);
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
} else {
|
} else {
|
||||||
stdout += data;
|
stdout += data;
|
||||||
|
@ -196,24 +184,24 @@ class Job {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
const exit_cleanup = () => {
|
const exit_cleanup = async () => {
|
||||||
clear_timeout(kill_timeout);
|
clear_timeout(kill_timeout);
|
||||||
|
|
||||||
proc.stderr.destroy();
|
proc.stderr.destroy();
|
||||||
proc.stdout.destroy();
|
proc.stdout.destroy();
|
||||||
|
|
||||||
this.cleanup_processes();
|
await this.cleanup_processes();
|
||||||
this.logger.debug(`Finished exit cleanup`);
|
logger.debug(`Finished exit cleanup uuid=${this.uuid}`);
|
||||||
};
|
};
|
||||||
|
|
||||||
proc.on('exit', (code, signal) => {
|
proc.on('exit', async (code, signal) => {
|
||||||
exit_cleanup();
|
await exit_cleanup();
|
||||||
|
|
||||||
resolve({ stdout, stderr, code, signal, output });
|
resolve({ stdout, stderr, code, signal, output });
|
||||||
});
|
});
|
||||||
|
|
||||||
proc.on('error', err => {
|
proc.on('error', async err => {
|
||||||
exit_cleanup();
|
await exit_cleanup();
|
||||||
|
|
||||||
reject({ error: err, stdout, stderr, output });
|
reject({ error: err, stdout, stderr, output });
|
||||||
});
|
});
|
||||||
|
@ -228,13 +216,17 @@ class Job {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.info(`Executing job runtime=${this.runtime.toString()}`);
|
logger.info(
|
||||||
|
`Executing job uuid=${this.uuid} uid=${this.uid} gid=${
|
||||||
|
this.gid
|
||||||
|
} runtime=${this.runtime.toString()}`
|
||||||
|
);
|
||||||
|
|
||||||
const code_files =
|
const code_files =
|
||||||
(this.runtime.language === 'file' && this.files) ||
|
(this.runtime.language === 'file' && this.files) ||
|
||||||
this.files.filter(file => file.encoding == 'utf8');
|
this.files.filter(file => file.encoding == 'utf8');
|
||||||
|
|
||||||
this.logger.debug('Compiling');
|
logger.debug('Compiling');
|
||||||
|
|
||||||
let compile;
|
let compile;
|
||||||
|
|
||||||
|
@ -247,7 +239,7 @@ class Job {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.debug('Running');
|
logger.debug('Running');
|
||||||
|
|
||||||
const run = await this.safe_call(
|
const run = await this.safe_call(
|
||||||
path.join(this.runtime.pkgdir, 'run'),
|
path.join(this.runtime.pkgdir, 'run'),
|
||||||
|
@ -274,8 +266,10 @@ class Job {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.info(
|
logger.info(
|
||||||
`Interactively executing job runtime=${this.runtime.toString()}`
|
`Interactively executing job uuid=${this.uuid} uid=${
|
||||||
|
this.uid
|
||||||
|
} gid=${this.gid} runtime=${this.runtime.toString()}`
|
||||||
);
|
);
|
||||||
|
|
||||||
const code_files =
|
const code_files =
|
||||||
|
@ -295,7 +289,7 @@ class Job {
|
||||||
eventBus.emit('exit', 'compile', { error, code, signal });
|
eventBus.emit('exit', 'compile', { error, code, signal });
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.debug('Running');
|
logger.debug('Running');
|
||||||
eventBus.emit('stage', 'run');
|
eventBus.emit('stage', 'run');
|
||||||
const { error, code, signal } = await this.safe_call(
|
const { error, code, signal } = await this.safe_call(
|
||||||
path.join(this.runtime.pkgdir, 'run'),
|
path.join(this.runtime.pkgdir, 'run'),
|
||||||
|
@ -310,37 +304,28 @@ class Job {
|
||||||
this.state = job_states.EXECUTED;
|
this.state = job_states.EXECUTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup_processes(dont_wait = []) {
|
async cleanup_processes(dont_wait = []) {
|
||||||
let processes = [1];
|
let processes = [1];
|
||||||
const to_wait = [];
|
logger.debug(`Cleaning up processes uuid=${this.uuid}`);
|
||||||
this.logger.debug(`Cleaning up processes`);
|
|
||||||
|
|
||||||
while (processes.length > 0) {
|
while (processes.length > 0) {
|
||||||
processes = [];
|
processes = [];
|
||||||
|
|
||||||
const proc_ids = fss.readdir_sync('/proc');
|
const proc_ids = await fs.readdir('/proc');
|
||||||
|
|
||||||
processes = proc_ids.map(proc_id => {
|
processes = await Promise.all(
|
||||||
|
proc_ids.map(async proc_id => {
|
||||||
if (isNaN(proc_id)) return -1;
|
if (isNaN(proc_id)) return -1;
|
||||||
try {
|
try {
|
||||||
const proc_status = fss.read_file_sync(
|
const proc_status = await fs.read_file(
|
||||||
path.join('/proc', proc_id, 'status')
|
path.join('/proc', proc_id, 'status')
|
||||||
);
|
);
|
||||||
const proc_lines = proc_status.to_string().split('\n');
|
const proc_lines = proc_status.to_string().split('\n');
|
||||||
const state_line = proc_lines.find(line =>
|
|
||||||
line.starts_with('State:')
|
|
||||||
);
|
|
||||||
const uid_line = proc_lines.find(line =>
|
const uid_line = proc_lines.find(line =>
|
||||||
line.starts_with('Uid:')
|
line.starts_with('Uid:')
|
||||||
);
|
);
|
||||||
const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/);
|
const [_, ruid, euid, suid, fuid] =
|
||||||
|
uid_line.split(/\s+/);
|
||||||
const [_1, state, user_friendly] = state_line.split(/\s+/);
|
|
||||||
|
|
||||||
if (state == 'Z')
|
|
||||||
// Zombie process, just needs to be waited
|
|
||||||
return -1;
|
|
||||||
// We should kill in all other state (Sleep, Stopped & Running)
|
|
||||||
|
|
||||||
if (ruid == this.uid || euid == this.uid)
|
if (ruid == this.uid || euid == this.uid)
|
||||||
return parse_int(proc_id);
|
return parse_int(proc_id);
|
||||||
|
@ -349,23 +334,22 @@ class Job {
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
});
|
})
|
||||||
|
);
|
||||||
|
|
||||||
processes = processes.filter(p => p > 0);
|
processes = processes.filter(p => p > 0);
|
||||||
|
|
||||||
if (processes.length > 0)
|
if (processes.length > 0)
|
||||||
this.logger.debug(`Got processes to kill: ${processes}`);
|
logger.debug(
|
||||||
|
`Got processes to kill: ${processes} uuid=${this.uuid}`
|
||||||
|
);
|
||||||
|
|
||||||
for (const proc of processes) {
|
for (const proc of processes) {
|
||||||
// First stop the processes, but keep their resources allocated so they cant re-fork
|
// First stop the processes, but keep their resources allocated so they cant re-fork
|
||||||
try {
|
try {
|
||||||
process.kill(proc, 'SIGSTOP');
|
process.kill(proc, 'SIGSTOP');
|
||||||
} catch (e) {
|
} catch {
|
||||||
// Could already be dead
|
// Could already be dead
|
||||||
this.logger.debug(
|
|
||||||
`Got error while SIGSTOPping process ${proc}:`,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,27 +359,13 @@ class Job {
|
||||||
process.kill(proc, 'SIGKILL');
|
process.kill(proc, 'SIGKILL');
|
||||||
} catch {
|
} catch {
|
||||||
// Could already be dead and just needs to be waited on
|
// Could already be dead and just needs to be waited on
|
||||||
this.logger.debug(
|
|
||||||
`Got error while SIGKILLing process ${proc}:`,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
to_wait.push(proc);
|
if (!dont_wait.includes(proc)) wait_pid(proc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.debug(
|
logger.debug(`Cleaned up processes uuid=${this.uuid}`);
|
||||||
`Finished kill-loop, calling wait_pid to end any zombie processes`
|
|
||||||
);
|
|
||||||
|
|
||||||
for (const proc of to_wait) {
|
|
||||||
if (dont_wait.includes(proc)) continue;
|
|
||||||
|
|
||||||
wait_pid(proc);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.debug(`Cleaned up processes`);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async cleanup_filesystem() {
|
async cleanup_filesystem() {
|
||||||
|
@ -416,7 +386,7 @@ class Job {
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
// File was somehow deleted in the time that we read the dir to when we checked the file
|
// File was somehow deleted in the time that we read the dir to when we checked the file
|
||||||
this.logger.warn(`Error removing file ${file_path}: ${e}`);
|
logger.warn(`Error removing file ${file_path}: ${e}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -425,9 +395,8 @@ class Job {
|
||||||
}
|
}
|
||||||
|
|
||||||
async cleanup() {
|
async cleanup() {
|
||||||
this.logger.info(`Cleaning up job`);
|
logger.info(`Cleaning up job uuid=${this.uuid}`);
|
||||||
|
|
||||||
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
|
|
||||||
await this.cleanup_filesystem();
|
await this.cleanup_filesystem();
|
||||||
|
|
||||||
remaining_job_spaces++;
|
remaining_job_spaces++;
|
||||||
|
|
Loading…
Reference in New Issue