Backport engineer-man#519 parallel requests fix

This commit is contained in:
Pablo Pozo 2023-07-14 13:55:40 +03:00
parent 919076e209
commit 11841b3202
2 changed files with 119 additions and 105 deletions

View File

@ -174,9 +174,9 @@ router.use((req, res, next) => {
router.ws('/connect', async (ws, req) => { router.ws('/connect', async (ws, req) => {
let job = null; let job = null;
let eventBus = new events.EventEmitter(); let event_bus = new events.EventEmitter();
eventBus.on('stdout', data => event_bus.on('stdout', data =>
ws.send( ws.send(
JSON.stringify({ JSON.stringify({
type: 'data', type: 'data',
@ -185,7 +185,7 @@ router.ws('/connect', async (ws, req) => {
}) })
) )
); );
eventBus.on('stderr', data => event_bus.on('stderr', data =>
ws.send( ws.send(
JSON.stringify({ JSON.stringify({
type: 'data', type: 'data',
@ -194,10 +194,10 @@ router.ws('/connect', async (ws, req) => {
}) })
) )
); );
eventBus.on('stage', stage => event_bus.on('stage', stage =>
ws.send(JSON.stringify({ type: 'stage', stage })) ws.send(JSON.stringify({ type: 'stage', stage }))
); );
eventBus.on('exit', (stage, status) => event_bus.on('exit', (stage, status) =>
ws.send(JSON.stringify({ type: 'exit', stage, ...status })) ws.send(JSON.stringify({ type: 'exit', stage, ...status }))
); );
@ -220,7 +220,8 @@ router.ws('/connect', async (ws, req) => {
}) })
); );
await job.execute_interactive(eventBus); await job.execute(event_bus);
await job.cleanup();
ws.close(4999, 'Job Completed'); ws.close(4999, 'Job Completed');
} else { } else {
@ -230,7 +231,7 @@ router.ws('/connect', async (ws, req) => {
case 'data': case 'data':
if (job !== null) { if (job !== null) {
if (msg.stream === 'stdin') { if (msg.stream === 'stdin') {
eventBus.emit('stdin', msg.data); event_bus.emit('stdin', msg.data);
} else { } else {
ws.close(4004, 'Can only write to stdin'); ws.close(4004, 'Can only write to stdin');
} }
@ -241,7 +242,7 @@ router.ws('/connect', async (ws, req) => {
case 'signal': case 'signal':
if (job !== null) { if (job !== null) {
if (SIGNALS.includes(msg.signal)) { if (SIGNALS.includes(msg.signal)) {
eventBus.emit('signal', msg.signal); event_bus.emit('signal', msg.signal);
} else { } else {
ws.close(4005, 'Invalid signal'); ws.close(4005, 'Invalid signal');
} }
@ -257,12 +258,6 @@ router.ws('/connect', async (ws, req) => {
} }
}); });
ws.on('close', async () => {
if (job !== null) {
await job.cleanup();
}
});
setTimeout(() => { setTimeout(() => {
//Terminate the socket after 1 second, if not initialized. //Terminate the socket after 1 second, if not initialized.
if (job === null) ws.close(4001, 'Initialization Timeout'); if (job === null) ws.close(4001, 'Initialization Timeout');
@ -275,7 +270,11 @@ router.post('/execute', async (req, res) => {
await job.prime(); await job.prime();
const result = await job.execute(); let result = await job.execute();
// Backward compatibility when the run stage is not started
if (result.run === undefined) {
result.run = result.compile;
}
await job.cleanup(); await job.cleanup();

View File

@ -19,16 +19,12 @@ let uid = 0;
let gid = 0; let gid = 0;
let remaining_job_spaces = config.max_concurrent_jobs; let remaining_job_spaces = config.max_concurrent_jobs;
let jobQueue = []; let job_queue = [];
setInterval(() => {
// Every 10ms try resolve a new job, if there is an available slot
if (jobQueue.length > 0 && remaining_job_spaces > 0) {
jobQueue.shift()();
}
}, 10);
class Job { class Job {
#active_timeouts;
#active_parent_processes;
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) { constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
this.uuid = uuidv4(); this.uuid = uuidv4();
@ -45,6 +41,13 @@ class Job {
this.args = args; this.args = args;
this.stdin = stdin; this.stdin = stdin;
// Add a trailing newline if it doesn't exist
if (this.stdin.slice(-1) !== '\n') {
this.stdin += '\n';
}
this.#active_timeouts = [];
this.#active_parent_processes = [];
this.timeouts = timeouts; this.timeouts = timeouts;
this.memory_limits = memory_limits; this.memory_limits = memory_limits;
@ -72,10 +75,9 @@ class Job {
if (remaining_job_spaces < 1) { if (remaining_job_spaces < 1) {
this.logger.info(`Awaiting job slot`); this.logger.info(`Awaiting job slot`);
await new Promise(resolve => { await new Promise(resolve => {
jobQueue.push(resolve); job_queue.push(resolve);
}); });
} }
this.logger.info(`Priming job`); this.logger.info(`Priming job`);
remaining_job_spaces--; remaining_job_spaces--;
this.logger.debug('Writing files to job cache'); this.logger.debug('Writing files to job cache');
@ -110,7 +112,31 @@ class Job {
this.logger.debug('Primed job'); this.logger.debug('Primed job');
} }
async safe_call(file, args, timeout, memory_limit, eventBus = null) { exit_cleanup() {
for (const timeout of this.#active_timeouts) {
clear_timeout(timeout);
}
this.#active_timeouts = [];
this.logger.debug('Cleared the active timeouts');
this.cleanup_processes();
this.logger.debug(`Finished exit cleanup`);
}
close_cleanup() {
for (const proc of this.#active_parent_processes) {
proc.stderr.destroy();
if (!proc.stdin.destroyed) {
proc.stdin.end();
proc.stdin.destroy();
}
proc.stdout.destroy();
}
this.#active_parent_processes = [];
this.logger.debug('Destroyed processes writables');
}
async safe_call(file, args, timeout, memory_limit, event_bus = null) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const nonetwork = config.disable_networking ? ['nosocket'] : []; const nonetwork = config.disable_networking ? ['nosocket'] : [];
@ -122,7 +148,10 @@ class Job {
]; ];
const timeout_call = [ const timeout_call = [
'timeout', '-s', '9', Math.ceil(timeout / 1000), 'timeout',
'-s',
'9',
Math.ceil(timeout / 1000),
]; ];
if (memory_limit >= 0) { if (memory_limit >= 0) {
@ -155,16 +184,18 @@ class Job {
detached: true, //give this process its own process group detached: true, //give this process its own process group
}); });
if (eventBus === null) { this.#active_parent_processes.push(proc);
if (event_bus === null) {
proc.stdin.write(this.stdin); proc.stdin.write(this.stdin);
proc.stdin.end(); proc.stdin.end();
proc.stdin.destroy(); proc.stdin.destroy();
} else { } else {
eventBus.on('stdin', data => { event_bus.on('stdin', data => {
proc.stdin.write(data); proc.stdin.write(data);
}); });
eventBus.on('kill', signal => { event_bus.on('kill', signal => {
proc.kill(signal); proc.kill(signal);
}); });
} }
@ -176,10 +207,11 @@ class Job {
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
}, timeout)) || }, timeout)) ||
null; null;
this.#active_timeouts.push(kill_timeout);
proc.stderr.on('data', async data => { proc.stderr.on('data', async data => {
if (eventBus !== null) { if (event_bus !== null) {
eventBus.emit('stderr', data); event_bus.emit('stderr', data);
} else if (stderr.length > this.runtime.output_max_size) { } else if (stderr.length > this.runtime.output_max_size) {
this.logger.info(`stderr length exceeded`); this.logger.info(`stderr length exceeded`);
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
@ -190,8 +222,8 @@ class Job {
}); });
proc.stdout.on('data', async data => { proc.stdout.on('data', async data => {
if (eventBus !== null) { if (event_bus !== null) {
eventBus.emit('stdout', data); event_bus.emit('stdout', data);
} else if (stdout.length > this.runtime.output_max_size) { } else if (stdout.length > this.runtime.output_max_size) {
this.logger.info(`stdout length exceeded`); this.logger.info(`stdout length exceeded`);
process.kill(proc.pid, 'SIGKILL'); process.kill(proc.pid, 'SIGKILL');
@ -201,31 +233,24 @@ class Job {
} }
}); });
const exit_cleanup = () => { proc.on('exit', () => this.exit_cleanup());
clear_timeout(kill_timeout);
proc.stderr.destroy(); proc.on('close', (code, signal) => {
proc.stdout.destroy(); this.close_cleanup();
this.cleanup_processes();
this.logger.debug(`Finished exit cleanup`);
};
proc.on('exit', (code, signal) => {
exit_cleanup();
resolve({ stdout, stderr, code, signal, output }); resolve({ stdout, stderr, code, signal, output });
}); });
proc.on('error', err => { proc.on('error', err => {
exit_cleanup(); this.exit_cleanup();
this.close_cleanup();
reject({ error: err, stdout, stderr, output }); reject({ error: err, stdout, stderr, output });
}); });
}); });
} }
async execute() { async execute(event_bus = null) {
if (this.state !== job_states.PRIMED) { if (this.state !== job_states.PRIMED) {
throw new Error( throw new Error(
'Job must be in primed state, current state: ' + 'Job must be in primed state, current state: ' +
@ -242,24 +267,54 @@ class Job {
this.logger.debug('Compiling'); this.logger.debug('Compiling');
let compile; let compile;
let compile_errored = false;
const { emit_event_bus_result, emit_event_bus_stage } =
event_bus === null
? {
emit_event_bus_result: () => {},
emit_event_bus_stage: () => {},
}
: {
emit_event_bus_result: (stage, result, event_bus) => {
const { error, code, signal } = result;
event_bus.emit('exit', stage, {
error,
code,
signal,
});
},
emit_event_bus_stage: (stage, event_bus) => {
event_bus.emit('stage', stage);
},
};
if (this.runtime.compiled) { if (this.runtime.compiled) {
this.logger.debug('Compiling');
emit_event_bus_stage('compile', event_bus);
compile = await this.safe_call( compile = await this.safe_call(
path.join(this.runtime.pkgdir, 'compile'), path.join(this.runtime.pkgdir, 'compile'),
code_files.map(x => x.name), code_files.map(x => x.name),
this.timeouts.compile, this.timeouts.compile,
this.memory_limits.compile this.memory_limits.compile,
event_bus
); );
emit_event_bus_result('compile', compile, event_bus);
compile_errored = compile.code !== 0;
} }
this.logger.debug('Running'); let run;
if (!compile_errored) {
const run = await this.safe_call( this.logger.debug('Running');
path.join(this.runtime.pkgdir, 'run'), emit_event_bus_stage('run', event_bus);
[code_files[0].name, ...this.args], run = await this.safe_call(
this.timeouts.run, path.join(this.runtime.pkgdir, 'run'),
this.memory_limits.run [code_files[0].name, ...this.args],
); this.timeouts.run,
this.memory_limits.run,
event_bus
);
emit_event_bus_result('run', run, event_bus);
}
this.state = job_states.EXECUTED; this.state = job_states.EXECUTED;
@ -271,50 +326,6 @@ class Job {
}; };
} }
async execute_interactive(eventBus) {
if (this.state !== job_states.PRIMED) {
throw new Error(
'Job must be in primed state, current state: ' +
this.state.toString()
);
}
this.logger.info(
`Interactively executing job runtime=${this.runtime.toString()}`
);
const code_files =
(this.runtime.language === 'file' && this.files) ||
this.files.filter(file => file.encoding == 'utf8');
if (this.runtime.compiled) {
eventBus.emit('stage', 'compile');
const { error, code, signal } = await this.safe_call(
path.join(this.runtime.pkgdir, 'compile'),
code_files.map(x => x.name),
this.timeouts.compile,
this.memory_limits.compile,
eventBus
);
eventBus.emit('exit', 'compile', { error, code, signal });
}
this.logger.debug('Running');
eventBus.emit('stage', 'run');
const { error, code, signal } = await this.safe_call(
path.join(this.runtime.pkgdir, 'run'),
[code_files[0].name, ...this.args],
this.timeouts.run,
this.memory_limits.run,
eventBus
);
eventBus.emit('exit', 'run', { error, code, signal });
this.state = job_states.EXECUTED;
}
cleanup_processes(dont_wait = []) { cleanup_processes(dont_wait = []) {
let processes = [1]; let processes = [1];
const to_wait = []; const to_wait = [];
@ -345,11 +356,11 @@ class Job {
const proc_id_int = parse_int(proc_id); const proc_id_int = parse_int(proc_id);
// Skip over any processes that aren't ours. // Skip over any processes that aren't ours.
if(ruid != this.uid && euid != this.uid) return -1; if (ruid != this.uid && euid != this.uid) return -1;
if (state == 'Z'){ if (state == 'Z') {
// Zombie process, just needs to be waited, regardless of the user id // Zombie process, just needs to be waited, regardless of the user id
if(!to_wait.includes(proc_id_int)) if (!to_wait.includes(proc_id_int))
to_wait.push(proc_id_int); to_wait.push(proc_id_int);
return -1; return -1;
@ -386,7 +397,7 @@ class Job {
// Then clear them out of the process tree // Then clear them out of the process tree
try { try {
process.kill(proc, 'SIGKILL'); process.kill(proc, 'SIGKILL');
} catch(e) { } catch {
// Could already be dead and just needs to be waited on // Could already be dead and just needs to be waited on
this.logger.debug( this.logger.debug(
`Got error while SIGKILLing process ${proc}:`, `Got error while SIGKILLing process ${proc}:`,
@ -440,10 +451,14 @@ class Job {
async cleanup() { async cleanup() {
this.logger.info(`Cleaning up job`); this.logger.info(`Cleaning up job`);
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow this.exit_cleanup(); // Run process janitor, just incase there are any residual processes somehow
this.close_cleanup();
await this.cleanup_filesystem(); await this.cleanup_filesystem();
remaining_job_spaces++; remaining_job_spaces++;
if (job_queue.length > 0) {
job_queue.shift()();
}
} }
} }