diff --git a/api/src/api/v2.js b/api/src/api/v2.js index 3dfa3ee..38b7c85 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -174,9 +174,9 @@ router.use((req, res, next) => { router.ws('/connect', async (ws, req) => { let job = null; - let event_bus = new events.EventEmitter(); + let eventBus = new events.EventEmitter(); - event_bus.on('stdout', data => + eventBus.on('stdout', data => ws.send( JSON.stringify({ type: 'data', @@ -185,7 +185,7 @@ router.ws('/connect', async (ws, req) => { }) ) ); - event_bus.on('stderr', data => + eventBus.on('stderr', data => ws.send( JSON.stringify({ type: 'data', @@ -194,10 +194,10 @@ router.ws('/connect', async (ws, req) => { }) ) ); - event_bus.on('stage', stage => + eventBus.on('stage', stage => ws.send(JSON.stringify({ type: 'stage', stage })) ); - event_bus.on('exit', (stage, status) => + eventBus.on('exit', (stage, status) => ws.send(JSON.stringify({ type: 'exit', stage, ...status })) ); @@ -220,8 +220,7 @@ router.ws('/connect', async (ws, req) => { }) ); - await job.execute(event_bus); - await job.cleanup(); + await job.execute_interactive(eventBus); ws.close(4999, 'Job Completed'); } else { @@ -231,7 +230,7 @@ router.ws('/connect', async (ws, req) => { case 'data': if (job !== null) { if (msg.stream === 'stdin') { - event_bus.emit('stdin', msg.data); + eventBus.emit('stdin', msg.data); } else { ws.close(4004, 'Can only write to stdin'); } @@ -242,7 +241,7 @@ router.ws('/connect', async (ws, req) => { case 'signal': if (job !== null) { if (SIGNALS.includes(msg.signal)) { - event_bus.emit('signal', msg.signal); + eventBus.emit('signal', msg.signal); } else { ws.close(4005, 'Invalid signal'); } @@ -258,6 +257,12 @@ router.ws('/connect', async (ws, req) => { } }); + ws.on('close', async () => { + if (job !== null) { + await job.cleanup(); + } + }); + setTimeout(() => { //Terminate the socket after 1 second, if not initialized. if (job === null) ws.close(4001, 'Initialization Timeout'); @@ -270,11 +275,7 @@ router.post('/execute', async (req, res) => { await job.prime(); - let result = await job.execute(); - // Backward compatibility when the run stage is not started - if (result.run === undefined) { - result.run = result.compile; - } + const result = await job.execute(); await job.cleanup(); diff --git a/api/src/job.js b/api/src/job.js index f4a8c05..ecc19e5 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -19,12 +19,16 @@ let uid = 0; let gid = 0; let remaining_job_spaces = config.max_concurrent_jobs; -let job_queue = []; +let jobQueue = []; + +setInterval(() => { + // Every 10ms try resolve a new job, if there is an available slot + if (jobQueue.length > 0 && remaining_job_spaces > 0) { + jobQueue.shift()(); + } +}, 10); class Job { - #active_timeouts; - #active_parent_processes; - constructor({ runtime, files, args, stdin, timeouts, memory_limits }) { this.uuid = uuidv4(); @@ -41,13 +45,6 @@ class Job { this.args = args; this.stdin = stdin; - // Add a trailing newline if it doesn't exist - if (this.stdin.slice(-1) !== '\n') { - this.stdin += '\n'; - } - - this.#active_timeouts = []; - this.#active_parent_processes = []; this.timeouts = timeouts; this.memory_limits = memory_limits; @@ -75,9 +72,10 @@ class Job { if (remaining_job_spaces < 1) { this.logger.info(`Awaiting job slot`); await new Promise(resolve => { - job_queue.push(resolve); + jobQueue.push(resolve); }); } + this.logger.info(`Priming job`); remaining_job_spaces--; this.logger.debug('Writing files to job cache'); @@ -112,31 +110,7 @@ class Job { this.logger.debug('Primed job'); } - exit_cleanup() { - for (const timeout of this.#active_timeouts) { - clear_timeout(timeout); - } - this.#active_timeouts = []; - this.logger.debug('Cleared the active timeouts'); - - this.cleanup_processes(); - this.logger.debug(`Finished exit cleanup`); - } - - close_cleanup() { - for (const proc of this.#active_parent_processes) { - proc.stderr.destroy(); - if (!proc.stdin.destroyed) { - proc.stdin.end(); - proc.stdin.destroy(); - } - proc.stdout.destroy(); - } - this.#active_parent_processes = []; - this.logger.debug('Destroyed processes writables'); - } - - async safe_call(file, args, timeout, memory_limit, event_bus = null) { + async safe_call(file, args, timeout, memory_limit, eventBus = null) { return new Promise((resolve, reject) => { const nonetwork = config.disable_networking ? ['nosocket'] : []; @@ -148,10 +122,7 @@ class Job { ]; const timeout_call = [ - 'timeout', - '-s', - '9', - Math.ceil(timeout / 1000), + 'timeout', '-s', '9', Math.ceil(timeout / 1000), ]; if (memory_limit >= 0) { @@ -184,18 +155,16 @@ class Job { detached: true, //give this process its own process group }); - this.#active_parent_processes.push(proc); - - if (event_bus === null) { + if (eventBus === null) { proc.stdin.write(this.stdin); proc.stdin.end(); proc.stdin.destroy(); } else { - event_bus.on('stdin', data => { + eventBus.on('stdin', data => { proc.stdin.write(data); }); - event_bus.on('kill', signal => { + eventBus.on('kill', signal => { proc.kill(signal); }); } @@ -207,11 +176,10 @@ class Job { process.kill(proc.pid, 'SIGKILL'); }, timeout)) || null; - this.#active_timeouts.push(kill_timeout); proc.stderr.on('data', async data => { - if (event_bus !== null) { - event_bus.emit('stderr', data); + if (eventBus !== null) { + eventBus.emit('stderr', data); } else if (stderr.length > this.runtime.output_max_size) { this.logger.info(`stderr length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -222,8 +190,8 @@ class Job { }); proc.stdout.on('data', async data => { - if (event_bus !== null) { - event_bus.emit('stdout', data); + if (eventBus !== null) { + eventBus.emit('stdout', data); } else if (stdout.length > this.runtime.output_max_size) { this.logger.info(`stdout length exceeded`); process.kill(proc.pid, 'SIGKILL'); @@ -233,24 +201,31 @@ class Job { } }); - proc.on('exit', () => this.exit_cleanup()); + const exit_cleanup = () => { + clear_timeout(kill_timeout); - proc.on('close', (code, signal) => { - this.close_cleanup(); + proc.stderr.destroy(); + proc.stdout.destroy(); + + this.cleanup_processes(); + this.logger.debug(`Finished exit cleanup`); + }; + + proc.on('exit', (code, signal) => { + exit_cleanup(); resolve({ stdout, stderr, code, signal, output }); }); proc.on('error', err => { - this.exit_cleanup(); - this.close_cleanup(); + exit_cleanup(); reject({ error: err, stdout, stderr, output }); }); }); } - async execute(event_bus = null) { + async execute() { if (this.state !== job_states.PRIMED) { throw new Error( 'Job must be in primed state, current state: ' + @@ -267,54 +242,24 @@ class Job { this.logger.debug('Compiling'); let compile; - let compile_errored = false; - const { emit_event_bus_result, emit_event_bus_stage } = - event_bus === null - ? { - emit_event_bus_result: () => {}, - emit_event_bus_stage: () => {}, - } - : { - emit_event_bus_result: (stage, result, event_bus) => { - const { error, code, signal } = result; - event_bus.emit('exit', stage, { - error, - code, - signal, - }); - }, - emit_event_bus_stage: (stage, event_bus) => { - event_bus.emit('stage', stage); - }, - }; if (this.runtime.compiled) { - this.logger.debug('Compiling'); - emit_event_bus_stage('compile', event_bus); compile = await this.safe_call( path.join(this.runtime.pkgdir, 'compile'), code_files.map(x => x.name), this.timeouts.compile, - this.memory_limits.compile, - event_bus + this.memory_limits.compile ); - emit_event_bus_result('compile', compile, event_bus); - compile_errored = compile.code !== 0; } - let run; - if (!compile_errored) { - this.logger.debug('Running'); - emit_event_bus_stage('run', event_bus); - run = await this.safe_call( - path.join(this.runtime.pkgdir, 'run'), - [code_files[0].name, ...this.args], - this.timeouts.run, - this.memory_limits.run, - event_bus - ); - emit_event_bus_result('run', run, event_bus); - } + this.logger.debug('Running'); + + const run = await this.safe_call( + path.join(this.runtime.pkgdir, 'run'), + [code_files[0].name, ...this.args], + this.timeouts.run, + this.memory_limits.run + ); this.state = job_states.EXECUTED; @@ -326,6 +271,50 @@ class Job { }; } + async execute_interactive(eventBus) { + if (this.state !== job_states.PRIMED) { + throw new Error( + 'Job must be in primed state, current state: ' + + this.state.toString() + ); + } + + this.logger.info( + `Interactively executing job runtime=${this.runtime.toString()}` + ); + + const code_files = + (this.runtime.language === 'file' && this.files) || + this.files.filter(file => file.encoding == 'utf8'); + + if (this.runtime.compiled) { + eventBus.emit('stage', 'compile'); + const { error, code, signal } = await this.safe_call( + path.join(this.runtime.pkgdir, 'compile'), + code_files.map(x => x.name), + this.timeouts.compile, + this.memory_limits.compile, + eventBus + ); + + eventBus.emit('exit', 'compile', { error, code, signal }); + } + + this.logger.debug('Running'); + eventBus.emit('stage', 'run'); + const { error, code, signal } = await this.safe_call( + path.join(this.runtime.pkgdir, 'run'), + [code_files[0].name, ...this.args], + this.timeouts.run, + this.memory_limits.run, + eventBus + ); + + eventBus.emit('exit', 'run', { error, code, signal }); + + this.state = job_states.EXECUTED; + } + cleanup_processes(dont_wait = []) { let processes = [1]; const to_wait = []; @@ -356,11 +345,11 @@ class Job { const proc_id_int = parse_int(proc_id); // Skip over any processes that aren't ours. - if (ruid != this.uid && euid != this.uid) return -1; + if(ruid != this.uid && euid != this.uid) return -1; - if (state == 'Z') { + if (state == 'Z'){ // Zombie process, just needs to be waited, regardless of the user id - if (!to_wait.includes(proc_id_int)) + if(!to_wait.includes(proc_id_int)) to_wait.push(proc_id_int); return -1; @@ -397,7 +386,7 @@ class Job { // Then clear them out of the process tree try { process.kill(proc, 'SIGKILL'); - } catch { + } catch(e) { // Could already be dead and just needs to be waited on this.logger.debug( `Got error while SIGKILLing process ${proc}:`, @@ -451,14 +440,10 @@ class Job { async cleanup() { this.logger.info(`Cleaning up job`); - this.exit_cleanup(); // Run process janitor, just incase there are any residual processes somehow - this.close_cleanup(); + this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow await this.cleanup_filesystem(); remaining_job_spaces++; - if (job_queue.length > 0) { - job_queue.shift()(); - } } }