diff --git a/api/src/api/v2.js b/api/src/api/v2.js index c5ba659..9cb5c36 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -172,9 +172,9 @@ router.use((req, res, next) => { router.ws('/connect', async (ws, req) => { let job = null; - let event_bus = new events.EventEmitter(); + let eventBus = new events.EventEmitter(); - event_bus.on('stdout', data => + eventBus.on('stdout', data => ws.send( JSON.stringify({ type: 'data', @@ -183,7 +183,7 @@ router.ws('/connect', async (ws, req) => { }) ) ); - event_bus.on('stderr', data => + eventBus.on('stderr', data => ws.send( JSON.stringify({ type: 'data', @@ -192,10 +192,10 @@ router.ws('/connect', async (ws, req) => { }) ) ); - event_bus.on('stage', stage => + eventBus.on('stage', stage => ws.send(JSON.stringify({ type: 'stage', stage })) ); - event_bus.on('exit', (stage, status) => + eventBus.on('exit', (stage, status) => ws.send(JSON.stringify({ type: 'exit', stage, ...status })) ); @@ -218,8 +218,7 @@ router.ws('/connect', async (ws, req) => { }) ); - await job.execute(event_bus); - await job.cleanup(); + await job.execute_interactive(eventBus); ws.close(4999, 'Job Completed'); } else { @@ -229,7 +228,7 @@ router.ws('/connect', async (ws, req) => { case 'data': if (job !== null) { if (msg.stream === 'stdin') { - event_bus.emit('stdin', msg.data); + eventBus.emit('stdin', msg.data); } else { ws.close(4004, 'Can only write to stdin'); } @@ -240,7 +239,7 @@ router.ws('/connect', async (ws, req) => { case 'signal': if (job !== null) { if (SIGNALS.includes(msg.signal)) { - event_bus.emit('signal', msg.signal); + eventBus.emit('signal', msg.signal); } else { ws.close(4005, 'Invalid signal'); } @@ -256,6 +255,12 @@ router.ws('/connect', async (ws, req) => { } }); + ws.on('close', async () => { + if (job !== null) { + await job.cleanup(); + } + }); + setTimeout(() => { //Terminate the socket after 1 second, if not initialized. if (job === null) ws.close(4001, 'Initialization Timeout'); diff --git a/api/src/job.js b/api/src/job.js index 7f80d6b..d7e36b3 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -18,7 +18,14 @@ let uid = 0; let gid = 0; let remaining_job_spaces = config.max_concurrent_jobs; -let job_queue = []; +let jobQueue = []; + +setInterval(() => { + // Every 10ms try resolve a new job, if there is an available slot + if (jobQueue.length > 0 && remaining_job_spaces > 0) { + jobQueue.shift()(); + } +}, 10); class Job { #active_timeouts; @@ -74,9 +81,10 @@ class Job { if (remaining_job_spaces < 1) { this.logger.info(`Awaiting job slot`); await new Promise(resolve => { - job_queue.push(resolve); + jobQueue.push(resolve); }); } + this.logger.info(`Priming job`); remaining_job_spaces--; this.logger.debug('Writing files to job cache'); @@ -118,11 +126,6 @@ class Job { this.#active_timeouts = []; this.logger.debug('Cleared the active timeouts'); - this.cleanup_processes(); - this.logger.debug(`Finished exit cleanup`); - } - - close_cleanup() { for (const proc of this.#active_parent_processes) { proc.stderr.destroy(); if (!proc.stdin.destroyed) { @@ -132,10 +135,13 @@ class Job { proc.stdout.destroy(); } this.#active_parent_processes = []; - this.logger.debug('Destroyed processes writables'); + this.logger.debug('Destroyed parent processes writables'); + + this.cleanup_processes(); + this.logger.debug(`Finished exit cleanup`); } - async safe_call(file, args, timeout, memory_limit, event_bus = null) { + async safe_call(file, args, timeout, memory_limit, eventBus = null) { return new Promise((resolve, reject) => { const nonetwork = config.disable_networking ? ['nosocket'] : []; @@ -147,10 +153,7 @@ class Job { ]; const timeout_call = [ - 'timeout', - '-s', - '9', - Math.ceil(timeout / 1000), + 'timeout', '-s', '9', Math.ceil(timeout / 1000), ]; if (memory_limit >= 0) { @@ -181,16 +184,16 @@ class Job { this.#active_parent_processes.push(proc); - if (event_bus === null) { + if (eventBus === null) { proc.stdin.write(this.stdin); proc.stdin.end(); proc.stdin.destroy(); } else { - event_bus.on('stdin', data => { + eventBus.on('stdin', data => { proc.stdin.write(data); }); - event_bus.on('kill', signal => { + eventBus.on('kill', signal => { proc.kill(signal); }); } @@ -205,8 +208,8 @@ class Job { this.#active_timeouts.push(kill_timeout); proc.stderr.on('data', async data => { - if (event_bus !== null) { - event_bus.emit('stderr', data); + if (eventBus !== null) { + eventBus.emit('stderr', data); } else if (stderr.length > this.runtime.output_max_size) { this.logger.info(`stderr length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -217,8 +220,8 @@ class Job { }); proc.stdout.on('data', async data => { - if (event_bus !== null) { - event_bus.emit('stdout', data); + if (eventBus !== null) { + eventBus.emit('stdout', data); } else if (stdout.length > this.runtime.output_max_size) { this.logger.info(`stdout length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -228,24 +231,21 @@ class Job { } }); - proc.on('exit', () => this.exit_cleanup()); - - proc.on('close', (code, signal) => { - this.close_cleanup(); + proc.on('exit', (code, signal) => { + this.exit_cleanup(); resolve({ stdout, stderr, code, signal, output }); }); proc.on('error', err => { this.exit_cleanup(); - this.close_cleanup(); reject({ error: err, stdout, stderr, output }); }); }); } - async execute(event_bus = null) { + async execute() { if (this.state !== job_states.PRIMED) { throw new Error( 'Job must be in primed state, current state: ' + @@ -256,54 +256,32 @@ class Job { this.logger.info(`Executing job runtime=${this.runtime.toString()}`); const code_files = this.files.filter(file => file.encoding == 'utf8'); + + this.logger.debug('Compiling'); + let compile; let compile_errored = false; - const { emit_event_bus_result, emit_event_bus_stage } = - event_bus === null - ? { - emit_event_bus_result: () => {}, - emit_event_bus_stage: () => {}, - } - : { - emit_event_bus_result: (stage, result, event_bus) => { - const { error, code, signal } = result; - event_bus.emit('exit', stage, { - error, - code, - signal, - }); - }, - emit_event_bus_stage: (stage, event_bus) => { - event_bus.emit('stage', stage); - }, - }; if (this.runtime.compiled) { - this.logger.debug('Compiling'); - emit_event_bus_stage('compile', event_bus); compile = await this.safe_call( this.runtime.compile, code_files.map(x => x.name), this.timeouts.compile, - this.memory_limits.compile, - event_bus + this.memory_limits.compile ); - emit_event_bus_result('compile', compile, event_bus); compile_errored = compile.code !== 0; } let run; if (!compile_errored) { this.logger.debug('Running'); - emit_event_bus_stage('run', event_bus); + run = await this.safe_call( this.runtime.run, [code_files[0].name, ...this.args], this.timeouts.run, - this.memory_limits.run, - event_bus + this.memory_limits.run ); - emit_event_bus_result('run', run, event_bus); } this.state = job_states.EXECUTED; @@ -316,6 +294,52 @@ class Job { }; } + async execute_interactive(eventBus) { + if (this.state !== job_states.PRIMED) { + throw new Error( + 'Job must be in primed state, current state: ' + + this.state.toString() + ); + } + + this.logger.info( + `Interactively executing job runtime=${this.runtime.toString()}` + ); + + const code_files = this.files.filter(file => file.encoding == 'utf8'); + + let compile_errored = false; + if (this.runtime.compiled) { + eventBus.emit('stage', 'compile'); + const { error, code, signal } = await this.safe_call( + this.runtime.compile, + code_files.map(x => x.name), + this.timeouts.compile, + this.memory_limits.compile, + eventBus + ); + + eventBus.emit('exit', 'compile', { error, code, signal }); + compile_errored = code !== 0; + } + + if (!compile_errored) { + this.logger.debug('Running'); + eventBus.emit('stage', 'run'); + const { error, code, signal } = await this.safe_call( + this.runtime.run, + [code_files[0].name, ...this.args], + this.timeouts.run, + this.memory_limits.run, + eventBus + ); + + eventBus.emit('exit', 'run', { error, code, signal }); + } + + this.state = job_states.EXECUTED; + } + cleanup_processes(dont_wait = []) { let processes = [1]; const to_wait = []; @@ -346,11 +370,11 @@ class Job { const proc_id_int = parse_int(proc_id); // Skip over any processes that aren't ours. - if (ruid != this.uid && euid != this.uid) return -1; + if(ruid != this.uid && euid != this.uid) return -1; - if (state == 'Z') { + if (state == 'Z'){ // Zombie process, just needs to be waited, regardless of the user id - if (!to_wait.includes(proc_id_int)) + if(!to_wait.includes(proc_id_int)) to_wait.push(proc_id_int); return -1; @@ -442,13 +466,9 @@ class Job { this.logger.info(`Cleaning up job`); this.exit_cleanup(); // Run process janitor, just incase there are any residual processes somehow - this.close_cleanup(); await this.cleanup_filesystem(); remaining_job_spaces++; - if (job_queue.length > 0) { - job_queue.shift()(); - } } }