From 993d5c6fb0a1e4d6a138e7ce2e25cb35dd7981df Mon Sep 17 00:00:00 2001 From: Omar Brikaa Date: Fri, 30 Sep 2022 14:22:29 +0200 Subject: [PATCH 1/5] Only resolve and destroy streams on job close --- api/src/job.js | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/api/src/job.js b/api/src/job.js index d7e36b3..86c4090 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -126,6 +126,11 @@ class Job { this.#active_timeouts = []; this.logger.debug('Cleared the active timeouts'); + this.cleanup_processes(); + this.logger.debug(`Finished exit cleanup`); + } + + close_cleanup() { for (const proc of this.#active_parent_processes) { proc.stderr.destroy(); if (!proc.stdin.destroyed) { @@ -135,10 +140,7 @@ class Job { proc.stdout.destroy(); } this.#active_parent_processes = []; - this.logger.debug('Destroyed parent processes writables'); - - this.cleanup_processes(); - this.logger.debug(`Finished exit cleanup`); + this.logger.debug('Destroyed processes writables'); } async safe_call(file, args, timeout, memory_limit, eventBus = null) { @@ -153,7 +155,10 @@ class Job { ]; const timeout_call = [ - 'timeout', '-s', '9', Math.ceil(timeout / 1000), + 'timeout', + '-s', + '9', + Math.ceil(timeout / 1000), ]; if (memory_limit >= 0) { @@ -231,14 +236,17 @@ class Job { } }); - proc.on('exit', (code, signal) => { - this.exit_cleanup(); + proc.on('exit', () => this.exit_cleanup()); + + proc.on('close', (code, signal) => { + this.close_cleanup(); resolve({ stdout, stderr, code, signal, output }); }); proc.on('error', err => { this.exit_cleanup(); + this.close_cleanup(); reject({ error: err, stdout, stderr, output }); }); @@ -370,11 +378,11 @@ class Job { const proc_id_int = parse_int(proc_id); // Skip over any processes that aren't ours. - if(ruid != this.uid && euid != this.uid) return -1; + if (ruid != this.uid && euid != this.uid) return -1; - if (state == 'Z'){ + if (state == 'Z') { // Zombie process, just needs to be waited, regardless of the user id - if(!to_wait.includes(proc_id_int)) + if (!to_wait.includes(proc_id_int)) to_wait.push(proc_id_int); return -1; @@ -466,6 +474,7 @@ class Job { this.logger.info(`Cleaning up job`); this.exit_cleanup(); // Run process janitor, just incase there are any residual processes somehow + this.close_cleanup(); await this.cleanup_filesystem(); remaining_job_spaces++; From 91789b9e32c3c02a0f454dbe287d4be40f7f572f Mon Sep 17 00:00:00 2001 From: Omar Brikaa Date: Fri, 30 Sep 2022 14:27:34 +0200 Subject: [PATCH 2/5] Don't let an external websocket close cause a cleanup --- api/src/api/v2.js | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/api/src/api/v2.js b/api/src/api/v2.js index 9cb5c36..6ac1b94 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -219,6 +219,7 @@ router.ws('/connect', async (ws, req) => { ); await job.execute_interactive(eventBus); + await job.cleanup(); ws.close(4999, 'Job Completed'); } else { @@ -255,12 +256,6 @@ router.ws('/connect', async (ws, req) => { } }); - ws.on('close', async () => { - if (job !== null) { - await job.cleanup(); - } - }); - setTimeout(() => { //Terminate the socket after 1 second, if not initialized. if (job === null) ws.close(4001, 'Initialization Timeout'); From 684bfd661004ca93be77818e8e55698617b2e61e Mon Sep 17 00:00:00 2001 From: Omar Brikaa Date: Fri, 30 Sep 2022 17:06:26 +0200 Subject: [PATCH 3/5] Dequeue new job after clean up instead of polling --- api/src/job.js | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/api/src/job.js b/api/src/job.js index 86c4090..2244fa0 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -20,13 +20,6 @@ let gid = 0; let remaining_job_spaces = config.max_concurrent_jobs; let jobQueue = []; -setInterval(() => { - // Every 10ms try resolve a new job, if there is an available slot - if (jobQueue.length > 0 && remaining_job_spaces > 0) { - jobQueue.shift()(); - } -}, 10); - class Job { #active_timeouts; #active_parent_processes; @@ -84,7 +77,6 @@ class Job { jobQueue.push(resolve); }); } - this.logger.info(`Priming job`); remaining_job_spaces--; this.logger.debug('Writing files to job cache'); @@ -478,6 +470,9 @@ class Job { await this.cleanup_filesystem(); remaining_job_spaces++; + if (jobQueue.length > 0) { + jobQueue.shift()(); + } } } From f40f7702debfa9c7ffb4d8a872ce6b8d8674c648 Mon Sep 17 00:00:00 2001 From: Omar Brikaa Date: Fri, 30 Sep 2022 21:37:19 +0200 Subject: [PATCH 4/5] camelCase -> snake_case --- api/src/api/v2.js | 16 ++++++++-------- api/src/job.js | 38 +++++++++++++++++++------------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/api/src/api/v2.js b/api/src/api/v2.js index 6ac1b94..1f87b78 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -172,9 +172,9 @@ router.use((req, res, next) => { router.ws('/connect', async (ws, req) => { let job = null; - let eventBus = new events.EventEmitter(); + let event_bus = new events.EventEmitter(); - eventBus.on('stdout', data => + event_bus.on('stdout', data => ws.send( JSON.stringify({ type: 'data', @@ -183,7 +183,7 @@ router.ws('/connect', async (ws, req) => { }) ) ); - eventBus.on('stderr', data => + event_bus.on('stderr', data => ws.send( JSON.stringify({ type: 'data', @@ -192,10 +192,10 @@ router.ws('/connect', async (ws, req) => { }) ) ); - eventBus.on('stage', stage => + event_bus.on('stage', stage => ws.send(JSON.stringify({ type: 'stage', stage })) ); - eventBus.on('exit', (stage, status) => + event_bus.on('exit', (stage, status) => ws.send(JSON.stringify({ type: 'exit', stage, ...status })) ); @@ -218,7 +218,7 @@ router.ws('/connect', async (ws, req) => { }) ); - await job.execute_interactive(eventBus); + await job.execute_interactive(event_bus); await job.cleanup(); ws.close(4999, 'Job Completed'); @@ -229,7 +229,7 @@ router.ws('/connect', async (ws, req) => { case 'data': if (job !== null) { if (msg.stream === 'stdin') { - eventBus.emit('stdin', msg.data); + event_bus.emit('stdin', msg.data); } else { ws.close(4004, 'Can only write to stdin'); } @@ -240,7 +240,7 @@ router.ws('/connect', async (ws, req) => { case 'signal': if (job !== null) { if (SIGNALS.includes(msg.signal)) { - eventBus.emit('signal', msg.signal); + event_bus.emit('signal', msg.signal); } else { ws.close(4005, 'Invalid signal'); } diff --git a/api/src/job.js b/api/src/job.js index 2244fa0..550e6fd 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -18,7 +18,7 @@ let uid = 0; let gid = 0; let remaining_job_spaces = config.max_concurrent_jobs; -let jobQueue = []; +let job_queue = []; class Job { #active_timeouts; @@ -74,7 +74,7 @@ class Job { if (remaining_job_spaces < 1) { this.logger.info(`Awaiting job slot`); await new Promise(resolve => { - jobQueue.push(resolve); + job_queue.push(resolve); }); } this.logger.info(`Priming job`); @@ -135,7 +135,7 @@ class Job { this.logger.debug('Destroyed processes writables'); } - async safe_call(file, args, timeout, memory_limit, eventBus = null) { + async safe_call(file, args, timeout, memory_limit, event_bus = null) { return new Promise((resolve, reject) => { const nonetwork = config.disable_networking ? ['nosocket'] : []; @@ -181,16 +181,16 @@ class Job { this.#active_parent_processes.push(proc); - if (eventBus === null) { + if (event_bus === null) { proc.stdin.write(this.stdin); proc.stdin.end(); proc.stdin.destroy(); } else { - eventBus.on('stdin', data => { + event_bus.on('stdin', data => { proc.stdin.write(data); }); - eventBus.on('kill', signal => { + event_bus.on('kill', signal => { proc.kill(signal); }); } @@ -205,8 +205,8 @@ class Job { this.#active_timeouts.push(kill_timeout); proc.stderr.on('data', async data => { - if (eventBus !== null) { - eventBus.emit('stderr', data); + if (event_bus !== null) { + event_bus.emit('stderr', data); } else if (stderr.length > this.runtime.output_max_size) { this.logger.info(`stderr length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -217,8 +217,8 @@ class Job { }); proc.stdout.on('data', async data => { - if (eventBus !== null) { - eventBus.emit('stdout', data); + if (event_bus !== null) { + event_bus.emit('stdout', data); } else if (stdout.length > this.runtime.output_max_size) { this.logger.info(`stdout length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -294,7 +294,7 @@ class Job { }; } - async execute_interactive(eventBus) { + async execute_interactive(event_bus) { if (this.state !== job_states.PRIMED) { throw new Error( 'Job must be in primed state, current state: ' + @@ -310,31 +310,31 @@ class Job { let compile_errored = false; if (this.runtime.compiled) { - eventBus.emit('stage', 'compile'); + event_bus.emit('stage', 'compile'); const { error, code, signal } = await this.safe_call( this.runtime.compile, code_files.map(x => x.name), this.timeouts.compile, this.memory_limits.compile, - eventBus + event_bus ); - eventBus.emit('exit', 'compile', { error, code, signal }); + event_bus.emit('exit', 'compile', { error, code, signal }); compile_errored = code !== 0; } if (!compile_errored) { this.logger.debug('Running'); - eventBus.emit('stage', 'run'); + event_bus.emit('stage', 'run'); const { error, code, signal } = await this.safe_call( this.runtime.run, [code_files[0].name, ...this.args], this.timeouts.run, this.memory_limits.run, - eventBus + event_bus ); - eventBus.emit('exit', 'run', { error, code, signal }); + event_bus.emit('exit', 'run', { error, code, signal }); } this.state = job_states.EXECUTED; @@ -470,8 +470,8 @@ class Job { await this.cleanup_filesystem(); remaining_job_spaces++; - if (jobQueue.length > 0) { - jobQueue.shift()(); + if (job_queue.length > 0) { + job_queue.shift()(); } } } From d4ffccb32b8b70938f6be5eb0b4a150595ab94e2 Mon Sep 17 00:00:00 2001 From: Omar Brikaa Date: Fri, 30 Sep 2022 22:25:30 +0200 Subject: [PATCH 5/5] Combine execute, execute_interactive functions --- api/src/api/v2.js | 2 +- api/src/job.js | 82 +++++++++++++++++------------------------------ 2 files changed, 30 insertions(+), 54 deletions(-) diff --git a/api/src/api/v2.js b/api/src/api/v2.js index 1f87b78..c5ba659 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -218,7 +218,7 @@ router.ws('/connect', async (ws, req) => { }) ); - await job.execute_interactive(event_bus); + await job.execute(event_bus); await job.cleanup(); ws.close(4999, 'Job Completed'); diff --git a/api/src/job.js b/api/src/job.js index 550e6fd..7f80d6b 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -245,7 +245,7 @@ class Job { }); } - async execute() { + async execute(event_bus = null) { if (this.state !== job_states.PRIMED) { throw new Error( 'Job must be in primed state, current state: ' + @@ -256,32 +256,54 @@ class Job { this.logger.info(`Executing job runtime=${this.runtime.toString()}`); const code_files = this.files.filter(file => file.encoding == 'utf8'); - - this.logger.debug('Compiling'); - let compile; let compile_errored = false; + const { emit_event_bus_result, emit_event_bus_stage } = + event_bus === null + ? { + emit_event_bus_result: () => {}, + emit_event_bus_stage: () => {}, + } + : { + emit_event_bus_result: (stage, result, event_bus) => { + const { error, code, signal } = result; + event_bus.emit('exit', stage, { + error, + code, + signal, + }); + }, + emit_event_bus_stage: (stage, event_bus) => { + event_bus.emit('stage', stage); + }, + }; if (this.runtime.compiled) { + this.logger.debug('Compiling'); + emit_event_bus_stage('compile', event_bus); compile = await this.safe_call( this.runtime.compile, code_files.map(x => x.name), this.timeouts.compile, - this.memory_limits.compile + this.memory_limits.compile, + event_bus ); + emit_event_bus_result('compile', compile, event_bus); compile_errored = compile.code !== 0; } let run; if (!compile_errored) { this.logger.debug('Running'); - + emit_event_bus_stage('run', event_bus); run = await this.safe_call( this.runtime.run, [code_files[0].name, ...this.args], this.timeouts.run, - this.memory_limits.run + this.memory_limits.run, + event_bus ); + emit_event_bus_result('run', run, event_bus); } this.state = job_states.EXECUTED; @@ -294,52 +316,6 @@ class Job { }; } - async execute_interactive(event_bus) { - if (this.state !== job_states.PRIMED) { - throw new Error( - 'Job must be in primed state, current state: ' + - this.state.toString() - ); - } - - this.logger.info( - `Interactively executing job runtime=${this.runtime.toString()}` - ); - - const code_files = this.files.filter(file => file.encoding == 'utf8'); - - let compile_errored = false; - if (this.runtime.compiled) { - event_bus.emit('stage', 'compile'); - const { error, code, signal } = await this.safe_call( - this.runtime.compile, - code_files.map(x => x.name), - this.timeouts.compile, - this.memory_limits.compile, - event_bus - ); - - event_bus.emit('exit', 'compile', { error, code, signal }); - compile_errored = code !== 0; - } - - if (!compile_errored) { - this.logger.debug('Running'); - event_bus.emit('stage', 'run'); - const { error, code, signal } = await this.safe_call( - this.runtime.run, - [code_files[0].name, ...this.args], - this.timeouts.run, - this.memory_limits.run, - event_bus - ); - - event_bus.emit('exit', 'run', { error, code, signal }); - } - - this.state = job_states.EXECUTED; - } - cleanup_processes(dont_wait = []) { let processes = [1]; const to_wait = [];