api: Implement Websocket transport for live data
This commit is contained in:
parent
6a368cf66f
commit
de89acb617
|
@ -12,6 +12,7 @@
|
|||
"body-parser": "^1.19.0",
|
||||
"chownr": "^2.0.0",
|
||||
"express": "^4.17.1",
|
||||
"express-ws": "^5.0.2",
|
||||
"is-docker": "^2.1.1",
|
||||
"logplease": "^1.2.15",
|
||||
"nocamel": "HexF/nocamel#patch-1",
|
||||
|
@ -196,6 +197,20 @@
|
|||
"node": ">= 0.10.0"
|
||||
}
|
||||
},
|
||||
"node_modules/express-ws": {
|
||||
"version": "5.0.2",
|
||||
"resolved": "https://registry.npmjs.org/express-ws/-/express-ws-5.0.2.tgz",
|
||||
"integrity": "sha512-0uvmuk61O9HXgLhGl3QhNSEtRsQevtmbL94/eILaliEADZBHZOQUAiHFrGPrgsjikohyrmSG5g+sCfASTt0lkQ==",
|
||||
"dependencies": {
|
||||
"ws": "^7.4.6"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=4.5.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"express": "^4.0.0 || ^5.0.0-alpha.1"
|
||||
}
|
||||
},
|
||||
"node_modules/finalhandler": {
|
||||
"version": "1.1.2",
|
||||
"resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.1.2.tgz",
|
||||
|
@ -582,6 +597,26 @@
|
|||
"node_modules/waitpid": {
|
||||
"resolved": "git+ssh://git@github.com/HexF/node-waitpid.git#a08d116a5d993a747624fe72ff890167be8c34aa"
|
||||
},
|
||||
"node_modules/ws": {
|
||||
"version": "7.5.3",
|
||||
"resolved": "https://registry.npmjs.org/ws/-/ws-7.5.3.tgz",
|
||||
"integrity": "sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==",
|
||||
"engines": {
|
||||
"node": ">=8.3.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"bufferutil": "^4.0.1",
|
||||
"utf-8-validate": "^5.0.2"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"bufferutil": {
|
||||
"optional": true
|
||||
},
|
||||
"utf-8-validate": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"node_modules/yallist": {
|
||||
"version": "4.0.0",
|
||||
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
|
||||
|
@ -728,6 +763,14 @@
|
|||
"vary": "~1.1.2"
|
||||
}
|
||||
},
|
||||
"express-ws": {
|
||||
"version": "5.0.2",
|
||||
"resolved": "https://registry.npmjs.org/express-ws/-/express-ws-5.0.2.tgz",
|
||||
"integrity": "sha512-0uvmuk61O9HXgLhGl3QhNSEtRsQevtmbL94/eILaliEADZBHZOQUAiHFrGPrgsjikohyrmSG5g+sCfASTt0lkQ==",
|
||||
"requires": {
|
||||
"ws": "^7.4.6"
|
||||
}
|
||||
},
|
||||
"finalhandler": {
|
||||
"version": "1.1.2",
|
||||
"resolved": "https://registry.npmjs.org/finalhandler/-/finalhandler-1.1.2.tgz",
|
||||
|
@ -1010,6 +1053,12 @@
|
|||
"version": "git+ssh://git@github.com/HexF/node-waitpid.git#a08d116a5d993a747624fe72ff890167be8c34aa",
|
||||
"from": "waitpid@git+https://github.com/HexF/node-waitpid.git"
|
||||
},
|
||||
"ws": {
|
||||
"version": "7.5.3",
|
||||
"resolved": "https://registry.npmjs.org/ws/-/ws-7.5.3.tgz",
|
||||
"integrity": "sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==",
|
||||
"requires": {}
|
||||
},
|
||||
"yallist": {
|
||||
"version": "4.0.0",
|
||||
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
"body-parser": "^1.19.0",
|
||||
"chownr": "^2.0.0",
|
||||
"express": "^4.17.1",
|
||||
"express-ws": "^5.0.2",
|
||||
"is-docker": "^2.1.1",
|
||||
"logplease": "^1.2.15",
|
||||
"nocamel": "HexF/nocamel#patch-1",
|
||||
|
|
|
@ -1,12 +1,124 @@
|
|||
const express = require('express');
|
||||
const router = express.Router();
|
||||
|
||||
const events = require('events');
|
||||
|
||||
const config = require('../config');
|
||||
const runtime = require('../runtime');
|
||||
const { Job } = require('../job');
|
||||
const package = require('../package');
|
||||
const logger = require('logplease').create('api/v2');
|
||||
|
||||
|
||||
function get_job(body){
|
||||
const {
|
||||
language,
|
||||
version,
|
||||
args,
|
||||
stdin,
|
||||
files,
|
||||
compile_memory_limit,
|
||||
run_memory_limit,
|
||||
run_timeout,
|
||||
compile_timeout
|
||||
} = body;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!language || typeof language !== 'string') {
|
||||
return reject({
|
||||
message: 'language is required as a string',
|
||||
});
|
||||
}
|
||||
|
||||
if (!version || typeof version !== 'string') {
|
||||
return reject({
|
||||
message: 'version is required as a string',
|
||||
});
|
||||
}
|
||||
|
||||
if (!files || !Array.isArray(files)) {
|
||||
return reject({
|
||||
message: 'files is required as an array',
|
||||
});
|
||||
}
|
||||
|
||||
for (const [i, file] of files.entries()) {
|
||||
if (typeof file.content !== 'string') {
|
||||
return reject({
|
||||
message: `files[${i}].content is required as a string`,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (compile_memory_limit) {
|
||||
if (typeof compile_memory_limit !== 'number') {
|
||||
return reject({
|
||||
message: 'if specified, compile_memory_limit must be a number',
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
config.compile_memory_limit >= 0 &&
|
||||
(compile_memory_limit > config.compile_memory_limit ||
|
||||
compile_memory_limit < 0)
|
||||
) {
|
||||
return reject({
|
||||
message:
|
||||
'compile_memory_limit cannot exceed the configured limit of ' +
|
||||
config.compile_memory_limit,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (run_memory_limit) {
|
||||
if (typeof run_memory_limit !== 'number') {
|
||||
return reject({
|
||||
message: 'if specified, run_memory_limit must be a number',
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
config.run_memory_limit >= 0 &&
|
||||
(run_memory_limit > config.run_memory_limit || run_memory_limit < 0)
|
||||
) {
|
||||
return reject({
|
||||
message:
|
||||
'run_memory_limit cannot exceed the configured limit of ' +
|
||||
config.run_memory_limit,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const rt = runtime.get_latest_runtime_matching_language_version(
|
||||
language,
|
||||
version
|
||||
);
|
||||
|
||||
if (rt === undefined) {
|
||||
return reject({
|
||||
message: `${language}-${version} runtime is unknown`,
|
||||
});
|
||||
}
|
||||
|
||||
resolve(new Job({
|
||||
runtime: rt,
|
||||
alias: language,
|
||||
args: args || [],
|
||||
stdin: '',
|
||||
files,
|
||||
timeouts: {
|
||||
run: run_timeout || 3000,
|
||||
compile: compile_timeout || 10000,
|
||||
},
|
||||
memory_limits: {
|
||||
run: run_memory_limit || config.run_memory_limit,
|
||||
compile: compile_memory_limit || config.compile_memory_limit,
|
||||
}
|
||||
}));
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
router.use((req, res, next) => {
|
||||
if (['GET', 'HEAD', 'OPTIONS'].includes(req.method)) {
|
||||
return next();
|
||||
|
@ -21,118 +133,87 @@ router.use((req, res, next) => {
|
|||
next();
|
||||
});
|
||||
|
||||
router.ws('/connect', async (ws, req) => {
|
||||
|
||||
let job = null;
|
||||
let eventBus = new events.EventEmitter();
|
||||
|
||||
eventBus.on("stdout", (data) => ws.send(JSON.stringify({type: "data", stream: "stdout", data: data.toString()})))
|
||||
eventBus.on("stderr", (data) => ws.send(JSON.stringify({type: "data", stream: "stderr", data: data.toString()})))
|
||||
eventBus.on("stage", (stage)=> ws.send(JSON.stringify({type: "stage", stage})))
|
||||
eventBus.on("exit", (stage, status) => ws.send(JSON.stringify({type: "exit", stage, ...status})))
|
||||
|
||||
ws.on("message", async (data) => {
|
||||
|
||||
try{
|
||||
const msg = JSON.parse(data);
|
||||
|
||||
if(msg.type === "init"){
|
||||
if(job === null){
|
||||
const job = await get_job(msg);
|
||||
|
||||
await job.prime();
|
||||
|
||||
ws.send(JSON.stringify({
|
||||
type: "runtime",
|
||||
language: job.runtime.language,
|
||||
version: job.runtime.version.raw
|
||||
}))
|
||||
|
||||
await job.execute_interactive(eventBus);
|
||||
|
||||
ws.close(4999, "Job Completed");
|
||||
|
||||
}else{
|
||||
ws.close(4000, "Already Initialized");
|
||||
}
|
||||
|
||||
}else if(msg.type === "data"){
|
||||
if(job !== null){
|
||||
if(msg.stream === "stdin"){
|
||||
eventBus.emit("stdin", msg.data)
|
||||
}else{
|
||||
ws.close(4004, "Can only write to stdin")
|
||||
}
|
||||
}else{
|
||||
ws.close(4003, "Not yet initialized")
|
||||
}
|
||||
}
|
||||
}catch(error){
|
||||
ws.send(JSON.stringify({type: "error", message: error.message}))
|
||||
ws.close(4002, "Notified Error")
|
||||
// ws.close message is limited to 123 characters, so we notify over WS then close.
|
||||
}
|
||||
})
|
||||
|
||||
ws.on("close", async ()=>{
|
||||
if(job !== null){
|
||||
await job.cleanup()
|
||||
}
|
||||
})
|
||||
|
||||
setTimeout(()=>{
|
||||
//Terminate the socket after 1 second, if not initialized.
|
||||
//if(job === null)
|
||||
// ws.close(4001, "Initialization Timeout");
|
||||
}, 1000)
|
||||
})
|
||||
|
||||
router.post('/execute', async (req, res) => {
|
||||
const {
|
||||
language,
|
||||
version,
|
||||
files,
|
||||
stdin,
|
||||
args,
|
||||
run_timeout,
|
||||
compile_timeout,
|
||||
compile_memory_limit,
|
||||
run_memory_limit,
|
||||
} = req.body;
|
||||
|
||||
if (!language || typeof language !== 'string') {
|
||||
return res.status(400).send({
|
||||
message: 'language is required as a string',
|
||||
});
|
||||
try{
|
||||
const job = await get_job(req.body);
|
||||
|
||||
await job.prime();
|
||||
|
||||
const result = await job.execute();
|
||||
|
||||
await job.cleanup();
|
||||
|
||||
return res.status(200).send(result);
|
||||
}catch(error){
|
||||
return res.status(400).json(error);
|
||||
}
|
||||
|
||||
if (!version || typeof version !== 'string') {
|
||||
return res.status(400).send({
|
||||
message: 'version is required as a string',
|
||||
});
|
||||
}
|
||||
|
||||
if (!files || !Array.isArray(files)) {
|
||||
return res.status(400).send({
|
||||
message: 'files is required as an array',
|
||||
});
|
||||
}
|
||||
|
||||
for (const [i, file] of files.entries()) {
|
||||
if (typeof file.content !== 'string') {
|
||||
return res.status(400).send({
|
||||
message: `files[${i}].content is required as a string`,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (compile_memory_limit) {
|
||||
if (typeof compile_memory_limit !== 'number') {
|
||||
return res.status(400).send({
|
||||
message: 'if specified, compile_memory_limit must be a number',
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
config.compile_memory_limit >= 0 &&
|
||||
(compile_memory_limit > config.compile_memory_limit ||
|
||||
compile_memory_limit < 0)
|
||||
) {
|
||||
return res.status(400).send({
|
||||
message:
|
||||
'compile_memory_limit cannot exceed the configured limit of ' +
|
||||
config.compile_memory_limit,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (run_memory_limit) {
|
||||
if (typeof run_memory_limit !== 'number') {
|
||||
return res.status(400).send({
|
||||
message: 'if specified, run_memory_limit must be a number',
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
config.run_memory_limit >= 0 &&
|
||||
(run_memory_limit > config.run_memory_limit || run_memory_limit < 0)
|
||||
) {
|
||||
return res.status(400).send({
|
||||
message:
|
||||
'run_memory_limit cannot exceed the configured limit of ' +
|
||||
config.run_memory_limit,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const rt = runtime.get_latest_runtime_matching_language_version(
|
||||
language,
|
||||
version
|
||||
);
|
||||
|
||||
if (rt === undefined) {
|
||||
return res.status(400).send({
|
||||
message: `${language}-${version} runtime is unknown`,
|
||||
});
|
||||
}
|
||||
|
||||
const job = new Job({
|
||||
runtime: rt,
|
||||
alias: language,
|
||||
files: files,
|
||||
args: args || [],
|
||||
stdin: stdin || '',
|
||||
timeouts: {
|
||||
run: run_timeout || 3000,
|
||||
compile: compile_timeout || 10000,
|
||||
},
|
||||
memory_limits: {
|
||||
run: run_memory_limit || config.run_memory_limit,
|
||||
compile: compile_memory_limit || config.compile_memory_limit,
|
||||
},
|
||||
});
|
||||
|
||||
await job.prime();
|
||||
|
||||
const result = await job.execute();
|
||||
|
||||
await job.cleanup();
|
||||
|
||||
return res.status(200).send(result);
|
||||
});
|
||||
|
||||
router.get('/runtimes', (req, res) => {
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
require('nocamel');
|
||||
const Logger = require('logplease');
|
||||
const express = require('express');
|
||||
const expressWs = require('express-ws');
|
||||
const globals = require('./globals');
|
||||
const config = require('./config');
|
||||
const path = require('path');
|
||||
|
@ -12,6 +13,9 @@ const runtime = require('./runtime');
|
|||
|
||||
const logger = Logger.create('index');
|
||||
const app = express();
|
||||
expressWs(app);
|
||||
|
||||
|
||||
|
||||
(async () => {
|
||||
logger.info('Setting loglevel to', config.log_level);
|
||||
|
|
|
@ -69,7 +69,7 @@ class Job {
|
|||
logger.debug('Primed job');
|
||||
}
|
||||
|
||||
async safe_call(file, args, timeout, memory_limit) {
|
||||
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const nonetwork = config.disable_networking ? ['nosocket'] : [];
|
||||
|
||||
|
@ -102,9 +102,15 @@ class Job {
|
|||
detached: true, //give this process its own process group
|
||||
});
|
||||
|
||||
proc.stdin.write(this.stdin);
|
||||
proc.stdin.end();
|
||||
proc.stdin.destroy();
|
||||
if(eventBus === null){
|
||||
proc.stdin.write(this.stdin);
|
||||
proc.stdin.end();
|
||||
proc.stdin.destroy();
|
||||
}else{
|
||||
eventBus.on("stdin", (data) => {
|
||||
proc.stdin.write(data);
|
||||
})
|
||||
}
|
||||
|
||||
const kill_timeout = set_timeout(
|
||||
_ => proc.kill('SIGKILL'),
|
||||
|
@ -115,6 +121,7 @@ class Job {
|
|||
if (stderr.length > config.output_max_size) {
|
||||
proc.kill('SIGKILL');
|
||||
} else {
|
||||
if(eventBus !== null) eventBus.emit("stderr", data);
|
||||
stderr += data;
|
||||
output += data;
|
||||
}
|
||||
|
@ -124,6 +131,7 @@ class Job {
|
|||
if (stdout.length > config.output_max_size) {
|
||||
proc.kill('SIGKILL');
|
||||
} else {
|
||||
if(eventBus !== null) eventBus.emit("stdout", data);
|
||||
stdout += data;
|
||||
output += data;
|
||||
}
|
||||
|
@ -196,6 +204,49 @@ class Job {
|
|||
};
|
||||
}
|
||||
|
||||
async execute_interactive(eventBus){
|
||||
if (this.state !== job_states.PRIMED) {
|
||||
throw new Error(
|
||||
'Job must be in primed state, current state: ' +
|
||||
this.state.toString()
|
||||
);
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Interactively executing job uuid=${this.uuid} uid=${this.uid} gid=${
|
||||
this.gid
|
||||
} runtime=${this.runtime.toString()}`
|
||||
);
|
||||
|
||||
if(this.runtime.compiled){
|
||||
eventBus.emit("stage", "compile")
|
||||
const {error, code, signal} = await this.safe_call(
|
||||
path.join(this.runtime.pkgdir, 'compile'),
|
||||
this.files.map(x => x.name),
|
||||
this.timeouts.compile,
|
||||
this.memory_limits.compile,
|
||||
eventBus
|
||||
)
|
||||
|
||||
eventBus.emit("exit", "compile", {error, code, signal})
|
||||
}
|
||||
|
||||
logger.debug('Running');
|
||||
eventBus.emit("stage", "run")
|
||||
const {error, code, signal} = await this.safe_call(
|
||||
path.join(this.runtime.pkgdir, 'run'),
|
||||
[this.files[0].name, ...this.args],
|
||||
this.timeouts.run,
|
||||
this.memory_limits.run,
|
||||
eventBus
|
||||
);
|
||||
|
||||
eventBus.emit("exit", "run", {error, code, signal})
|
||||
|
||||
|
||||
this.state = job_states.EXECUTED;
|
||||
}
|
||||
|
||||
async cleanup_processes() {
|
||||
let processes = [1];
|
||||
|
||||
|
|
Loading…
Reference in New Issue