This commit is contained in:
khalil chermiti 2024-06-25 17:17:32 +08:00 committed by GitHub
commit 61d19305ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 84 additions and 29 deletions

View File

@ -50,6 +50,12 @@ const SIGNALS = [
]; ];
// ref: https://man7.org/linux/man-pages/man7/signal.7.html // ref: https://man7.org/linux/man-pages/man7/signal.7.html
/**Job Fuctory Function
* this function is used to create a job object from the request body
* validates the request body and returns a promise that resolves to a job object
* @param {Object} body - the request body
* @returns {Promise<Job>} - a promise that resolves to a job object
*/
function get_job(body) { function get_job(body) {
let { let {
language, language,
@ -106,6 +112,7 @@ function get_job(body) {
}); });
} }
// check if the constraints are within the configured limits
for (const constraint of ['memory_limit', 'timeout']) { for (const constraint of ['memory_limit', 'timeout']) {
for (const type of ['compile', 'run']) { for (const type of ['compile', 'run']) {
const constraint_name = `${type}_${constraint}`; const constraint_name = `${type}_${constraint}`;
@ -122,6 +129,7 @@ function get_job(body) {
if (configured_limit <= 0) { if (configured_limit <= 0) {
continue; continue;
} }
if (constraint_value > configured_limit) { if (constraint_value > configured_limit) {
return reject({ return reject({
message: `${constraint_name} cannot exceed the configured limit of ${configured_limit}`, message: `${constraint_name} cannot exceed the configured limit of ${configured_limit}`,
@ -172,6 +180,10 @@ router.use((req, res, next) => {
next(); next();
}); });
/** Websocket route
* used to create a websocket connection to the server to run code
* in a more interactive way by writing to stdin and reading from stdout and stderr
*/
router.ws('/connect', async (ws, req) => { router.ws('/connect', async (ws, req) => {
let job = null; let job = null;
let event_bus = new events.EventEmitter(); let event_bus = new events.EventEmitter();

View File

@ -21,6 +21,11 @@ let gid = 0;
let remaining_job_spaces = config.max_concurrent_jobs; let remaining_job_spaces = config.max_concurrent_jobs;
let job_queue = []; let job_queue = [];
/** Every code execution is a job. This class is used to manage the job and its resources.
* @method prime Used to write the files to the job cache and transfer ownership of the files to the runner.
* @method safe_call Used to call the child process and limit its resources also used to compile and run the code.
* @method execute Used to execute the job runtime and return the result.
*/
class Job { class Job {
#active_timeouts; #active_timeouts;
#active_parent_processes; #active_parent_processes;
@ -58,6 +63,7 @@ class Job {
uid++; uid++;
gid++; gid++;
// generate a new uid and gid within the range of the config values (1001 , 1500)
uid %= config.runner_uid_max - config.runner_uid_min + 1; uid %= config.runner_uid_max - config.runner_uid_min + 1;
gid %= config.runner_gid_max - config.runner_gid_min + 1; gid %= config.runner_gid_max - config.runner_gid_min + 1;
@ -71,7 +77,12 @@ class Job {
); );
} }
/** - Used to write the files (containing code to be executed) to the job cache folder
* and transfer ownership of the files to the runner.
*/
async prime() { async prime() {
// wait for a job slot to open up (default concurrent jobs is 64)
// this is to prevent the runner from being overwhelmed with jobs
if (remaining_job_spaces < 1) { if (remaining_job_spaces < 1) {
this.logger.info(`Awaiting job slot`); this.logger.info(`Awaiting job slot`);
await new Promise(resolve => { await new Promise(resolve => {
@ -84,6 +95,7 @@ class Job {
this.logger.debug(`Transfering ownership`); this.logger.debug(`Transfering ownership`);
// create the job cache folder and transfer ownership to the runner
await fs.mkdir(this.dir, { mode: 0o700 }); await fs.mkdir(this.dir, { mode: 0o700 });
await fs.chown(this.dir, this.uid, this.gid); await fs.chown(this.dir, this.uid, this.gid);
@ -112,6 +124,7 @@ class Job {
this.logger.debug('Primed job'); this.logger.debug('Primed job');
} }
/** Used to clear the active timeouts and processes */
exit_cleanup() { exit_cleanup() {
for (const timeout of this.#active_timeouts) { for (const timeout of this.#active_timeouts) {
clear_timeout(timeout); clear_timeout(timeout);
@ -123,6 +136,7 @@ class Job {
this.logger.debug(`Finished exit cleanup`); this.logger.debug(`Finished exit cleanup`);
} }
/** Close the writables ( stdin, stdout, stderr ) of the active parent processes */
close_cleanup() { close_cleanup() {
for (const proc of this.#active_parent_processes) { for (const proc of this.#active_parent_processes) {
proc.stderr.destroy(); proc.stderr.destroy();
@ -136,10 +150,20 @@ class Job {
this.logger.debug('Destroyed processes writables'); this.logger.debug('Destroyed processes writables');
} }
/** This function is used to call the child process and limit its resources
* - used to compile and run the code.
* @param {string} file - The file to be executed
* @param {string[]} args - The arguments to be passed to the file
* @param {number} timeout - The time limit for the process
* @param {number} memory_limit - The memory limit for the process
* @param {EventEmitter} event_bus - The event bus to be used for communication
*/
async safe_call(file, args, timeout, memory_limit, event_bus = null) { async safe_call(file, args, timeout, memory_limit, event_bus = null) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const nonetwork = config.disable_networking ? ['nosocket'] : []; const nonetwork = config.disable_networking ? ['nosocket'] : [];
// prlimit is a linux specific command
// It is used to limit the resources of the child process
const prlimit = [ const prlimit = [
'prlimit', 'prlimit',
'--nproc=' + this.runtime.max_process_count, '--nproc=' + this.runtime.max_process_count,
@ -147,10 +171,12 @@ class Job {
'--fsize=' + this.runtime.max_file_size, '--fsize=' + this.runtime.max_file_size,
]; ];
// timeout is a linux specific command
// It is used to limit the time of the child process
const timeout_call = [ const timeout_call = [
'timeout', 'timeout',
'-s', '-s',
'9', '9', // SIGKILL
Math.ceil(timeout / 1000), Math.ceil(timeout / 1000),
]; ];
@ -159,10 +185,10 @@ class Job {
} }
const proc_call = [ const proc_call = [
'nice', 'nice', // lower the priority of the process
...timeout_call, ...timeout_call, // kill the process if it exceeds the time limit
...prlimit, ...prlimit, // limit the resources of the process
...nonetwork, ...nonetwork, // disable networking
'bash', 'bash',
file, file,
...args, ...args,
@ -172,6 +198,7 @@ class Job {
var stderr = ''; var stderr = '';
var output = ''; var output = '';
// spawn the child process to execute the file with the given arguments
const proc = cp.spawn(proc_call[0], proc_call.splice(1), { const proc = cp.spawn(proc_call[0], proc_call.splice(1), {
env: { env: {
...this.runtime.env_vars, ...this.runtime.env_vars,
@ -191,6 +218,8 @@ class Job {
proc.stdin.end(); proc.stdin.end();
proc.stdin.destroy(); proc.stdin.destroy();
} else { } else {
// when the event_bus receives a 'stdin' event (over websocket), write the data to the process's stdin
// used to handle interactive programs (like those that require input)
event_bus.on('stdin', data => { event_bus.on('stdin', data => {
proc.stdin.write(data); proc.stdin.write(data);
}); });
@ -200,14 +229,14 @@ class Job {
}); });
} }
// set a timeout to kill the process if it exceeds the time limit
const kill_timeout = const kill_timeout =
(timeout >= 0 && (timeout >= 0 &&
set_timeout(async _ => { set_timeout(async _ => {
this.logger.info(`Timeout exceeded timeout=${timeout}`); this.logger.info(`Timeout exceeded timeout=${timeout}`);
try { try {
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
} } catch (e) {
catch (e) {
// Could already be dead and just needs to be waited on // Could already be dead and just needs to be waited on
this.logger.debug( this.logger.debug(
`Got error while SIGKILLing process ${proc}:`, `Got error while SIGKILLing process ${proc}:`,
@ -218,15 +247,18 @@ class Job {
null; null;
this.#active_timeouts.push(kill_timeout); this.#active_timeouts.push(kill_timeout);
// when the process writes to stderr, send the data to the event_bus (over websocket later)
proc.stderr.on('data', async data => { proc.stderr.on('data', async data => {
if (event_bus !== null) { if (event_bus !== null) {
event_bus.emit('stderr', data); event_bus.emit('stderr', data);
} else if ((stderr.length + data.length) > this.runtime.output_max_size) { } else if (
stderr.length + data.length >
this.runtime.output_max_size
) {
this.logger.info(`stderr length exceeded`); this.logger.info(`stderr length exceeded`);
try { try {
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
} } catch (e) {
catch (e) {
// Could already be dead and just needs to be waited on // Could already be dead and just needs to be waited on
this.logger.debug( this.logger.debug(
`Got error while SIGKILLing process ${proc}:`, `Got error while SIGKILLing process ${proc}:`,
@ -239,15 +271,18 @@ class Job {
} }
}); });
// when the process writes to stdout, send the data to the event_bus (over websocket later)
proc.stdout.on('data', async data => { proc.stdout.on('data', async data => {
if (event_bus !== null) { if (event_bus !== null) {
event_bus.emit('stdout', data); event_bus.emit('stdout', data);
} else if ((stdout.length + data.length) > this.runtime.output_max_size) { } else if (
stdout.length + data.length >
this.runtime.output_max_size
) {
this.logger.info(`stdout length exceeded`); this.logger.info(`stdout length exceeded`);
try { try {
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
} } catch (e) {
catch (e) {
// Could already be dead and just needs to be waited on // Could already be dead and just needs to be waited on
this.logger.debug( this.logger.debug(
`Got error while SIGKILLing process ${proc}:`, `Got error while SIGKILLing process ${proc}:`,
@ -277,11 +312,15 @@ class Job {
}); });
} }
/** Used to execute the job and return the result.
* @param {EventEmitter} event_bus - The event bus to be used for communication
* @returns {Promise} - The result of the execution
*/
async execute(event_bus = null) { async execute(event_bus = null) {
if (this.state !== job_states.PRIMED) { if (this.state !== job_states.PRIMED) {
throw new Error( throw new Error(
'Job must be in primed state, current state: ' + 'Job must be in primed state, current state: ' +
this.state.toString() this.state.toString()
); );
} }
@ -298,22 +337,22 @@ class Job {
const { emit_event_bus_result, emit_event_bus_stage } = const { emit_event_bus_result, emit_event_bus_stage } =
event_bus === null event_bus === null
? { ? {
emit_event_bus_result: () => { }, emit_event_bus_result: () => {},
emit_event_bus_stage: () => { }, emit_event_bus_stage: () => {},
} }
: { : {
emit_event_bus_result: (stage, result, event_bus) => { emit_event_bus_result: (stage, result, event_bus) => {
const { error, code, signal } = result; const { error, code, signal } = result;
event_bus.emit('exit', stage, { event_bus.emit('exit', stage, {
error, error,
code, code,
signal, signal,
}); });
}, },
emit_event_bus_stage: (stage, event_bus) => { emit_event_bus_stage: (stage, event_bus) => {
event_bus.emit('stage', stage); event_bus.emit('stage', stage);
}, },
}; };
if (this.runtime.compiled) { if (this.runtime.compiled) {
this.logger.debug('Compiling'); this.logger.debug('Compiling');
@ -353,6 +392,9 @@ class Job {
}; };
} }
/** Used to cleanup the processes and wait for any zombie processes to end
* - scan /proc for any processes that are owned by the runner and kill them
*/
cleanup_processes(dont_wait = []) { cleanup_processes(dont_wait = []) {
let processes = [1]; let processes = [1];
const to_wait = []; const to_wait = [];
@ -449,6 +491,7 @@ class Job {
this.logger.debug(`Cleaned up processes`); this.logger.debug(`Cleaned up processes`);
} }
// used to cleanup the filesystem for any residual files
async cleanup_filesystem() { async cleanup_filesystem() {
for (const clean_path of globals.clean_directories) { for (const clean_path of globals.clean_directories) {
const contents = await fs.readdir(clean_path); const contents = await fs.readdir(clean_path);