Add master features in V3 API, refactor

This commit is contained in:
Omar Brikaa 2022-02-05 15:30:44 +02:00
parent cdd87ca9a1
commit 7de631383f
3 changed files with 187 additions and 174 deletions

View File

@ -5,7 +5,6 @@ const events = require('events');
const runtime = require('../runtime'); const runtime = require('../runtime');
const { Job } = require('../job'); const { Job } = require('../job');
const logger = require('logplease').create('api/v3');
const SIGNALS = [ const SIGNALS = [
'SIGABRT', 'SIGABRT',
@ -81,49 +80,9 @@ function get_job(body) {
} }
} }
if (compile_memory_limit) { const rt = runtime.find(rt =>
if (typeof compile_memory_limit !== 'number') { [...rt.aliases, rt.language].includes(rt.language)
return reject({ );
message: 'if specified, compile_memory_limit must be a number',
});
}
if (
config.compile_memory_limit >= 0 &&
(compile_memory_limit > config.compile_memory_limit ||
compile_memory_limit < 0)
) {
return reject({
message:
'compile_memory_limit cannot exceed the configured limit of ' +
config.compile_memory_limit,
});
}
}
if (run_memory_limit) {
if (typeof run_memory_limit !== 'number') {
return reject({
message: 'if specified, run_memory_limit must be a number',
});
}
if (
config.run_memory_limit >= 0 &&
(run_memory_limit > config.run_memory_limit || run_memory_limit < 0)
) {
return reject({
message:
'run_memory_limit cannot exceed the configured limit of ' +
config.run_memory_limit,
});
}
}
const rt = runtime.find(rt => [
...rt.aliases,
rt.language
].includes(rt.language))
if (rt === undefined) { if (rt === undefined) {
return reject({ return reject({

View File

@ -3,15 +3,52 @@ const router = express.Router();
const events = require('events'); const events = require('events');
const config = require('../config');
const runtime = require('../runtime'); const runtime = require('../runtime');
const { Job } = require('../job'); const { Job } = require('../job');
const logger = require('logplease').create('api/v3');
const SIGNALS = ["SIGABRT","SIGALRM","SIGBUS","SIGCHLD","SIGCLD","SIGCONT","SIGEMT","SIGFPE","SIGHUP","SIGILL","SIGINFO","SIGINT","SIGIO","SIGIOT","SIGKILL","SIGLOST","SIGPIPE","SIGPOLL","SIGPROF","SIGPWR","SIGQUIT","SIGSEGV","SIGSTKFLT","SIGSTOP","SIGTSTP","SIGSYS","SIGTERM","SIGTRAP","SIGTTIN","SIGTTOU","SIGUNUSED","SIGURG","SIGUSR1","SIGUSR2","SIGVTALRM","SIGXCPU","SIGXFSZ","SIGWINCH"] const SIGNALS = [
'SIGABRT',
'SIGALRM',
'SIGBUS',
'SIGCHLD',
'SIGCLD',
'SIGCONT',
'SIGEMT',
'SIGFPE',
'SIGHUP',
'SIGILL',
'SIGINFO',
'SIGINT',
'SIGIO',
'SIGIOT',
'SIGKILL',
'SIGLOST',
'SIGPIPE',
'SIGPOLL',
'SIGPROF',
'SIGPWR',
'SIGQUIT',
'SIGSEGV',
'SIGSTKFLT',
'SIGSTOP',
'SIGTSTP',
'SIGSYS',
'SIGTERM',
'SIGTRAP',
'SIGTTIN',
'SIGTTOU',
'SIGUNUSED',
'SIGURG',
'SIGUSR1',
'SIGUSR2',
'SIGVTALRM',
'SIGXCPU',
'SIGXFSZ',
'SIGWINCH',
];
// ref: https://man7.org/linux/man-pages/man7/signal.7.html // ref: https://man7.org/linux/man-pages/man7/signal.7.html
function get_job(body){ function get_job(body) {
const { const {
runtime_id, runtime_id,
args, args,
@ -20,93 +57,96 @@ function get_job(body){
compile_memory_limit, compile_memory_limit,
run_memory_limit, run_memory_limit,
run_timeout, run_timeout,
compile_timeout compile_timeout,
} = body; } = body;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (typeof runtime_id !== 'number') { if (typeof runtime_id !== 'number') {
return reject({ return reject({
message: 'runtime_id is required as a number' message: 'runtime_id is required as a number',
}); });
} }
if (!Array.isArray(files)) { if (!files || !Array.isArray(files)) {
return reject({ return reject({
message: 'files is required as an array', message: 'files is required as an array',
}); });
} }
for (const [i, file] of files.entries()) {
if (typeof file.content !== 'string') {
return reject({
message: `files[${i}].content is required as a string`,
});
}
}
if (compile_memory_limit) {
if (typeof compile_memory_limit !== 'number') {
return reject({
message: 'if specified, compile_memory_limit must be a number',
});
}
if (
config.compile_memory_limit >= 0 &&
(compile_memory_limit > config.compile_memory_limit ||
compile_memory_limit < 0)
) {
return reject({
message:
'compile_memory_limit cannot exceed the configured limit of ' +
config.compile_memory_limit,
});
}
}
if (run_memory_limit) {
if (typeof run_memory_limit !== 'number') {
return reject({
message: 'if specified, run_memory_limit must be a number',
});
}
if (
config.run_memory_limit >= 0 &&
(run_memory_limit > config.run_memory_limit || run_memory_limit < 0)
) {
return reject({
message:
'run_memory_limit cannot exceed the configured limit of ' +
config.run_memory_limit,
});
}
}
const rt = runtime[runtime_id]; const rt = runtime[runtime_id];
if (rt === undefined) { if (rt === undefined) {
return reject({ return reject({
message: `Runtime #${runtime_id} is unknown`, message: `Runtime #${runtime_id} is unknown`,
}); });
} }
resolve(new Job({ if (
runtime: rt, rt.language !== 'file' &&
args: args || [], !files.some(file => !file.encoding || file.encoding === 'utf8')
stdin: stdin || "", ) {
files, return reject({
timeouts: { message: 'files must include at least one utf8 encoded file',
run: run_timeout || 3000, });
compile: compile_timeout || 10000, }
},
memory_limits: {
run: run_memory_limit || config.run_memory_limit,
compile: compile_memory_limit || config.compile_memory_limit,
}
}));
})
if (files.some(file => typeof file.content !== 'string')) {
return reject({
message: 'file.content is required as a string',
});
}
for (const constraint of ['memory_limit', 'timeout']) {
for (const type of ['compile', 'run']) {
const constraint_name = `${type}_${constraint}`;
const constraint_value = body[constraint_name];
const configured_limit = rt[`${constraint}s`][type];
if (!constraint_value) {
continue;
}
if (typeof constraint_value !== 'number') {
return reject({
message: `If specified, ${constraint_name} must be a number`,
});
}
if (configured_limit <= 0) {
continue;
}
if (constraint_value > configured_limit) {
return reject({
message: `${constraint_name} cannot exceed the configured limit of ${configured_limit}`,
});
}
if (constraint_value < 0) {
return reject({
message: `${constraint_name} must be non-negative`,
});
}
}
}
const job_compile_timeout = compile_timeout || rt.timeouts.compile;
const job_run_timeout = run_timeout || rt.timeouts.run;
const job_compile_memory_limit =
compile_memory_limit || rt.memory_limits.compile;
const job_run_memory_limit = run_memory_limit || rt.memory_limits.run;
resolve(
new Job({
runtime: rt,
args: args || [],
stdin: stdin || '',
files,
timeouts: {
run: job_run_timeout,
compile: job_compile_timeout,
},
memory_limits: {
run: job_run_memory_limit,
compile: job_compile_memory_limit,
},
})
);
});
} }
router.use((req, res, next) => { router.use((req, res, next) => {
@ -124,89 +164,106 @@ router.use((req, res, next) => {
}); });
router.ws('/connect', async (ws, req) => { router.ws('/connect', async (ws, req) => {
let job = null; let job = null;
let eventBus = new events.EventEmitter(); let eventBus = new events.EventEmitter();
eventBus.on("stdout", (data) => ws.send(JSON.stringify({type: "data", stream: "stdout", data: data.toString()}))) eventBus.on('stdout', data =>
eventBus.on("stderr", (data) => ws.send(JSON.stringify({type: "data", stream: "stderr", data: data.toString()}))) ws.send(
eventBus.on("stage", (stage)=> ws.send(JSON.stringify({type: "stage", stage}))) JSON.stringify({
eventBus.on("exit", (stage, status) => ws.send(JSON.stringify({type: "exit", stage, ...status}))) type: 'data',
stream: 'stdout',
data: data.toString(),
})
)
);
eventBus.on('stderr', data =>
ws.send(
JSON.stringify({
type: 'data',
stream: 'stderr',
data: data.toString(),
})
)
);
eventBus.on('stage', stage =>
ws.send(JSON.stringify({ type: 'stage', stage }))
);
eventBus.on('exit', (stage, status) =>
ws.send(JSON.stringify({ type: 'exit', stage, ...status }))
);
ws.on("message", async (data) => { ws.on('message', async data => {
try {
try{
const msg = JSON.parse(data); const msg = JSON.parse(data);
switch(msg.type){ switch (msg.type) {
case "init": case 'init':
if(job === null){ if (job === null) {
job = await get_job(msg); job = await get_job(msg);
await job.prime(); await job.prime();
ws.send(JSON.stringify({ ws.send(
type: "runtime", JSON.stringify({
language: job.runtime.language, type: 'runtime',
version: job.runtime.version.raw language: job.runtime.language,
})) version: job.runtime.version.raw,
})
);
await job.execute_interactive(eventBus); await job.execute_interactive(eventBus);
ws.close(4999, "Job Completed"); ws.close(4999, 'Job Completed');
} else {
}else{ ws.close(4000, 'Already Initialized');
ws.close(4000, "Already Initialized");
} }
break; break;
case "data": case 'data':
if(job !== null){ if (job !== null) {
if(msg.stream === "stdin"){ if (msg.stream === 'stdin') {
eventBus.emit("stdin", msg.data) eventBus.emit('stdin', msg.data);
}else{ } else {
ws.close(4004, "Can only write to stdin") ws.close(4004, 'Can only write to stdin');
}
} else {
ws.close(4003, 'Not yet initialized');
} }
}else{ break;
ws.close(4003, "Not yet initialized") case 'signal':
} if (job !== null) {
break; if (SIGNALS.includes(msg.signal)) {
case "signal": eventBus.emit('signal', msg.signal);
if(job !== null){ } else {
if(SIGNALS.includes(msg.signal)){ ws.close(4005, 'Invalid signal');
eventBus.emit("signal", msg.signal) }
}else{ } else {
ws.close(4005, "Invalid signal") ws.close(4003, 'Not yet initialized');
} }
}else{ break;
ws.close(4003, "Not yet initialized")
}
break;
} }
} catch (error) {
}catch(error){ ws.send(JSON.stringify({ type: 'error', message: error.message }));
ws.send(JSON.stringify({type: "error", message: error.message})) ws.close(4002, 'Notified Error');
ws.close(4002, "Notified Error")
// ws.close message is limited to 123 characters, so we notify over WS then close. // ws.close message is limited to 123 characters, so we notify over WS then close.
} }
}) });
ws.on("close", async ()=>{ ws.on('close', async () => {
if(job !== null){ if (job !== null) {
await job.cleanup() await job.cleanup();
} }
}) });
setTimeout(()=>{ setTimeout(() => {
//Terminate the socket after 1 second, if not initialized. //Terminate the socket after 1 second, if not initialized.
if(job === null) if (job === null) ws.close(4001, 'Initialization Timeout');
ws.close(4001, "Initialization Timeout"); }, 1000);
}, 1000) });
})
router.post('/execute', async (req, res) => { router.post('/execute', async (req, res) => {
try {
try{
const job = await get_job(req.body); const job = await get_job(req.body);
await job.prime(); await job.prime();
const result = await job.execute(); const result = await job.execute();
@ -214,7 +271,7 @@ router.post('/execute', async (req, res) => {
await job.cleanup(); await job.cleanup();
return res.status(200).send(result); return res.status(200).send(result);
}catch(error){ } catch (error) {
return res.status(400).json(error); return res.status(400).json(error);
} }
}); });
@ -226,7 +283,7 @@ router.get('/runtimes', (req, res) => {
version: rt.version.raw, version: rt.version.raw,
aliases: rt.aliases, aliases: rt.aliases,
runtime: rt.runtime, runtime: rt.runtime,
id: rt.id id: rt.id,
}; };
}); });

View File

@ -1,9 +1,6 @@
const logger = require('logplease').create('runtime'); const logger = require('logplease').create('runtime');
const cp = require('child_process'); const cp = require('child_process');
const config = require('./config'); const config = require('./config');
const globals = require('./globals');
const fss = require('fs');
const path = require('path');
const runtimes = []; const runtimes = [];