From c7efa5372ac7d275a5e140995b4dff510171c14c Mon Sep 17 00:00:00 2001 From: Thomas Hobson Date: Thu, 11 Nov 2021 19:27:54 +1300 Subject: [PATCH 1/2] api(job): Switch process cleanup to sync The system used to use async cleanup. This would result in execution being handed off to other processes. If a fork bomb was used, it could circumvent the janitor, since the bomb consumed more CPU time and would prevent the process janitor from reading the process information in time. --- api/src/job.js | 86 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 29 deletions(-) diff --git a/api/src/job.js b/api/src/job.js index ca95e6f..e6fafca 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -5,6 +5,7 @@ const path = require('path'); const config = require('./config'); const globals = require('./globals'); const fs = require('fs/promises'); +const fss = require('fs'); const wait_pid = require('waitpid'); const job_states = { @@ -184,24 +185,24 @@ class Job { } }); - const exit_cleanup = async () => { + const exit_cleanup = () => { clear_timeout(kill_timeout); proc.stderr.destroy(); proc.stdout.destroy(); - await this.cleanup_processes(); + this.cleanup_processes(); logger.debug(`Finished exit cleanup uuid=${this.uuid}`); }; - proc.on('exit', async (code, signal) => { - await exit_cleanup(); + proc.on('exit', (code, signal) => { + exit_cleanup(); resolve({ stdout, stderr, code, signal, output }); }); - proc.on('error', async err => { - await exit_cleanup(); + proc.on('error', err => { + exit_cleanup(); reject({ error: err, stdout, stderr, output }); }); @@ -304,38 +305,46 @@ class Job { this.state = job_states.EXECUTED; } - async cleanup_processes(dont_wait = []) { + cleanup_processes(dont_wait = []) { let processes = [1]; + const to_wait = []; logger.debug(`Cleaning up processes uuid=${this.uuid}`); while (processes.length > 0) { processes = []; - const proc_ids = await fs.readdir('/proc'); + const proc_ids = fss.readdir_sync('/proc'); - processes = await 
Promise.all( - proc_ids.map(async proc_id => { - if (isNaN(proc_id)) return -1; - try { - const proc_status = await fs.read_file( - path.join('/proc', proc_id, 'status') - ); - const proc_lines = proc_status.to_string().split('\n'); - const uid_line = proc_lines.find(line => - line.starts_with('Uid:') - ); - const [_, ruid, euid, suid, fuid] = - uid_line.split(/\s+/); + processes = proc_ids.map(proc_id => { + if (isNaN(proc_id)) return -1; + try { + const proc_status = fss.read_file_sync( + path.join('/proc', proc_id, 'status') + ); + const proc_lines = proc_status.to_string().split('\n'); + const state_line = proc_lines.find(line => + line.starts_with('State:') + ); + const uid_line = proc_lines.find(line => + line.starts_with('Uid:') + ); + const [_, ruid, euid, suid, fuid] = uid_line.split(/\s+/); - if (ruid == this.uid || euid == this.uid) - return parse_int(proc_id); - } catch { + const [_1, state, user_friendly] = state_line.split(/\s+/); + + if (state == 'Z') + // Zombie process, just needs to be waited return -1; - } + // We should kill in all other state (Sleep, Stopped & Running) + if (ruid == this.uid || euid == this.uid) + return parse_int(proc_id); + } catch { return -1; - }) - ); + } + + return -1; + }); processes = processes.filter(p => p > 0); @@ -348,8 +357,12 @@ class Job { // First stop the processes, but keep their resources allocated so they cant re-fork try { process.kill(proc, 'SIGSTOP'); - } catch { + } catch (e) { // Could already be dead + logger.debug( + `Got error while SIGSTOPping process ${proc}:`, + e + ); } } @@ -359,12 +372,26 @@ class Job { process.kill(proc, 'SIGKILL'); } catch { // Could already be dead and just needs to be waited on + logger.debug( + `Got error while SIGKILLing process ${proc}:`, + e + ); } - if (!dont_wait.includes(proc)) wait_pid(proc); + to_wait.push(proc); } } + logger.debug( + `Finished kill-loop, calling wait_pid to end any zombie processes uuid=${this.uuid}` + ); + + for (const proc of to_wait) { + if 
(dont_wait.includes(proc)) continue; + + wait_pid(proc); + } + logger.debug(`Cleaned up processes uuid=${this.uuid}`); } @@ -397,6 +424,7 @@ class Job { async cleanup() { logger.info(`Cleaning up job uuid=${this.uuid}`); + this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow await this.cleanup_filesystem(); remaining_job_spaces++; From c091c117c7c1a5165c137e3d367adc8eea2662f4 Mon Sep 17 00:00:00 2001 From: Thomas Hobson Date: Thu, 11 Nov 2021 21:34:30 +1300 Subject: [PATCH 2/2] api(job): Decrease safe_call CPU time By increasing the niceness value of child processes, the scheduler gives less CPU Time to them. This allows the node process to get more CPU time, and can work through the event queue faster. --- api/src/job.js | 75 ++++++++++++++++++++++++++------------------------ 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/api/src/job.js b/api/src/job.js index e6fafca..eef2ffd 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -1,4 +1,5 @@ -const logger = require('logplease').create('job'); +const logplease = require('logplease'); +const logger = logplease.create('job'); const { v4: uuidv4 } = require('uuid'); const cp = require('child_process'); const path = require('path'); @@ -30,6 +31,9 @@ setInterval(() => { class Job { constructor({ runtime, files, args, stdin, timeouts, memory_limits }) { this.uuid = uuidv4(); + + this.logger = logplease.create(`job/${this.uuid}`); + this.runtime = runtime; this.files = files.map((file, i) => ({ name: file.name || `file${i}.code`, @@ -54,6 +58,8 @@ class Job { uid %= config.runner_uid_max - config.runner_uid_min + 1; gid %= config.runner_gid_max - config.runner_gid_min + 1; + this.logger.debug(`Assigned uid=${this.uid} gid=${this.gid}`); + this.state = job_states.READY; this.dir = path.join( config.data_directory, @@ -64,17 +70,17 @@ class Job { async prime() { if (remaining_job_spaces < 1) { - logger.info(`Awaiting job slot uuid=${this.uuid}`); + 
this.logger.info(`Awaiting job slot`); await new Promise(resolve => { jobQueue.push(resolve); }); } - logger.info(`Priming job uuid=${this.uuid}`); + this.logger.info(`Priming job`); remaining_job_spaces--; - logger.debug('Writing files to job cache'); + this.logger.debug('Writing files to job cache'); - logger.debug(`Transfering ownership uid=${this.uid} gid=${this.gid}`); + this.logger.debug(`Transfering ownership`); await fs.mkdir(this.dir, { mode: 0o700 }); await fs.chown(this.dir, this.uid, this.gid); @@ -101,7 +107,7 @@ class Job { this.state = job_states.PRIMED; - logger.debug('Primed job'); + this.logger.debug('Primed job'); } async safe_call(file, args, timeout, memory_limit, eventBus = null) { @@ -119,7 +125,14 @@ class Job { prlimit.push('--as=' + memory_limit); } - const proc_call = [...prlimit, ...nonetwork, 'bash', file, ...args]; + const proc_call = [ + 'nice', + ...prlimit, + ...nonetwork, + 'bash', + file, + ...args, + ]; var stdout = ''; var stderr = ''; @@ -154,9 +167,7 @@ class Job { const kill_timeout = (timeout >= 0 && set_timeout(async _ => { - logger.info( - `Timeout exceeded timeout=${timeout} uuid=${this.uuid}` - ); + this.logger.info(`Timeout exceeded timeout=${timeout}`); process.kill(proc.pid, 'SIGKILL'); }, timeout)) || null; @@ -165,7 +176,7 @@ class Job { if (eventBus !== null) { eventBus.emit('stderr', data); } else if (stderr.length > this.runtime.output_max_size) { - logger.info(`stderr length exceeded uuid=${this.uuid}`); + this.logger.info(`stderr length exceeded`); process.kill(proc.pid, 'SIGKILL'); } else { stderr += data; @@ -177,7 +188,7 @@ class Job { if (eventBus !== null) { eventBus.emit('stdout', data); } else if (stdout.length > this.runtime.output_max_size) { - logger.info(`stdout length exceeded uuid=${this.uuid}`); + this.logger.info(`stdout length exceeded`); process.kill(proc.pid, 'SIGKILL'); } else { stdout += data; @@ -192,7 +203,7 @@ class Job { proc.stdout.destroy(); this.cleanup_processes(); - 
logger.debug(`Finished exit cleanup uuid=${this.uuid}`); + this.logger.debug(`Finished exit cleanup`); }; proc.on('exit', (code, signal) => { @@ -217,17 +228,13 @@ class Job { ); } - logger.info( - `Executing job uuid=${this.uuid} uid=${this.uid} gid=${ - this.gid - } runtime=${this.runtime.toString()}` - ); + this.logger.info(`Executing job runtime=${this.runtime.toString()}`); const code_files = (this.runtime.language === 'file' && this.files) || this.files.filter(file => file.encoding == 'utf8'); - logger.debug('Compiling'); + this.logger.debug('Compiling'); let compile; @@ -240,7 +247,7 @@ class Job { ); } - logger.debug('Running'); + this.logger.debug('Running'); const run = await this.safe_call( path.join(this.runtime.pkgdir, 'run'), @@ -267,10 +274,8 @@ class Job { ); } - logger.info( - `Interactively executing job uuid=${this.uuid} uid=${ - this.uid - } gid=${this.gid} runtime=${this.runtime.toString()}` + this.logger.info( + `Interactively executing job runtime=${this.runtime.toString()}` ); const code_files = @@ -290,7 +295,7 @@ class Job { eventBus.emit('exit', 'compile', { error, code, signal }); } - logger.debug('Running'); + this.logger.debug('Running'); eventBus.emit('stage', 'run'); const { error, code, signal } = await this.safe_call( path.join(this.runtime.pkgdir, 'run'), @@ -308,7 +313,7 @@ class Job { cleanup_processes(dont_wait = []) { let processes = [1]; const to_wait = []; - logger.debug(`Cleaning up processes uuid=${this.uuid}`); + this.logger.debug(`Cleaning up processes`); while (processes.length > 0) { processes = []; @@ -349,9 +354,7 @@ class Job { processes = processes.filter(p => p > 0); if (processes.length > 0) - logger.debug( - `Got processes to kill: ${processes} uuid=${this.uuid}` - ); + this.logger.debug(`Got processes to kill: ${processes}`); for (const proc of processes) { // First stop the processes, but keep their resources allocated so they cant re-fork @@ -359,7 +362,7 @@ class Job { process.kill(proc, 'SIGSTOP'); } catch 
(e) { // Could already be dead - logger.debug( + this.logger.debug( `Got error while SIGSTOPping process ${proc}:`, e ); @@ -372,7 +375,7 @@ class Job { process.kill(proc, 'SIGKILL'); } catch { // Could already be dead and just needs to be waited on - logger.debug( + this.logger.debug( `Got error while SIGKILLing process ${proc}:`, e ); @@ -382,8 +385,8 @@ class Job { } } - logger.debug( - `Finished kill-loop, calling wait_pid to end any zombie processes uuid=${this.uuid}` + this.logger.debug( + `Finished kill-loop, calling wait_pid to end any zombie processes` ); for (const proc of to_wait) { @@ -392,7 +395,7 @@ class Job { wait_pid(proc); } - logger.debug(`Cleaned up processes uuid=${this.uuid}`); + this.logger.debug(`Cleaned up processes`); } async cleanup_filesystem() { @@ -413,7 +416,7 @@ class Job { } } catch (e) { // File was somehow deleted in the time that we read the dir to when we checked the file - logger.warn(`Error removing file ${file_path}: ${e}`); + this.logger.warn(`Error removing file ${file_path}: ${e}`); } } } @@ -422,7 +425,7 @@ class Job { } async cleanup() { - logger.info(`Cleaning up job uuid=${this.uuid}`); + this.logger.info(`Cleaning up job`); this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow await this.cleanup_filesystem();