diff --git a/api/src/api/v2.js b/api/src/api/v2.js index 9cb5c36..c5ba659 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -172,9 +172,9 @@ router.use((req, res, next) => { router.ws('/connect', async (ws, req) => { let job = null; - let eventBus = new events.EventEmitter(); + let event_bus = new events.EventEmitter(); - eventBus.on('stdout', data => + event_bus.on('stdout', data => ws.send( JSON.stringify({ type: 'data', @@ -183,7 +183,7 @@ router.ws('/connect', async (ws, req) => { }) ) ); - eventBus.on('stderr', data => + event_bus.on('stderr', data => ws.send( JSON.stringify({ type: 'data', @@ -192,10 +192,10 @@ router.ws('/connect', async (ws, req) => { }) ) ); - eventBus.on('stage', stage => + event_bus.on('stage', stage => ws.send(JSON.stringify({ type: 'stage', stage })) ); - eventBus.on('exit', (stage, status) => + event_bus.on('exit', (stage, status) => ws.send(JSON.stringify({ type: 'exit', stage, ...status })) ); @@ -218,7 +218,8 @@ router.ws('/connect', async (ws, req) => { }) ); - await job.execute_interactive(eventBus); + await job.execute(event_bus); + await job.cleanup(); ws.close(4999, 'Job Completed'); } else { @@ -228,7 +229,7 @@ router.ws('/connect', async (ws, req) => { case 'data': if (job !== null) { if (msg.stream === 'stdin') { - eventBus.emit('stdin', msg.data); + event_bus.emit('stdin', msg.data); } else { ws.close(4004, 'Can only write to stdin'); } @@ -239,7 +240,7 @@ router.ws('/connect', async (ws, req) => { case 'signal': if (job !== null) { if (SIGNALS.includes(msg.signal)) { - eventBus.emit('signal', msg.signal); + event_bus.emit('signal', msg.signal); } else { ws.close(4005, 'Invalid signal'); } @@ -255,12 +256,6 @@ router.ws('/connect', async (ws, req) => { } }); - ws.on('close', async () => { - if (job !== null) { - await job.cleanup(); - } - }); - setTimeout(() => { //Terminate the socket after 1 second, if not initialized. if (job === null) ws.close(4001, 'Initialization Timeout'); diff --git a/api/src/job.js b/api/src/job.js index d7e36b3..7f80d6b 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -18,14 +18,7 @@ let uid = 0; let gid = 0; let remaining_job_spaces = config.max_concurrent_jobs; -let jobQueue = []; - -setInterval(() => { - // Every 10ms try resolve a new job, if there is an available slot - if (jobQueue.length > 0 && remaining_job_spaces > 0) { - jobQueue.shift()(); - } -}, 10); +let job_queue = []; class Job { #active_timeouts; @@ -81,10 +74,9 @@ class Job { if (remaining_job_spaces < 1) { this.logger.info(`Awaiting job slot`); await new Promise(resolve => { - jobQueue.push(resolve); + job_queue.push(resolve); }); } - this.logger.info(`Priming job`); remaining_job_spaces--; this.logger.debug('Writing files to job cache'); @@ -126,6 +118,11 @@ class Job { this.#active_timeouts = []; this.logger.debug('Cleared the active timeouts'); + this.cleanup_processes(); + this.logger.debug(`Finished exit cleanup`); + } + + close_cleanup() { for (const proc of this.#active_parent_processes) { proc.stderr.destroy(); if (!proc.stdin.destroyed) { @@ -135,13 +132,10 @@ class Job { proc.stdout.destroy(); } this.#active_parent_processes = []; - this.logger.debug('Destroyed parent processes writables'); - - this.cleanup_processes(); - this.logger.debug(`Finished exit cleanup`); + this.logger.debug('Destroyed processes writables'); } - async safe_call(file, args, timeout, memory_limit, eventBus = null) { + async safe_call(file, args, timeout, memory_limit, event_bus = null) { return new Promise((resolve, reject) => { const nonetwork = config.disable_networking ? ['nosocket'] : []; @@ -153,7 +147,10 @@ class Job { ]; const timeout_call = [ - 'timeout', '-s', '9', Math.ceil(timeout / 1000), + 'timeout', + '-s', + '9', + Math.ceil(timeout / 1000), ]; if (memory_limit >= 0) { @@ -184,16 +181,16 @@ class Job { this.#active_parent_processes.push(proc); - if (eventBus === null) { + if (event_bus === null) { proc.stdin.write(this.stdin); proc.stdin.end(); proc.stdin.destroy(); } else { - eventBus.on('stdin', data => { + event_bus.on('stdin', data => { proc.stdin.write(data); }); - eventBus.on('kill', signal => { + event_bus.on('kill', signal => { proc.kill(signal); }); } @@ -208,8 +205,8 @@ class Job { this.#active_timeouts.push(kill_timeout); proc.stderr.on('data', async data => { - if (eventBus !== null) { - eventBus.emit('stderr', data); + if (event_bus !== null) { + event_bus.emit('stderr', data); } else if (stderr.length > this.runtime.output_max_size) { this.logger.info(`stderr length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -220,8 +217,8 @@ class Job { }); proc.stdout.on('data', async data => { - if (eventBus !== null) { - eventBus.emit('stdout', data); + if (event_bus !== null) { + event_bus.emit('stdout', data); } else if (stdout.length > this.runtime.output_max_size) { this.logger.info(`stdout length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -231,21 +228,24 @@ class Job { } }); - proc.on('exit', (code, signal) => { - this.exit_cleanup(); + proc.on('exit', () => this.exit_cleanup()); + + proc.on('close', (code, signal) => { + this.close_cleanup(); resolve({ stdout, stderr, code, signal, output }); }); proc.on('error', err => { this.exit_cleanup(); + this.close_cleanup(); reject({ error: err, stdout, stderr, output }); }); }); } - async execute() { + async execute(event_bus = null) { if (this.state !== job_states.PRIMED) { throw new Error( 'Job must be in primed state, current state: ' + @@ -256,32 +256,54 @@ class Job { this.logger.info(`Executing job runtime=${this.runtime.toString()}`); const code_files = this.files.filter(file => file.encoding == 'utf8'); - - this.logger.debug('Compiling'); - let compile; let compile_errored = false; + const { emit_event_bus_result, emit_event_bus_stage } = + event_bus === null + ? { + emit_event_bus_result: () => {}, + emit_event_bus_stage: () => {}, + } + : { + emit_event_bus_result: (stage, result, event_bus) => { + const { error, code, signal } = result; + event_bus.emit('exit', stage, { + error, + code, + signal, + }); + }, + emit_event_bus_stage: (stage, event_bus) => { + event_bus.emit('stage', stage); + }, + }; if (this.runtime.compiled) { + this.logger.debug('Compiling'); + emit_event_bus_stage('compile', event_bus); compile = await this.safe_call( this.runtime.compile, code_files.map(x => x.name), this.timeouts.compile, - this.memory_limits.compile + this.memory_limits.compile, + event_bus ); + emit_event_bus_result('compile', compile, event_bus); compile_errored = compile.code !== 0; } let run; if (!compile_errored) { this.logger.debug('Running'); - + emit_event_bus_stage('run', event_bus); run = await this.safe_call( this.runtime.run, [code_files[0].name, ...this.args], this.timeouts.run, - this.memory_limits.run + this.memory_limits.run, + event_bus ); + emit_event_bus_result('run', run, event_bus); } this.state = job_states.EXECUTED; @@ -294,52 +316,6 @@ class Job { }; } - async execute_interactive(eventBus) { - if (this.state !== job_states.PRIMED) { - throw new Error( - 'Job must be in primed state, current state: ' + - this.state.toString() - ); - } - - this.logger.info( - `Interactively executing job runtime=${this.runtime.toString()}` - ); - - const code_files = this.files.filter(file => file.encoding == 'utf8'); - - let compile_errored = false; - if (this.runtime.compiled) { - eventBus.emit('stage', 'compile'); - const { error, code, signal } = await this.safe_call( - this.runtime.compile, - code_files.map(x => x.name), - this.timeouts.compile, - this.memory_limits.compile, - eventBus - ); - - eventBus.emit('exit', 'compile', { error, code, signal }); - compile_errored = code !== 0; - } - - if (!compile_errored) { - this.logger.debug('Running'); - eventBus.emit('stage', 'run'); - const { error, code, signal } = await this.safe_call( - this.runtime.run, - [code_files[0].name, ...this.args], - this.timeouts.run, - this.memory_limits.run, - eventBus - ); - - eventBus.emit('exit', 'run', { error, code, signal }); - } - - this.state = job_states.EXECUTED; - } - cleanup_processes(dont_wait = []) { let processes = [1]; const to_wait = []; @@ -370,11 +346,11 @@ class Job { const proc_id_int = parse_int(proc_id); // Skip over any processes that aren't ours. - if(ruid != this.uid && euid != this.uid) return -1; + if (ruid != this.uid && euid != this.uid) return -1; - if (state == 'Z'){ + if (state == 'Z') { // Zombie process, just needs to be waited, regardless of the user id - if(!to_wait.includes(proc_id_int)) + if (!to_wait.includes(proc_id_int)) to_wait.push(proc_id_int); return -1; @@ -466,9 +442,13 @@ class Job { this.logger.info(`Cleaning up job`); this.exit_cleanup(); // Run process janitor, just incase there are any residual processes somehow + this.close_cleanup(); await this.cleanup_filesystem(); remaining_job_spaces++; + if (job_queue.length > 0) { + job_queue.shift()(); + } } }