From 11841b320265bcd66af8756c16b5c1d29e5b2e84 Mon Sep 17 00:00:00 2001 From: Pablo Pozo Date: Fri, 14 Jul 2023 13:55:40 +0300 Subject: [PATCH] Backport engineer-man#519 parallel requests fix --- api/src/api/v2.js | 29 ++++--- api/src/job.js | 195 +++++++++++++++++++++++++--------------------- 2 files changed, 119 insertions(+), 105 deletions(-) diff --git a/api/src/api/v2.js b/api/src/api/v2.js index 38b7c85..3dfa3ee 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -174,9 +174,9 @@ router.use((req, res, next) => { router.ws('/connect', async (ws, req) => { let job = null; - let eventBus = new events.EventEmitter(); + let event_bus = new events.EventEmitter(); - eventBus.on('stdout', data => + event_bus.on('stdout', data => ws.send( JSON.stringify({ type: 'data', @@ -185,7 +185,7 @@ router.ws('/connect', async (ws, req) => { }) ) ); - eventBus.on('stderr', data => + event_bus.on('stderr', data => ws.send( JSON.stringify({ type: 'data', @@ -194,10 +194,10 @@ router.ws('/connect', async (ws, req) => { }) ) ); - eventBus.on('stage', stage => + event_bus.on('stage', stage => ws.send(JSON.stringify({ type: 'stage', stage })) ); - eventBus.on('exit', (stage, status) => + event_bus.on('exit', (stage, status) => ws.send(JSON.stringify({ type: 'exit', stage, ...status })) ); @@ -220,7 +220,8 @@ router.ws('/connect', async (ws, req) => { }) ); - await job.execute_interactive(eventBus); + await job.execute(event_bus); + await job.cleanup(); ws.close(4999, 'Job Completed'); } else { @@ -230,7 +231,7 @@ router.ws('/connect', async (ws, req) => { case 'data': if (job !== null) { if (msg.stream === 'stdin') { - eventBus.emit('stdin', msg.data); + event_bus.emit('stdin', msg.data); } else { ws.close(4004, 'Can only write to stdin'); } @@ -241,7 +242,7 @@ router.ws('/connect', async (ws, req) => { case 'signal': if (job !== null) { if (SIGNALS.includes(msg.signal)) { - eventBus.emit('signal', msg.signal); + event_bus.emit('signal', msg.signal); } else { ws.close(4005, 'Invalid signal'); } @@ -257,12 +258,6 @@ router.ws('/connect', async (ws, req) => { } }); - ws.on('close', async () => { - if (job !== null) { - await job.cleanup(); - } - }); - setTimeout(() => { //Terminate the socket after 1 second, if not initialized. if (job === null) ws.close(4001, 'Initialization Timeout'); @@ -275,7 +270,11 @@ router.post('/execute', async (req, res) => { await job.prime(); - const result = await job.execute(); + let result = await job.execute(); + // Backward compatibility when the run stage is not started + if (result.run === undefined) { + result.run = result.compile; + } await job.cleanup(); diff --git a/api/src/job.js b/api/src/job.js index ecc19e5..f4a8c05 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -19,16 +19,12 @@ let uid = 0; let gid = 0; let remaining_job_spaces = config.max_concurrent_jobs; -let jobQueue = []; - -setInterval(() => { - // Every 10ms try resolve a new job, if there is an available slot - if (jobQueue.length > 0 && remaining_job_spaces > 0) { - jobQueue.shift()(); - } -}, 10); +let job_queue = []; class Job { + #active_timeouts; + #active_parent_processes; + constructor({ runtime, files, args, stdin, timeouts, memory_limits }) { this.uuid = uuidv4(); @@ -45,6 +41,13 @@ class Job { this.args = args; this.stdin = stdin; + // Add a trailing newline if it doesn't exist + if (this.stdin.slice(-1) !== '\n') { + this.stdin += '\n'; + } + + this.#active_timeouts = []; + this.#active_parent_processes = []; this.timeouts = timeouts; this.memory_limits = memory_limits; @@ -72,10 +75,9 @@ class Job { if (remaining_job_spaces < 1) { this.logger.info(`Awaiting job slot`); await new Promise(resolve => { - jobQueue.push(resolve); + job_queue.push(resolve); }); } - this.logger.info(`Priming job`); remaining_job_spaces--; this.logger.debug('Writing files to job cache'); @@ -110,7 +112,31 @@ class Job { this.logger.debug('Primed job'); } - async safe_call(file, args, timeout, memory_limit, eventBus = null) { + exit_cleanup() { + for (const timeout of this.#active_timeouts) { + clear_timeout(timeout); + } + this.#active_timeouts = []; + this.logger.debug('Cleared the active timeouts'); + + this.cleanup_processes(); + this.logger.debug(`Finished exit cleanup`); + } + + close_cleanup() { + for (const proc of this.#active_parent_processes) { + proc.stderr.destroy(); + if (!proc.stdin.destroyed) { + proc.stdin.end(); + proc.stdin.destroy(); + } + proc.stdout.destroy(); + } + this.#active_parent_processes = []; + this.logger.debug('Destroyed processes writables'); + } + + async safe_call(file, args, timeout, memory_limit, event_bus = null) { return new Promise((resolve, reject) => { const nonetwork = config.disable_networking ? ['nosocket'] : []; @@ -122,7 +148,10 @@ class Job { ]; const timeout_call = [ - 'timeout', '-s', '9', Math.ceil(timeout / 1000), + 'timeout', + '-s', + '9', + Math.ceil(timeout / 1000), ]; if (memory_limit >= 0) { @@ -155,16 +184,18 @@ class Job { detached: true, //give this process its own process group }); - if (eventBus === null) { + this.#active_parent_processes.push(proc); + + if (event_bus === null) { proc.stdin.write(this.stdin); proc.stdin.end(); proc.stdin.destroy(); } else { - eventBus.on('stdin', data => { + event_bus.on('stdin', data => { proc.stdin.write(data); }); - eventBus.on('kill', signal => { + event_bus.on('kill', signal => { proc.kill(signal); }); } @@ -176,10 +207,11 @@ class Job { process.kill(proc.pid, 'SIGKILL'); }, timeout)) || null; + this.#active_timeouts.push(kill_timeout); proc.stderr.on('data', async data => { - if (eventBus !== null) { - eventBus.emit('stderr', data); + if (event_bus !== null) { + event_bus.emit('stderr', data); } else if (stderr.length > this.runtime.output_max_size) { this.logger.info(`stderr length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -190,8 +222,8 @@ class Job { }); proc.stdout.on('data', async data => { - if (eventBus !== null) { - eventBus.emit('stdout', data); + if (event_bus !== null) { + event_bus.emit('stdout', data); } else if (stdout.length > this.runtime.output_max_size) { this.logger.info(`stdout length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -201,31 +233,24 @@ class Job { } }); - const exit_cleanup = () => { - clear_timeout(kill_timeout); + proc.on('exit', () => this.exit_cleanup()); - proc.stderr.destroy(); - proc.stdout.destroy(); - - this.cleanup_processes(); - this.logger.debug(`Finished exit cleanup`); - }; - - proc.on('exit', (code, signal) => { - exit_cleanup(); + proc.on('close', (code, signal) => { + this.close_cleanup(); resolve({ stdout, stderr, code, signal, output }); }); proc.on('error', err => { - exit_cleanup(); + this.exit_cleanup(); + this.close_cleanup(); reject({ error: err, stdout, stderr, output }); }); }); } - async execute() { + async execute(event_bus = null) { if (this.state !== job_states.PRIMED) { throw new Error( 'Job must be in primed state, current state: ' + @@ -242,24 +267,54 @@ class Job { this.logger.debug('Compiling'); let compile; + let compile_errored = false; + const { emit_event_bus_result, emit_event_bus_stage } = + event_bus === null + ? { + emit_event_bus_result: () => {}, + emit_event_bus_stage: () => {}, + } + : { + emit_event_bus_result: (stage, result, event_bus) => { + const { error, code, signal } = result; + event_bus.emit('exit', stage, { + error, + code, + signal, + }); + }, + emit_event_bus_stage: (stage, event_bus) => { + event_bus.emit('stage', stage); + }, + }; if (this.runtime.compiled) { + this.logger.debug('Compiling'); + emit_event_bus_stage('compile', event_bus); compile = await this.safe_call( path.join(this.runtime.pkgdir, 'compile'), code_files.map(x => x.name), this.timeouts.compile, - this.memory_limits.compile + this.memory_limits.compile, + event_bus ); + emit_event_bus_result('compile', compile, event_bus); + compile_errored = compile.code !== 0; } - this.logger.debug('Running'); - - const run = await this.safe_call( - path.join(this.runtime.pkgdir, 'run'), - [code_files[0].name, ...this.args], - this.timeouts.run, - this.memory_limits.run - ); + let run; + if (!compile_errored) { + this.logger.debug('Running'); + emit_event_bus_stage('run', event_bus); + run = await this.safe_call( + path.join(this.runtime.pkgdir, 'run'), + [code_files[0].name, ...this.args], + this.timeouts.run, + this.memory_limits.run, + event_bus + ); + emit_event_bus_result('run', run, event_bus); + } this.state = job_states.EXECUTED; @@ -271,50 +326,6 @@ class Job { }; } - async execute_interactive(eventBus) { - if (this.state !== job_states.PRIMED) { - throw new Error( - 'Job must be in primed state, current state: ' + - this.state.toString() - ); - } - - this.logger.info( - `Interactively executing job runtime=${this.runtime.toString()}` - ); - - const code_files = - (this.runtime.language === 'file' && this.files) || - this.files.filter(file => file.encoding == 'utf8'); - - if (this.runtime.compiled) { - eventBus.emit('stage', 'compile'); - const { error, code, signal } = await this.safe_call( - path.join(this.runtime.pkgdir, 'compile'), - code_files.map(x => x.name), - this.timeouts.compile, - this.memory_limits.compile, - eventBus - ); - - eventBus.emit('exit', 'compile', { error, code, signal }); - } - - this.logger.debug('Running'); - eventBus.emit('stage', 'run'); - const { error, code, signal } = await this.safe_call( - path.join(this.runtime.pkgdir, 'run'), - [code_files[0].name, ...this.args], - this.timeouts.run, - this.memory_limits.run, - eventBus - ); - - eventBus.emit('exit', 'run', { error, code, signal }); - - this.state = job_states.EXECUTED; - } - cleanup_processes(dont_wait = []) { let processes = [1]; const to_wait = []; @@ -345,11 +356,11 @@ class Job { const proc_id_int = parse_int(proc_id); // Skip over any processes that aren't ours. - if(ruid != this.uid && euid != this.uid) return -1; + if (ruid != this.uid && euid != this.uid) return -1; - if (state == 'Z'){ + if (state == 'Z') { // Zombie process, just needs to be waited, regardless of the user id - if(!to_wait.includes(proc_id_int)) + if (!to_wait.includes(proc_id_int)) to_wait.push(proc_id_int); return -1; @@ -386,7 +397,7 @@ class Job { // Then clear them out of the process tree try { process.kill(proc, 'SIGKILL'); - } catch(e) { + } catch { // Could already be dead and just needs to be waited on this.logger.debug( `Got error while SIGKILLing process ${proc}:`, @@ -440,10 +451,14 @@ class Job { async cleanup() { this.logger.info(`Cleaning up job`); - this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow + this.exit_cleanup(); // Run process janitor, just incase there are any residual processes somehow + this.close_cleanup(); await this.cleanup_filesystem(); remaining_job_spaces++; + if (job_queue.length > 0) { + job_queue.shift()(); + } } }