piston/api/src/job.js

454 lines
14 KiB
JavaScript
Raw Normal View History

import { create } from 'logplease';
// @ts-ignore
const logger = create('job');
import { v4 as uuidv4 } from 'uuid';
import { spawn } from 'child_process';
import { join, relative, dirname } from 'path';
2023-03-07 00:02:16 +01:00
import config from './config.js';
import * as globals from './globals.js';
import { mkdir, chown, writeFile, readdir, stat as _stat, rm } from 'fs/promises';
import { readdirSync, readFileSync } from 'fs';
import wait_pid from 'waitpid';
2021-02-20 23:39:03 +01:00
2021-02-20 15:13:56 +01:00
const job_states = {
2021-05-08 02:30:40 +02:00
READY: Symbol('Ready to be primed'),
PRIMED: Symbol('Primed and ready for execution'),
EXECUTED: Symbol('Executed and ready for cleanup'),
2021-02-20 23:39:03 +01:00
};
2021-02-20 15:13:56 +01:00
2021-03-13 06:01:04 +01:00
let uid = 0;
let gid = 0;
2021-02-20 15:13:56 +01:00
let remaining_job_spaces = config.max_concurrent_jobs;
let jobQueue = [];
2021-10-08 15:16:57 +02:00
setInterval(() => {
// Every 10ms try resolve a new job, if there is an available slot
if (jobQueue.length > 0 && remaining_job_spaces > 0) {
2021-10-08 15:16:57 +02:00
jobQueue.shift()();
}
2021-10-08 15:16:57 +02:00
}, 10);
export default class Job {
2021-10-10 19:11:16 +02:00
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
2021-05-08 02:30:40 +02:00
this.uuid = uuidv4();
this.logger = create(`job/${this.uuid}`, {});
2021-05-08 02:30:40 +02:00
this.runtime = runtime;
this.files = files.map((file, i) => ({
name: file.name || `file${i}.code`,
content: file.content,
encoding: ['base64', 'hex', 'utf8'].includes(file.encoding)
? file.encoding
: 'utf8',
2021-05-08 02:30:40 +02:00
}));
this.args = args;
this.stdin = stdin;
2021-10-10 19:11:16 +02:00
this.timeouts = timeouts;
this.memory_limits = memory_limits;
2021-05-08 02:30:40 +02:00
this.uid = config.runner_uid_min + uid;
this.gid = config.runner_gid_min + gid;
uid++;
gid++;
uid %= config.runner_uid_max - config.runner_uid_min + 1;
gid %= config.runner_gid_max - config.runner_gid_min + 1;
this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`);
2021-05-08 02:30:40 +02:00
this.state = job_states.READY;
this.dir = join(
2021-05-08 02:30:40 +02:00
config.data_directory,
globals.data_directories.jobs,
this.uuid
);
}
2021-02-20 15:13:56 +01:00
2021-05-08 02:30:40 +02:00
async prime() {
if (remaining_job_spaces < 1) {
this.logger.info(`Awaiting job slot`);
2021-10-08 15:16:57 +02:00
await new Promise(resolve => {
jobQueue.push(resolve);
});
}
2021-02-20 15:13:56 +01:00
this.logger.info(`Priming job`);
remaining_job_spaces--;
this.logger.debug('Writing files to job cache');
2021-02-20 15:13:56 +01:00
this.logger.debug(`Transfering ownership`);
2021-02-20 15:13:56 +01:00
await mkdir(this.dir, { mode: 0o700 });
await chown(this.dir, this.uid, this.gid);
2021-02-20 15:13:56 +01:00
2021-05-08 02:30:40 +02:00
for (const file of this.files) {
const file_path = join(this.dir, file.name);
const rel = relative(this.dir, file_path);
const file_content = Buffer.from(file.content, file.encoding);
2021-10-08 15:16:57 +02:00
if (rel.startsWith('..'))
throw Error(
`File path "${file.name}" tries to escape parent directory: ${rel}`
);
await mkdir(dirname(file_path), {
2021-10-08 15:16:57 +02:00
recursive: true,
mode: 0o700,
});
await chown(dirname(file_path), this.uid, this.gid);
2021-02-20 15:13:56 +01:00
await writeFile(file_path, file_content);
await chown(file_path, this.uid, this.gid);
2021-05-08 02:30:40 +02:00
}
2021-03-13 06:01:04 +01:00
2021-05-08 02:30:40 +02:00
this.state = job_states.PRIMED;
2021-02-20 15:13:56 +01:00
this.logger.debug('Primed job');
2021-05-08 02:20:21 +02:00
}
2021-03-13 06:01:04 +01:00
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
2021-05-08 02:30:40 +02:00
return new Promise((resolve, reject) => {
const nonetwork = config.disable_networking ? ['nosocket'] : [];
const prlimit = [
'prlimit',
'--nproc=' + this.runtime.max_process_count,
'--nofile=' + this.runtime.max_open_files,
'--fsize=' + this.runtime.max_file_size,
2021-05-08 02:30:40 +02:00
];
const timeout_call = [
'timeout',
'-s',
'9',
Math.ceil(timeout / 1000),
];
2021-05-08 02:30:40 +02:00
if (memory_limit >= 0) {
prlimit.push('--as=' + memory_limit);
}
const proc_call = [
'nice',
...timeout_call,
...prlimit,
...nonetwork,
'bash',
file,
...args,
];
2021-05-08 02:30:40 +02:00
var stdout = '';
var stderr = '';
var output = '';
const proc = spawn(proc_call[0], proc_call.splice(1), {
2021-05-08 02:30:40 +02:00
env: {
...this.runtime.env_vars,
PISTON_LANGUAGE: this.runtime.language,
},
stdio: 'pipe',
cwd: this.dir,
uid: this.uid,
gid: this.gid,
detached: true, //give this process its own process group
});
2021-02-20 15:13:56 +01:00
2021-10-08 15:16:57 +02:00
if (eventBus === null) {
proc.stdin.write(this.stdin);
proc.stdin.end();
proc.stdin.destroy();
2021-10-08 15:16:57 +02:00
} else {
eventBus.on('stdin', data => {
proc.stdin.write(data);
2021-10-08 15:16:57 +02:00
});
2021-07-16 14:22:55 +02:00
2021-10-08 15:16:57 +02:00
eventBus.on('kill', signal => {
proc.kill(signal);
});
}
2021-05-08 02:30:40 +02:00
2021-10-10 17:18:31 +02:00
const kill_timeout =
(timeout >= 0 &&
setTimeout(async _ => {
this.logger.info(`Timeout exceeded timeout=${timeout}`);
2021-10-10 17:18:31 +02:00
process.kill(proc.pid, 'SIGKILL');
}, timeout)) ||
null;
2021-05-08 02:30:40 +02:00
proc.stderr.on('data', async data => {
2021-10-08 15:16:57 +02:00
if (eventBus !== null) {
eventBus.emit('stderr', data);
2021-10-02 14:08:36 +02:00
} else if (stderr.length > this.runtime.output_max_size) {
this.logger.info(`stderr length exceeded`);
2021-10-08 15:16:57 +02:00
process.kill(proc.pid, 'SIGKILL');
2021-05-08 02:30:40 +02:00
} else {
stderr += data;
output += data;
}
});
2021-02-20 15:13:56 +01:00
proc.stdout.on('data', async data => {
2021-10-08 15:16:57 +02:00
if (eventBus !== null) {
eventBus.emit('stdout', data);
2021-10-02 14:08:36 +02:00
} else if (stdout.length > this.runtime.output_max_size) {
this.logger.info(`stdout length exceeded`);
2021-10-08 15:16:57 +02:00
process.kill(proc.pid, 'SIGKILL');
2021-05-08 02:30:40 +02:00
} else {
stdout += data;
output += data;
}
});
2021-03-13 06:01:04 +01:00
const exit_cleanup = () => {
clearTimeout(kill_timeout);
2021-02-22 09:55:51 +01:00
2021-05-08 02:30:40 +02:00
proc.stderr.destroy();
proc.stdout.destroy();
this.cleanup_processes();
this.logger.debug(`Finished exit cleanup`);
2021-05-08 02:30:40 +02:00
};
2021-02-20 15:13:56 +01:00
proc.on('exit', (code, signal) => {
exit_cleanup();
2021-03-13 06:01:04 +01:00
2021-10-08 15:16:57 +02:00
resolve({ stdout, stderr, code, signal, output });
2021-05-08 02:30:40 +02:00
});
2021-03-13 06:01:04 +01:00
proc.on('error', err => {
exit_cleanup();
2021-02-23 07:52:49 +01:00
2021-05-08 02:30:40 +02:00
reject({ error: err, stdout, stderr, output });
});
});
}
2021-02-22 09:55:51 +01:00
2021-05-08 02:30:40 +02:00
async execute() {
if (this.state !== job_states.PRIMED) {
throw new Error(
'Job must be in primed state, current state: ' +
this.state.toString()
);
}
2021-03-13 06:01:04 +01:00
this.logger.info(`Executing job runtime=${this.runtime.toString()}`);
2021-02-20 15:13:56 +01:00
2021-10-15 14:25:33 +02:00
const code_files =
(this.runtime.language === 'file' && this.files) ||
this.files.filter(file => file.encoding == 'utf8');
this.logger.debug('Compiling');
2021-02-21 09:36:49 +01:00
2021-05-08 02:30:40 +02:00
let compile;
2021-02-20 15:13:56 +01:00
2021-05-08 02:30:40 +02:00
if (this.runtime.compiled) {
compile = await this.safe_call(
join(this.runtime.pkgdir, 'compile'),
code_files.map(x => x.name),
2021-10-10 19:11:16 +02:00
this.timeouts.compile,
this.memory_limits.compile
2021-05-08 02:30:40 +02:00
);
2021-03-13 06:01:04 +01:00
}
2021-02-20 15:13:56 +01:00
this.logger.debug('Running');
2021-04-26 00:35:34 +02:00
2021-05-08 02:30:40 +02:00
const run = await this.safe_call(
join(this.runtime.pkgdir, 'run'),
[code_files[0].name, ...this.args],
2021-10-10 19:11:16 +02:00
this.timeouts.run,
this.memory_limits.run
2021-05-08 02:30:40 +02:00
);
2021-04-28 06:03:35 +02:00
2021-05-08 02:30:40 +02:00
this.state = job_states.EXECUTED;
2021-05-08 02:30:40 +02:00
return {
compile,
run,
language: this.runtime.language,
version: this.runtime.version.raw,
};
}
2021-10-08 15:16:57 +02:00
async execute_interactive(eventBus) {
if (this.state !== job_states.PRIMED) {
throw new Error(
'Job must be in primed state, current state: ' +
this.state.toString()
);
}
this.logger.info(
`Interactively executing job runtime=${this.runtime.toString()}`
);
2021-10-15 14:25:33 +02:00
const code_files =
(this.runtime.language === 'file' && this.files) ||
this.files.filter(file => file.encoding == 'utf8');
2021-10-08 15:16:57 +02:00
if (this.runtime.compiled) {
eventBus.emit('stage', 'compile');
const { error, code, signal } = await this.safe_call(
join(this.runtime.pkgdir, 'compile'),
code_files.map(x => x.name),
2021-10-10 19:11:16 +02:00
this.timeouts.compile,
this.memory_limits.compile,
eventBus
2021-10-08 15:16:57 +02:00
);
2021-10-08 15:16:57 +02:00
eventBus.emit('exit', 'compile', { error, code, signal });
}
this.logger.debug('Running');
2021-10-08 15:16:57 +02:00
eventBus.emit('stage', 'run');
const { error, code, signal } = await this.safe_call(
join(this.runtime.pkgdir, 'run'),
[code_files[0].name, ...this.args],
2021-10-10 19:11:16 +02:00
this.timeouts.run,
this.memory_limits.run,
eventBus
);
2021-10-08 15:16:57 +02:00
eventBus.emit('exit', 'run', { error, code, signal });
2021-10-02 14:08:36 +02:00
this.state = job_states.EXECUTED;
}
cleanup_processes(dont_wait = []) {
2021-05-08 02:30:40 +02:00
let processes = [1];
const to_wait = [];
this.logger.debug(`Cleaning up processes`);
2021-05-08 02:30:40 +02:00
while (processes.length > 0) {
2021-10-08 15:16:57 +02:00
processes = [];
const proc_ids = readdirSync('/proc');
2021-10-08 15:16:57 +02:00
processes = proc_ids.map(proc_id => {
if (isNaN(+proc_id)) return -1;
try {
const proc_status = readFileSync(
join('/proc', proc_id, 'status')
);
const proc_lines = proc_status.toString().split('\n');
const state_line = proc_lines.find(line =>
line.startsWith('State:')
);
const uid_line = proc_lines.find(line =>
line.startsWith('Uid:')
);
const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/);
const [_1, state, user_friendly] = state_line.split(/\s+/);
const proc_id_int = parseInt(proc_id);
2022-07-01 15:46:44 +02:00
// Skip over any processes that aren't ours.
// @ts-ignore: dont want to risk fixing this
if (ruid != this.uid && euid != this.uid) return -1;
if (state == 'Z') {
// Zombie process, just needs to be waited, regardless of the user id
if (!to_wait.includes(proc_id_int))
to_wait.push(proc_id_int);
2021-10-08 15:16:57 +02:00
return -1;
}
// We should kill in all other state (Sleep, Stopped & Running)
2022-07-01 15:46:44 +02:00
return proc_id_int;
} catch {
2021-10-08 15:16:57 +02:00
return -1;
}
return -1;
});
2021-10-08 15:16:57 +02:00
processes = processes.filter(p => p > 0);
2021-05-08 02:30:40 +02:00
2021-10-08 15:16:57 +02:00
if (processes.length > 0)
this.logger.debug(`Got processes to kill: ${processes}`);
2021-05-08 02:30:40 +02:00
for (const proc of processes) {
// First stop the processes, but keep their resources allocated so they cant re-fork
try {
process.kill(proc, 'SIGSTOP');
} catch (e) {
2021-05-08 02:30:40 +02:00
// Could already be dead
this.logger.debug(
`Got error while SIGSTOPping process ${proc}:`,
e
);
2021-05-08 02:30:40 +02:00
}
}
for (const proc of processes) {
// Then clear them out of the process tree
try {
process.kill(proc, 'SIGKILL');
} catch (e) {
2021-05-08 02:30:40 +02:00
// Could already be dead and just needs to be waited on
this.logger.debug(
`Got error while SIGKILLing process ${proc}:`,
e
);
2021-05-08 02:30:40 +02:00
}
to_wait.push(proc);
2021-05-08 02:30:40 +02:00
}
2021-04-28 06:11:49 +02:00
}
this.logger.debug(
`Finished kill-loop, calling wait_pid to end any zombie processes`
);
for (const proc of to_wait) {
if (dont_wait.includes(proc)) continue;
wait_pid(proc);
}
this.logger.debug(`Cleaned up processes`);
2021-04-28 06:03:35 +02:00
}
2021-05-08 02:30:40 +02:00
async cleanup_filesystem() {
for (const clean_path of globals.clean_directories) {
const contents = await readdir(clean_path);
2021-05-08 02:30:40 +02:00
for (const file of contents) {
const file_path = join(clean_path, file);
2021-05-08 02:30:40 +02:00
try {
const stat = await _stat(file_path);
2021-05-08 02:30:40 +02:00
if (stat.uid === this.uid) {
await rm(file_path, {
2021-05-08 02:30:40 +02:00
recursive: true,
force: true,
});
}
} catch (e) {
// File was somehow deleted in the time that we read the dir to when we checked the file
this.logger.warn(`Error removing file ${file_path}: ${e}`);
2021-05-08 02:30:40 +02:00
}
}
}
await rm(this.dir, { recursive: true, force: true });
2021-05-08 02:30:40 +02:00
}
2021-05-08 02:30:40 +02:00
async cleanup() {
this.logger.info(`Cleaning up job`);
2021-03-13 06:01:04 +01:00
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
await this.cleanup_filesystem();
remaining_job_spaces++;
2021-05-08 02:30:40 +02:00
}
2021-02-20 15:13:56 +01:00
}