diff --git a/api/package-lock.json b/api/package-lock.json index c46ae87..83df240 100644 --- a/api/package-lock.json +++ b/api/package-lock.json @@ -12,6 +12,7 @@ "body-parser": "^1.19.0", "chownr": "^2.0.0", "express": "^4.17.1", + "express-ws": "^5.0.2", "is-docker": "^2.1.1", "logplease": "^1.2.15", "nocamel": "HexF/nocamel#patch-1", @@ -196,6 +197,20 @@ "node": ">= 0.10.0" } }, + "node_modules/express-ws": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/express-ws/-/express-ws-5.0.2.tgz", + "integrity": "sha512-0uvmuk61O9HXgLhGl3QhNSEtRsQevtmbL94/eILaliEADZBHZOQUAiHFrGPrgsjikohyrmSG5g+sCfASTt0lkQ==", + "dependencies": { + "ws": "^7.4.6" + }, + "engines": { + "node": ">=4.5.0" + }, + "peerDependencies": { + "express": "^4.0.0 || ^5.0.0-alpha.1" + } + }, "node_modules/finalhandler": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.1.2.tgz", @@ -582,6 +597,26 @@ "node_modules/waitpid": { "resolved": "git+ssh://git@github.com/HexF/node-waitpid.git#a08d116a5d993a747624fe72ff890167be8c34aa" }, + "node_modules/ws": { + "version": "7.5.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.3.tgz", + "integrity": "sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==", + "engines": { + "node": ">=8.3.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/yallist": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", @@ -728,6 +763,14 @@ "vary": "~1.1.2" } }, + "express-ws": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/express-ws/-/express-ws-5.0.2.tgz", + "integrity": "sha512-0uvmuk61O9HXgLhGl3QhNSEtRsQevtmbL94/eILaliEADZBHZOQUAiHFrGPrgsjikohyrmSG5g+sCfASTt0lkQ==", + "requires": { + "ws": "^7.4.6" + } + }, "finalhandler": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.1.2.tgz", @@ -1010,6 +1053,12 @@ "version": "git+ssh://git@github.com/HexF/node-waitpid.git#a08d116a5d993a747624fe72ff890167be8c34aa", "from": "waitpid@git+https://github.com/HexF/node-waitpid.git" }, + "ws": { + "version": "7.5.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.3.tgz", + "integrity": "sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==", + "requires": {} + }, "yallist": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", diff --git a/api/package.json b/api/package.json index ab34063..e8e5b5d 100644 --- a/api/package.json +++ b/api/package.json @@ -1,12 +1,13 @@ { "name": "piston-api", - "version": "3.0.0", + "version": "3.1.0", "description": "API for piston - a high performance code execution engine", "main": "src/index.js", "dependencies": { "body-parser": "^1.19.0", "chownr": "^2.0.0", "express": "^4.17.1", + "express-ws": "^5.0.2", "is-docker": "^2.1.1", "logplease": "^1.2.15", "nocamel": "HexF/nocamel#patch-1", diff --git a/api/src/api/v2.js b/api/src/api/v2.js index 948dccf..0c34d70 100644 --- a/api/src/api/v2.js +++ b/api/src/api/v2.js @@ -1,12 +1,126 @@ const express = require('express'); const router = express.Router(); +const events = require('events'); + const config = require('../config'); const runtime = require('../runtime'); const { Job } = require('../job'); const package = require('../package'); const logger = require('logplease').create('api/v2'); +const SIGNALS = ["SIGABRT","SIGALRM","SIGBUS","SIGCHLD","SIGCLD","SIGCONT","SIGEMT","SIGFPE","SIGHUP","SIGILL","SIGINFO","SIGINT","SIGIO","SIGIOT","SIGKILL","SIGLOST","SIGPIPE","SIGPOLL","SIGPROF","SIGPWR","SIGQUIT","SIGSEGV","SIGSTKFLT","SIGSTOP","SIGTSTP","SIGSYS","SIGTERM","SIGTRAP","SIGTTIN","SIGTTOU","SIGUNUSED","SIGURG","SIGUSR1","SIGUSR2","SIGVTALRM","SIGXCPU","SIGXFSZ","SIGWINCH"] +// ref: https://man7.org/linux/man-pages/man7/signal.7.html + +function get_job(body){ + const { + language, + version, + args, + stdin, + files, + compile_memory_limit, + run_memory_limit, + run_timeout, + compile_timeout + } = body; + + return new Promise((resolve, reject) => { + if (!language || typeof language !== 'string') { + return reject({ + message: 'language is required as a string', + }); + } + + if (!version || typeof version !== 'string') { + return reject({ + message: 'version is required as a string', + }); + } + + if (!files || !Array.isArray(files)) { + return reject({ + message: 'files is required as an array', + }); + } + + for (const [i, file] of files.entries()) { + if (typeof file.content !== 'string') { + return reject({ + message: `files[${i}].content is required as a string`, + }); + } + } + + if (compile_memory_limit) { + if (typeof compile_memory_limit !== 'number') { + return reject({ + message: 'if specified, compile_memory_limit must be a number', + }); + } + + if ( + config.compile_memory_limit >= 0 && + (compile_memory_limit > config.compile_memory_limit || + compile_memory_limit < 0) + ) { + return reject({ + message: + 'compile_memory_limit cannot exceed the configured limit of ' + + config.compile_memory_limit, + }); + } + } + + if (run_memory_limit) { + if (typeof run_memory_limit !== 'number') { + return reject({ + message: 'if specified, run_memory_limit must be a number', + }); + } + + if ( + config.run_memory_limit >= 0 && + (run_memory_limit > config.run_memory_limit || run_memory_limit < 0) + ) { + return reject({ + message: + 'run_memory_limit cannot exceed the configured limit of ' + + config.run_memory_limit, + }); + } + } + + const rt = runtime.get_latest_runtime_matching_language_version( + language, + version + ); + + if (rt === undefined) { + return reject({ + message: `${language}-${version} runtime is unknown`, + }); + } + + resolve(new Job({ + runtime: rt, + alias: language, + args: args || [], + stdin: '', + files, + timeouts: { + run: run_timeout || 3000, + compile: compile_timeout || 10000, + }, + memory_limits: { + run: run_memory_limit || config.run_memory_limit, + compile: compile_memory_limit || config.compile_memory_limit, + } + })); + }) + +} + router.use((req, res, next) => { if (['GET', 'HEAD', 'OPTIONS'].includes(req.method)) { return next(); @@ -21,118 +135,101 @@ router.use((req, res, next) => { next(); }); +router.ws('/connect', async (ws, req) => { + + let job = null; + let eventBus = new events.EventEmitter(); + + eventBus.on("stdout", (data) => ws.send(JSON.stringify({type: "data", stream: "stdout", data: data.toString()}))) + eventBus.on("stderr", (data) => ws.send(JSON.stringify({type: "data", stream: "stderr", data: data.toString()}))) + eventBus.on("stage", (stage)=> ws.send(JSON.stringify({type: "stage", stage}))) + eventBus.on("exit", (stage, status) => ws.send(JSON.stringify({type: "exit", stage, ...status}))) + + ws.on("message", async (data) => { + + try{ + const msg = JSON.parse(data); + + switch(msg.type){ + case "init": + if(job === null){ + job = await get_job(msg); + + await job.prime(); + + ws.send(JSON.stringify({ + type: "runtime", + language: job.runtime.language, + version: job.runtime.version.raw + })) + + await job.execute_interactive(eventBus); + + ws.close(4999, "Job Completed"); + + }else{ + ws.close(4000, "Already Initialized"); + } + break; + case "data": + if(job !== null){ + if(msg.stream === "stdin"){ + eventBus.emit("stdin", msg.data) + }else{ + ws.close(4004, "Can only write to stdin") + } + }else{ + ws.close(4003, "Not yet initialized") + } + break; + case "signal": + if(job !== null){ + if(SIGNALS.includes(msg.signal)){ + eventBus.emit("signal", msg.signal) + }else{ + ws.close(4005, "Invalid signal") + } + }else{ + ws.close(4003, "Not yet initialized") + } + break; + } + + }catch(error){ + ws.send(JSON.stringify({type: "error", message: error.message})) + ws.close(4002, "Notified Error") + // ws.close message is limited to 123 characters, so we notify over WS then close. + } + }) + + ws.on("close", async ()=>{ + if(job !== null){ + await job.cleanup() + } + }) + + setTimeout(()=>{ + //Terminate the socket after 1 second, if not initialized. + if(job === null) + ws.close(4001, "Initialization Timeout"); + }, 1000) +}) + router.post('/execute', async (req, res) => { - const { - language, - version, - files, - stdin, - args, - run_timeout, - compile_timeout, - compile_memory_limit, - run_memory_limit, - } = req.body; - if (!language || typeof language !== 'string') { - return res.status(400).send({ - message: 'language is required as a string', - }); + try{ + const job = await get_job(req.body); + + await job.prime(); + + const result = await job.execute(); + + await job.cleanup(); + + return res.status(200).send(result); + }catch(error){ + return res.status(400).json(error); } - - if (!version || typeof version !== 'string') { - return res.status(400).send({ - message: 'version is required as a string', - }); - } - - if (!files || !Array.isArray(files)) { - return res.status(400).send({ - message: 'files is required as an array', - }); - } - - for (const [i, file] of files.entries()) { - if (typeof file.content !== 'string') { - return res.status(400).send({ - message: `files[${i}].content is required as a string`, - }); - } - } - - if (compile_memory_limit) { - if (typeof compile_memory_limit !== 'number') { - return res.status(400).send({ - message: 'if specified, compile_memory_limit must be a number', - }); - } - - if ( - config.compile_memory_limit >= 0 && - (compile_memory_limit > config.compile_memory_limit || - compile_memory_limit < 0) - ) { - return res.status(400).send({ - message: - 'compile_memory_limit cannot exceed the configured limit of ' + - config.compile_memory_limit, - }); - } - } - - if (run_memory_limit) { - if (typeof run_memory_limit !== 'number') { - return res.status(400).send({ - message: 'if specified, run_memory_limit must be a number', - }); - } - - if ( - config.run_memory_limit >= 0 && - (run_memory_limit > config.run_memory_limit || run_memory_limit < 0) - ) { - return res.status(400).send({ - message: - 'run_memory_limit cannot exceed the configured limit of ' + - config.run_memory_limit, - }); - } - } - - const rt = runtime.get_latest_runtime_matching_language_version( - language, - version - ); - - if (rt === undefined) { - return res.status(400).send({ - message: `${language}-${version} runtime is unknown`, - }); - } - - const job = new Job({ - runtime: rt, - alias: language, - files: files, - args: args || [], - stdin: stdin || '', - timeouts: { - run: run_timeout || 3000, - compile: compile_timeout || 10000, - }, - memory_limits: { - run: run_memory_limit || config.run_memory_limit, - compile: compile_memory_limit || config.compile_memory_limit, - }, - }); - - await job.prime(); - - const result = await job.execute(); - - await job.cleanup(); - - return res.status(200).send(result); }); router.get('/runtimes', (req, res) => { diff --git a/api/src/index.js b/api/src/index.js index ef16916..afd4d15 100644 --- a/api/src/index.js +++ b/api/src/index.js @@ -2,6 +2,7 @@ require('nocamel'); const Logger = require('logplease'); const express = require('express'); +const expressWs = require('express-ws'); const globals = require('./globals'); const config = require('./config'); const path = require('path'); @@ -12,6 +13,9 @@ const runtime = require('./runtime'); const logger = Logger.create('index'); const app = express(); +expressWs(app); + + (async () => { logger.info('Setting loglevel to', config.log_level); diff --git a/api/src/job.js b/api/src/job.js index 07a7ee4..683cda6 100644 --- a/api/src/job.js +++ b/api/src/job.js @@ -76,7 +76,7 @@ class Job { logger.debug('Primed job'); } - async safe_call(file, args, timeout, memory_limit) { + async safe_call(file, args, timeout, memory_limit, eventBus = null) { return new Promise((resolve, reject) => { const nonetwork = config.disable_networking ? ['nosocket'] : []; @@ -109,9 +109,21 @@ class Job { detached: true, //give this process its own process group }); - proc.stdin.write(this.stdin); - proc.stdin.end(); - proc.stdin.destroy(); + if(eventBus === null){ + proc.stdin.write(this.stdin); + proc.stdin.end(); + proc.stdin.destroy(); + }else{ + eventBus.on("stdin", (data) => { + proc.stdin.write(data); + }) + + eventBus.on("kill", (signal) => { + proc.kill(signal) + }) + } + + const kill_timeout = set_timeout( _ => proc.kill('SIGKILL'), @@ -119,7 +131,9 @@ class Job { ); proc.stderr.on('data', data => { - if (stderr.length > config.output_max_size) { + if(eventBus !== null) { + eventBus.emit("stderr", data); + } else if (stderr.length > config.output_max_size) { proc.kill('SIGKILL'); } else { stderr += data; @@ -128,7 +142,9 @@ class Job { }); proc.stdout.on('data', data => { - if (stdout.length > config.output_max_size) { + if(eventBus !== null){ + eventBus.emit("stdout", data); + } else if (stdout.length > config.output_max_size) { proc.kill('SIGKILL'); } else { stdout += data; @@ -203,6 +219,49 @@ class Job { }; } + async execute_interactive(eventBus){ + if (this.state !== job_states.PRIMED) { + throw new Error( + 'Job must be in primed state, current state: ' + + this.state.toString() + ); + } + + logger.info( + `Interactively executing job uuid=${this.uuid} uid=${this.uid} gid=${ + this.gid + } runtime=${this.runtime.toString()}` + ); + + if(this.runtime.compiled){ + eventBus.emit("stage", "compile") + const {error, code, signal} = await this.safe_call( + path.join(this.runtime.pkgdir, 'compile'), + this.files.map(x => x.name), + this.timeouts.compile, + this.memory_limits.compile, + eventBus + ) + + eventBus.emit("exit", "compile", {error, code, signal}) + } + + logger.debug('Running'); + eventBus.emit("stage", "run") + const {error, code, signal} = await this.safe_call( + path.join(this.runtime.pkgdir, 'run'), + [this.files[0].name, ...this.args], + this.timeouts.run, + this.memory_limits.run, + eventBus + ); + + eventBus.emit("exit", "run", {error, code, signal}) + + + this.state = job_states.EXECUTED; + } + async cleanup_processes() { let processes = [1]; diff --git a/cli/commands/execute.js b/cli/commands/execute.js index e273548..abb1f63 100644 --- a/cli/commands/execute.js +++ b/cli/commands/execute.js @@ -1,7 +1,10 @@ -//const fetch = require('node-fetch'); const fs = require('fs'); const path = require('path'); const chalk = require('chalk'); +const WebSocket = require('ws'); + +const SIGNALS = ["SIGABRT","SIGALRM","SIGBUS","SIGCHLD","SIGCLD","SIGCONT","SIGEMT","SIGFPE","SIGHUP","SIGILL","SIGINFO","SIGINT","SIGIO","SIGIOT","SIGLOST","SIGPIPE","SIGPOLL","SIGPROF","SIGPWR","SIGQUIT","SIGSEGV","SIGSTKFLT","SIGTSTP","SIGSYS","SIGTERM","SIGTRAP","SIGTTIN","SIGTTOU","SIGUNUSED","SIGURG","SIGUSR1","SIGUSR2","SIGVTALRM","SIGXCPU","SIGXFSZ","SIGWINCH"] + exports.command = ['execute [args..]']; exports.aliases = ['run']; @@ -35,17 +38,115 @@ exports.builder = { alias: ['f'], array: true, desc: 'Additional files to add', + }, + interactive: { + boolean: true, + alias: ['t'], + desc: 'Run interactively using WebSocket transport' + }, + status: { + boolean: true, + alias: ['s'], + desc: 'Output additional status to stderr' } }; -exports.handler = async (argv) => { - const files = [...(argv.files || []),argv.file] - .map(file_path => { - return { - name: path.basename(file_path), - content: fs.readFileSync(file_path).toString() - }; - }); +async function handle_interactive(files, argv){ + const ws = new WebSocket(argv.pistonUrl.replace("http", "ws") + "/api/v2/connect") + + const log_message = (process.stderr.isTTY && argv.status) ? console.error : ()=>{}; + + process.on("exit", ()=>{ + ws.close(); + process.stdin.end(); + process.stdin.destroy(); + process.exit(); + }) + + for(const signal of SIGNALS){ + process.on(signal, ()=>{ + ws.send(JSON.stringify({type: 'signal', signal})) + }) + } + + + + ws.on('open', ()=>{ + const request = { + type: "init", + language: argv.language, + version: argv['language_version'], + files: files, + args: argv.args, + compile_timeout: argv.ct, + run_timeout: argv.rt + } + + ws.send(JSON.stringify(request)) + log_message(chalk.white.bold("Connected")) + + process.stdin.resume(); + + process.stdin.on("data", (data) => { + ws.send(JSON.stringify({ + type: "data", + stream: "stdin", + data: data.toString() + })) + }) + }) + + ws.on("close", (code, reason)=>{ + log_message( + chalk.white.bold("Disconnected: "), + chalk.white.bold("Reason: "), + chalk.yellow(`"${reason}"`), + chalk.white.bold("Code: "), + chalk.yellow(`"${code}"`), + ) + process.stdin.pause() + }) + + ws.on('message', function(data){ + const msg = JSON.parse(data); + + switch(msg.type){ + case "runtime": + log_message(chalk.bold.white("Runtime:"), chalk.yellow(`${msg.language} ${msg.version}`)) + break; + case "stage": + log_message(chalk.bold.white("Stage:"), chalk.yellow(msg.stage)) + break; + case "data": + if(msg.stream == "stdout") process.stdout.write(msg.data) + else if(msg.stream == "stderr") process.stderr.write(msg.data) + else log_message(chalk.bold.red(`(${msg.stream}) `), msg.data) + break; + case "exit": + if(msg.signal === null) + log_message( + chalk.white.bold("Stage"), + chalk.yellow(msg.stage), + chalk.white.bold("exited with code"), + chalk.yellow(msg.code) + ) + else + log_message( + chalk.white.bold("Stage"), + chalk.yellow(msg.stage), + chalk.white.bold("exited with signal"), + chalk.yellow(msg.signal) + ) + break; + default: + log_message(chalk.red.bold("Unknown message:"), msg) + } + }) + +} + +async function run_non_interactively(files, argv) { + const stdin = (argv.stdin && await new Promise((resolve, _) => { let data = ''; @@ -99,3 +200,18 @@ exports.handler = async (argv) => { step('Run', response.run); } + +exports.handler = async (argv) => { + const files = [...(argv.files || []),argv.file] + .map(file_path => { + return { + name: path.basename(file_path), + content: fs.readFileSync(file_path).toString() + }; + }); + + if(argv.interactive) await handle_interactive(files, argv); + else await run_non_interactively(files, argv); +} + + diff --git a/cli/package-lock.json b/cli/package-lock.json index ad65043..f7c2771 100644 --- a/cli/package-lock.json +++ b/cli/package-lock.json @@ -14,6 +14,7 @@ "minimatch": "^3.0.4", "nocamel": "^1.0.2", "semver": "^7.3.5", + "ws": "^7.5.3", "yargs": "^16.2.0" } }, @@ -254,6 +255,26 @@ "node": ">=10" } }, + "node_modules/ws": { + "version": "7.5.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.3.tgz", + "integrity": "sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==", + "engines": { + "node": ">=8.3.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/y18n": { "version": "5.0.5", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.5.tgz", @@ -466,6 +487,12 @@ "strip-ansi": "^6.0.0" } }, + "ws": { + "version": "7.5.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.3.tgz", + "integrity": "sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==", + "requires": {} + }, "y18n": { "version": "5.0.5", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.5.tgz", diff --git a/cli/package.json b/cli/package.json index c244403..ef1103e 100644 --- a/cli/package.json +++ b/cli/package.json @@ -1,6 +1,6 @@ { "name": "piston-cli", - "version": "1.0.0", + "version": "1.1.0", "description": "Piston Execution Engine CLI tools", "main": "index.js", "license": "MIT", @@ -10,6 +10,7 @@ "minimatch": "^3.0.4", "nocamel": "^1.0.2", "semver": "^7.3.5", + "ws": "^7.5.3", "yargs": "^16.2.0" } } diff --git a/piston b/piston index f979f44..28e18db 100755 --- a/piston +++ b/piston @@ -36,12 +36,13 @@ case $1 in echo " clean-pkgs Clean any package build artifacts on disk" echo " clean-repo Remove all packages from local repo" echo " build-pkg Build a package" - + echo " rebuild Build and restart the docker container" + else echo " Switch to developement environment for more info" - echo " > piston switch dev" - + echo " > piston select dev" + fi ;; @@ -54,6 +55,8 @@ case $1 in stop) docker_compose down ;; bash) docker_compose exec api /bin/bash ;; + rebuild) docker_compose build && docker_compose up -d ;; + update) git pull docker_compose pull