From de89acb617470beffff6be2a3343302a51183ce7 Mon Sep 17 00:00:00 2001 From: Thomas Hobson Date: Fri, 16 Jul 2021 23:10:44 +1200 Subject: [PATCH] api: Implement Websocket transport for live data --- api/package-lock.json | 49 +++++++ api/package.json | 1 + api/src/api/v2.js | 299 +++++++++++++++++++++++++++--------------- api/src/index.js | 4 + api/src/job.js | 59 ++++++++- 5 files changed, 299 insertions(+), 113 deletions(-) diff --git a/api/package-lock.json b/api/package-lock.json index c46ae87..83df240 100644 --- a/api/package-lock.json +++ b/api/package-lock.json @@ -12,6 +12,7 @@ "body-parser": "^1.19.0", "chownr": "^2.0.0", "express": "^4.17.1", + "express-ws": "^5.0.2", "is-docker": "^2.1.1", "logplease": "^1.2.15", "nocamel": "HexF/nocamel#patch-1", @@ -196,6 +197,20 @@ "node": ">= 0.10.0" } }, + "node_modules/express-ws": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/express-ws/-/express-ws-5.0.2.tgz", + "integrity": "sha512-0uvmuk61O9HXgLhGl3QhNSEtRsQevtmbL94/eILaliEADZBHZOQUAiHFrGPrgsjikohyrmSG5g+sCfASTt0lkQ==", + "dependencies": { + "ws": "^7.4.6" + }, + "engines": { + "node": ">=4.5.0" + }, + "peerDependencies": { + "express": "^4.0.0 || ^5.0.0-alpha.1" + } + }, "node_modules/finalhandler": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.1.2.tgz", @@ -582,6 +597,26 @@ "node_modules/waitpid": { "resolved": "git+ssh://git@github.com/HexF/node-waitpid.git#a08d116a5d993a747624fe72ff890167be8c34aa" }, + "node_modules/ws": { + "version": "7.5.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.3.tgz", + "integrity": "sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==", + "engines": { + "node": ">=8.3.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/yallist": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", @@ -728,6 +763,14 @@ "vary": "~1.1.2" } }, + "express-ws": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/express-ws/-/express-ws-5.0.2.tgz", + "integrity": "sha512-0uvmuk61O9HXgLhGl3QhNSEtRsQevtmbL94/eILaliEADZBHZOQUAiHFrGPrgsjikohyrmSG5g+sCfASTt0lkQ==", + "requires": { + "ws": "^7.4.6" + } + }, "finalhandler": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.1.2.tgz", @@ -1010,6 +1053,12 @@ "version": "git+ssh://git@github.com/HexF/node-waitpid.git#a08d116a5d993a747624fe72ff890167be8c34aa", "from": "waitpid@git+https://github.com/HexF/node-waitpid.git" }, + "ws": { + "version": "7.5.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.3.tgz", + "integrity": "sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==", + "requires": {} + }, "yallist": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", diff --git a/api/package.json b/api/package.json index ab34063..0c32198 100644 --- a/api/package.json +++ b/api/package.json @@ -7,6 +7,7 @@ "body-parser": "^1.19.0", "chownr": "^2.0.0", "express": "^4.17.1", + "express-ws": "^5.0.2", "is-docker": "^2.1.1", "logplease": "^1.2.15", "nocamel": "HexF/nocamel#patch-1", diff --git a/api/src/api/v2.js b/api/src/api/v2.js index 948dccf..487b10b 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -1,12 +1,124 @@ const express = require('express'); const router = express.Router(); +const events = require('events'); + const config = require('../config'); const runtime = require('../runtime'); const { Job } = require('../job'); const package = require('../package'); const logger = require('logplease').create('api/v2'); + +function get_job(body){ + const { + language, + version, + args, + stdin, + files, + compile_memory_limit, + run_memory_limit, + run_timeout, + compile_timeout + } = body; + + return new Promise((resolve, reject) => { + if (!language || typeof language !== 'string') { + return reject({ + message: 'language is required as a string', + }); + } + + if (!version || typeof version !== 'string') { + return reject({ + message: 'version is required as a string', + }); + } + + if (!files || !Array.isArray(files)) { + return reject({ + message: 'files is required as an array', + }); + } + + for (const [i, file] of files.entries()) { + if (typeof file.content !== 'string') { + return reject({ + message: `files[${i}].content is required as a string`, + }); + } + } + + if (compile_memory_limit) { + if (typeof compile_memory_limit !== 'number') { + return reject({ + message: 'if specified, compile_memory_limit must be a number', + }); + } + + if ( + config.compile_memory_limit >= 0 && + (compile_memory_limit > config.compile_memory_limit || + compile_memory_limit < 0) + ) { + return reject({ + message: + 'compile_memory_limit cannot exceed the configured limit of ' + + config.compile_memory_limit, + }); + } + } + + if (run_memory_limit) { + if (typeof run_memory_limit !== 'number') { + return reject({ + message: 'if specified, run_memory_limit must be a number', + }); + } + + if ( + config.run_memory_limit >= 0 && + (run_memory_limit > config.run_memory_limit || run_memory_limit < 0) + ) { + return reject({ + message: + 'run_memory_limit cannot exceed the configured limit of ' + + config.run_memory_limit, + }); + } + } + + const rt = runtime.get_latest_runtime_matching_language_version( + language, + version + ); + + if (rt === undefined) { + return reject({ + message: `${language}-${version} runtime is unknown`, + }); + } + + resolve(new Job({ + runtime: rt, + alias: language, + args: args || [], + stdin: '', + files, + timeouts: { + run: run_timeout || 3000, + compile: compile_timeout || 10000, + }, + memory_limits: { + run: run_memory_limit || config.run_memory_limit, + compile: compile_memory_limit || config.compile_memory_limit, + } + })); + }) + +} + router.use((req, res, next) => { if (['GET', 'HEAD', 'OPTIONS'].includes(req.method)) { return next(); @@ -21,118 +133,87 @@ router.use((req, res, next) => { next(); }); +router.ws('/connect', async (ws, req) => { + + let job = null; + let eventBus = new events.EventEmitter(); + + eventBus.on("stdout", (data) => ws.send(JSON.stringify({type: "data", stream: "stdout", data: data.toString()}))) + eventBus.on("stderr", (data) => ws.send(JSON.stringify({type: "data", stream: "stderr", data: data.toString()}))) + eventBus.on("stage", (stage)=> ws.send(JSON.stringify({type: "stage", stage}))) + eventBus.on("exit", (stage, status) => ws.send(JSON.stringify({type: "exit", stage, ...status}))) + + ws.on("message", async (data) => { + + try{ + const msg = JSON.parse(data); + + if(msg.type === "init"){ + if(job === null){ + const job = await get_job(msg); + + await job.prime(); + + ws.send(JSON.stringify({ + type: "runtime", + language: job.runtime.language, + version: job.runtime.version.raw + })) + + await job.execute_interactive(eventBus); + + ws.close(4999, "Job Completed"); + + }else{ + ws.close(4000, "Already Initialized"); + } + + }else if(msg.type === "data"){ + if(job !== null){ + if(msg.stream === "stdin"){ + eventBus.emit("stdin", msg.data) + }else{ + ws.close(4004, "Can only write to stdin") + } + }else{ + ws.close(4003, "Not yet initialized") + } + } + }catch(error){ + ws.send(JSON.stringify({type: "error", message: error.message})) + ws.close(4002, "Notified Error") + // ws.close message is limited to 123 characters, so we notify over WS then close. + } + }) + + ws.on("close", async ()=>{ + if(job !== null){ + await job.cleanup() + } + }) + + setTimeout(()=>{ + //Terminate the socket after 1 second, if not initialized. + //if(job === null) + // ws.close(4001, "Initialization Timeout"); + }, 1000) +}) + router.post('/execute', async (req, res) => { - const { - language, - version, - files, - stdin, - args, - run_timeout, - compile_timeout, - compile_memory_limit, - run_memory_limit, - } = req.body; - if (!language || typeof language !== 'string') { - return res.status(400).send({ - message: 'language is required as a string', - }); + try{ + const job = await get_job(req.body); + + await job.prime(); + + const result = await job.execute(); + + await job.cleanup(); + + return res.status(200).send(result); + }catch(error){ + return res.status(400).json(error); } - - if (!version || typeof version !== 'string') { - return res.status(400).send({ - message: 'version is required as a string', - }); - } - - if (!files || !Array.isArray(files)) { - return res.status(400).send({ - message: 'files is required as an array', - }); - } - - for (const [i, file] of files.entries()) { - if (typeof file.content !== 'string') { - return res.status(400).send({ - message: `files[${i}].content is required as a string`, - }); - } - } - - if (compile_memory_limit) { - if (typeof compile_memory_limit !== 'number') { - return res.status(400).send({ - message: 'if specified, compile_memory_limit must be a number', - }); - } - - if ( - config.compile_memory_limit >= 0 && - (compile_memory_limit > config.compile_memory_limit || - compile_memory_limit < 0) - ) { - return res.status(400).send({ - message: - 'compile_memory_limit cannot exceed the configured limit of ' + - config.compile_memory_limit, - }); - } - } - - if (run_memory_limit) { - if (typeof run_memory_limit !== 'number') { - return res.status(400).send({ - message: 'if specified, run_memory_limit must be a number', - }); - } - - if ( - config.run_memory_limit >= 0 && - (run_memory_limit > config.run_memory_limit || run_memory_limit < 0) - ) { - return res.status(400).send({ - message: - 'run_memory_limit cannot exceed the configured limit of ' + - config.run_memory_limit, - }); - } - } - - const rt = runtime.get_latest_runtime_matching_language_version( - language, - version - ); - - if (rt === undefined) { - return res.status(400).send({ - message: `${language}-${version} runtime is unknown`, - }); - } - - const job = new Job({ - runtime: rt, - alias: language, - files: files, - args: args || [], - stdin: stdin || '', - timeouts: { - run: run_timeout || 3000, - compile: compile_timeout || 10000, - }, - memory_limits: { - run: run_memory_limit || config.run_memory_limit, - compile: compile_memory_limit || config.compile_memory_limit, - }, - }); - - await job.prime(); - - const result = await job.execute(); - - await job.cleanup(); - - return res.status(200).send(result); }); router.get('/runtimes', (req, res) => { diff --git a/api/src/index.js b/api/src/index.js index ef16916..afd4d15 100644 --- a/api/src/index.js +++ b/api/src/index.js @@ -2,6 +2,7 @@ require('nocamel'); const Logger = require('logplease'); const express = require('express'); +const expressWs = require('express-ws'); const globals = require('./globals'); const config = require('./config'); const path = require('path'); @@ -12,6 +13,9 @@ const runtime = require('./runtime'); const logger = Logger.create('index'); const app = express(); +expressWs(app); + + (async () => { logger.info('Setting loglevel to', config.log_level); diff --git a/api/src/job.js b/api/src/job.js index d4b90ea..8001a76 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -69,7 +69,7 @@ class Job { logger.debug('Primed job'); } - async safe_call(file, args, timeout, memory_limit) { + async safe_call(file, args, timeout, memory_limit, eventBus = null) { return new Promise((resolve, reject) => { const nonetwork = config.disable_networking ? ['nosocket'] : []; @@ -102,9 +102,15 @@ class Job { detached: true, //give this process its own process group }); - proc.stdin.write(this.stdin); - proc.stdin.end(); - proc.stdin.destroy(); + if(eventBus === null){ + proc.stdin.write(this.stdin); + proc.stdin.end(); + proc.stdin.destroy(); + }else{ + eventBus.on("stdin", (data) => { + proc.stdin.write(data); + }) + } const kill_timeout = set_timeout( _ => proc.kill('SIGKILL'), @@ -115,6 +121,7 @@ class Job { if (stderr.length > config.output_max_size) { proc.kill('SIGKILL'); } else { + if(eventBus !== null) eventBus.emit("stderr", data); stderr += data; output += data; } @@ -124,6 +131,7 @@ class Job { if (stdout.length > config.output_max_size) { proc.kill('SIGKILL'); } else { + if(eventBus !== null) eventBus.emit("stdout", data); stdout += data; output += data; } @@ -196,6 +204,49 @@ class Job { }; } + async execute_interactive(eventBus){ + if (this.state !== job_states.PRIMED) { + throw new Error( + 'Job must be in primed state, current state: ' + + this.state.toString() + ); + } + + logger.info( + `Interactively executing job uuid=${this.uuid} uid=${this.uid} gid=${ + this.gid + } runtime=${this.runtime.toString()}` + ); + + if(this.runtime.compiled){ + eventBus.emit("stage", "compile") + const {error, code, signal} = await this.safe_call( + path.join(this.runtime.pkgdir, 'compile'), + this.files.map(x => x.name), + this.timeouts.compile, + this.memory_limits.compile, + eventBus + ) + + eventBus.emit("exit", "compile", {error, code, signal}) + } + + logger.debug('Running'); + eventBus.emit("stage", "run") + const {error, code, signal} = await this.safe_call( + path.join(this.runtime.pkgdir, 'run'), + [this.files[0].name, ...this.args], + this.timeouts.run, + this.memory_limits.run, + eventBus + ); + + eventBus.emit("exit", "run", {error, code, signal}) + + + this.state = job_states.EXECUTED; + } + async cleanup_processes() { let processes = [1];