mirror of
https://github.com/engineer-man/piston.git
synced 2025-08-08 05:48:46 +02:00
Compare commits
No commits in common. "847303b15e627c6582a15f812c3ce131e17ee010" and "b98b208d0d8c6671693d1380b22449ebcff4d351" have entirely different histories.
847303b15e
...
b98b208d0d
2 changed files with 95 additions and 70 deletions
|
@ -172,9 +172,9 @@ router.use((req, res, next) => {
|
|||
|
||||
router.ws('/connect', async (ws, req) => {
|
||||
let job = null;
|
||||
let event_bus = new events.EventEmitter();
|
||||
let eventBus = new events.EventEmitter();
|
||||
|
||||
event_bus.on('stdout', data =>
|
||||
eventBus.on('stdout', data =>
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: 'data',
|
||||
|
@ -183,7 +183,7 @@ router.ws('/connect', async (ws, req) => {
|
|||
})
|
||||
)
|
||||
);
|
||||
event_bus.on('stderr', data =>
|
||||
eventBus.on('stderr', data =>
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: 'data',
|
||||
|
@ -192,10 +192,10 @@ router.ws('/connect', async (ws, req) => {
|
|||
})
|
||||
)
|
||||
);
|
||||
event_bus.on('stage', stage =>
|
||||
eventBus.on('stage', stage =>
|
||||
ws.send(JSON.stringify({ type: 'stage', stage }))
|
||||
);
|
||||
event_bus.on('exit', (stage, status) =>
|
||||
eventBus.on('exit', (stage, status) =>
|
||||
ws.send(JSON.stringify({ type: 'exit', stage, ...status }))
|
||||
);
|
||||
|
||||
|
@ -218,8 +218,7 @@ router.ws('/connect', async (ws, req) => {
|
|||
})
|
||||
);
|
||||
|
||||
await job.execute(event_bus);
|
||||
await job.cleanup();
|
||||
await job.execute_interactive(eventBus);
|
||||
|
||||
ws.close(4999, 'Job Completed');
|
||||
} else {
|
||||
|
@ -229,7 +228,7 @@ router.ws('/connect', async (ws, req) => {
|
|||
case 'data':
|
||||
if (job !== null) {
|
||||
if (msg.stream === 'stdin') {
|
||||
event_bus.emit('stdin', msg.data);
|
||||
eventBus.emit('stdin', msg.data);
|
||||
} else {
|
||||
ws.close(4004, 'Can only write to stdin');
|
||||
}
|
||||
|
@ -240,7 +239,7 @@ router.ws('/connect', async (ws, req) => {
|
|||
case 'signal':
|
||||
if (job !== null) {
|
||||
if (SIGNALS.includes(msg.signal)) {
|
||||
event_bus.emit('signal', msg.signal);
|
||||
eventBus.emit('signal', msg.signal);
|
||||
} else {
|
||||
ws.close(4005, 'Invalid signal');
|
||||
}
|
||||
|
@ -256,6 +255,12 @@ router.ws('/connect', async (ws, req) => {
|
|||
}
|
||||
});
|
||||
|
||||
ws.on('close', async () => {
|
||||
if (job !== null) {
|
||||
await job.cleanup();
|
||||
}
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
//Terminate the socket after 1 second, if not initialized.
|
||||
if (job === null) ws.close(4001, 'Initialization Timeout');
|
||||
|
|
136
api/src/job.js
136
api/src/job.js
|
@ -18,7 +18,14 @@ let uid = 0;
|
|||
let gid = 0;
|
||||
|
||||
let remaining_job_spaces = config.max_concurrent_jobs;
|
||||
let job_queue = [];
|
||||
let jobQueue = [];
|
||||
|
||||
setInterval(() => {
|
||||
// Every 10ms try resolve a new job, if there is an available slot
|
||||
if (jobQueue.length > 0 && remaining_job_spaces > 0) {
|
||||
jobQueue.shift()();
|
||||
}
|
||||
}, 10);
|
||||
|
||||
class Job {
|
||||
#active_timeouts;
|
||||
|
@ -74,9 +81,10 @@ class Job {
|
|||
if (remaining_job_spaces < 1) {
|
||||
this.logger.info(`Awaiting job slot`);
|
||||
await new Promise(resolve => {
|
||||
job_queue.push(resolve);
|
||||
jobQueue.push(resolve);
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.info(`Priming job`);
|
||||
remaining_job_spaces--;
|
||||
this.logger.debug('Writing files to job cache');
|
||||
|
@ -118,11 +126,6 @@ class Job {
|
|||
this.#active_timeouts = [];
|
||||
this.logger.debug('Cleared the active timeouts');
|
||||
|
||||
this.cleanup_processes();
|
||||
this.logger.debug(`Finished exit cleanup`);
|
||||
}
|
||||
|
||||
close_cleanup() {
|
||||
for (const proc of this.#active_parent_processes) {
|
||||
proc.stderr.destroy();
|
||||
if (!proc.stdin.destroyed) {
|
||||
|
@ -132,10 +135,13 @@ class Job {
|
|||
proc.stdout.destroy();
|
||||
}
|
||||
this.#active_parent_processes = [];
|
||||
this.logger.debug('Destroyed processes writables');
|
||||
this.logger.debug('Destroyed parent processes writables');
|
||||
|
||||
this.cleanup_processes();
|
||||
this.logger.debug(`Finished exit cleanup`);
|
||||
}
|
||||
|
||||
async safe_call(file, args, timeout, memory_limit, event_bus = null) {
|
||||
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const nonetwork = config.disable_networking ? ['nosocket'] : [];
|
||||
|
||||
|
@ -147,10 +153,7 @@ class Job {
|
|||
];
|
||||
|
||||
const timeout_call = [
|
||||
'timeout',
|
||||
'-s',
|
||||
'9',
|
||||
Math.ceil(timeout / 1000),
|
||||
'timeout', '-s', '9', Math.ceil(timeout / 1000),
|
||||
];
|
||||
|
||||
if (memory_limit >= 0) {
|
||||
|
@ -181,16 +184,16 @@ class Job {
|
|||
|
||||
this.#active_parent_processes.push(proc);
|
||||
|
||||
if (event_bus === null) {
|
||||
if (eventBus === null) {
|
||||
proc.stdin.write(this.stdin);
|
||||
proc.stdin.end();
|
||||
proc.stdin.destroy();
|
||||
} else {
|
||||
event_bus.on('stdin', data => {
|
||||
eventBus.on('stdin', data => {
|
||||
proc.stdin.write(data);
|
||||
});
|
||||
|
||||
event_bus.on('kill', signal => {
|
||||
eventBus.on('kill', signal => {
|
||||
proc.kill(signal);
|
||||
});
|
||||
}
|
||||
|
@ -205,8 +208,8 @@ class Job {
|
|||
this.#active_timeouts.push(kill_timeout);
|
||||
|
||||
proc.stderr.on('data', async data => {
|
||||
if (event_bus !== null) {
|
||||
event_bus.emit('stderr', data);
|
||||
if (eventBus !== null) {
|
||||
eventBus.emit('stderr', data);
|
||||
} else if (stderr.length > this.runtime.output_max_size) {
|
||||
this.logger.info(`stderr length exceeded`);
|
||||
process.kill(proc.pid, 'SIGKILL');
|
||||
|
@ -217,8 +220,8 @@ class Job {
|
|||
});
|
||||
|
||||
proc.stdout.on('data', async data => {
|
||||
if (event_bus !== null) {
|
||||
event_bus.emit('stdout', data);
|
||||
if (eventBus !== null) {
|
||||
eventBus.emit('stdout', data);
|
||||
} else if (stdout.length > this.runtime.output_max_size) {
|
||||
this.logger.info(`stdout length exceeded`);
|
||||
process.kill(proc.pid, 'SIGKILL');
|
||||
|
@ -228,24 +231,21 @@ class Job {
|
|||
}
|
||||
});
|
||||
|
||||
proc.on('exit', () => this.exit_cleanup());
|
||||
|
||||
proc.on('close', (code, signal) => {
|
||||
this.close_cleanup();
|
||||
proc.on('exit', (code, signal) => {
|
||||
this.exit_cleanup();
|
||||
|
||||
resolve({ stdout, stderr, code, signal, output });
|
||||
});
|
||||
|
||||
proc.on('error', err => {
|
||||
this.exit_cleanup();
|
||||
this.close_cleanup();
|
||||
|
||||
reject({ error: err, stdout, stderr, output });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async execute(event_bus = null) {
|
||||
async execute() {
|
||||
if (this.state !== job_states.PRIMED) {
|
||||
throw new Error(
|
||||
'Job must be in primed state, current state: ' +
|
||||
|
@ -256,54 +256,32 @@ class Job {
|
|||
this.logger.info(`Executing job runtime=${this.runtime.toString()}`);
|
||||
|
||||
const code_files = this.files.filter(file => file.encoding == 'utf8');
|
||||
|
||||
this.logger.debug('Compiling');
|
||||
|
||||
let compile;
|
||||
let compile_errored = false;
|
||||
const { emit_event_bus_result, emit_event_bus_stage } =
|
||||
event_bus === null
|
||||
? {
|
||||
emit_event_bus_result: () => {},
|
||||
emit_event_bus_stage: () => {},
|
||||
}
|
||||
: {
|
||||
emit_event_bus_result: (stage, result, event_bus) => {
|
||||
const { error, code, signal } = result;
|
||||
event_bus.emit('exit', stage, {
|
||||
error,
|
||||
code,
|
||||
signal,
|
||||
});
|
||||
},
|
||||
emit_event_bus_stage: (stage, event_bus) => {
|
||||
event_bus.emit('stage', stage);
|
||||
},
|
||||
};
|
||||
|
||||
if (this.runtime.compiled) {
|
||||
this.logger.debug('Compiling');
|
||||
emit_event_bus_stage('compile', event_bus);
|
||||
compile = await this.safe_call(
|
||||
this.runtime.compile,
|
||||
code_files.map(x => x.name),
|
||||
this.timeouts.compile,
|
||||
this.memory_limits.compile,
|
||||
event_bus
|
||||
this.memory_limits.compile
|
||||
);
|
||||
emit_event_bus_result('compile', compile, event_bus);
|
||||
compile_errored = compile.code !== 0;
|
||||
}
|
||||
|
||||
let run;
|
||||
if (!compile_errored) {
|
||||
this.logger.debug('Running');
|
||||
emit_event_bus_stage('run', event_bus);
|
||||
|
||||
run = await this.safe_call(
|
||||
this.runtime.run,
|
||||
[code_files[0].name, ...this.args],
|
||||
this.timeouts.run,
|
||||
this.memory_limits.run,
|
||||
event_bus
|
||||
this.memory_limits.run
|
||||
);
|
||||
emit_event_bus_result('run', run, event_bus);
|
||||
}
|
||||
|
||||
this.state = job_states.EXECUTED;
|
||||
|
@ -316,6 +294,52 @@ class Job {
|
|||
};
|
||||
}
|
||||
|
||||
async execute_interactive(eventBus) {
|
||||
if (this.state !== job_states.PRIMED) {
|
||||
throw new Error(
|
||||
'Job must be in primed state, current state: ' +
|
||||
this.state.toString()
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.info(
|
||||
`Interactively executing job runtime=${this.runtime.toString()}`
|
||||
);
|
||||
|
||||
const code_files = this.files.filter(file => file.encoding == 'utf8');
|
||||
|
||||
let compile_errored = false;
|
||||
if (this.runtime.compiled) {
|
||||
eventBus.emit('stage', 'compile');
|
||||
const { error, code, signal } = await this.safe_call(
|
||||
this.runtime.compile,
|
||||
code_files.map(x => x.name),
|
||||
this.timeouts.compile,
|
||||
this.memory_limits.compile,
|
||||
eventBus
|
||||
);
|
||||
|
||||
eventBus.emit('exit', 'compile', { error, code, signal });
|
||||
compile_errored = code !== 0;
|
||||
}
|
||||
|
||||
if (!compile_errored) {
|
||||
this.logger.debug('Running');
|
||||
eventBus.emit('stage', 'run');
|
||||
const { error, code, signal } = await this.safe_call(
|
||||
this.runtime.run,
|
||||
[code_files[0].name, ...this.args],
|
||||
this.timeouts.run,
|
||||
this.memory_limits.run,
|
||||
eventBus
|
||||
);
|
||||
|
||||
eventBus.emit('exit', 'run', { error, code, signal });
|
||||
}
|
||||
|
||||
this.state = job_states.EXECUTED;
|
||||
}
|
||||
|
||||
cleanup_processes(dont_wait = []) {
|
||||
let processes = [1];
|
||||
const to_wait = [];
|
||||
|
@ -442,13 +466,9 @@ class Job {
|
|||
this.logger.info(`Cleaning up job`);
|
||||
|
||||
this.exit_cleanup(); // Run process janitor, just incase there are any residual processes somehow
|
||||
this.close_cleanup();
|
||||
await this.cleanup_filesystem();
|
||||
|
||||
remaining_job_spaces++;
|
||||
if (job_queue.length > 0) {
|
||||
job_queue.shift()();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue