Merge pull request #613 from pablo-tx/master
Backport engineer-man#519 parallel requests fix
This commit is contained in:
commit
89e0dd431d
|
@ -174,9 +174,9 @@ router.use((req, res, next) => {
|
||||||
|
|
||||||
router.ws('/connect', async (ws, req) => {
|
router.ws('/connect', async (ws, req) => {
|
||||||
let job = null;
|
let job = null;
|
||||||
let eventBus = new events.EventEmitter();
|
let event_bus = new events.EventEmitter();
|
||||||
|
|
||||||
eventBus.on('stdout', data =>
|
event_bus.on('stdout', data =>
|
||||||
ws.send(
|
ws.send(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
type: 'data',
|
type: 'data',
|
||||||
|
@ -185,7 +185,7 @@ router.ws('/connect', async (ws, req) => {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
eventBus.on('stderr', data =>
|
event_bus.on('stderr', data =>
|
||||||
ws.send(
|
ws.send(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
type: 'data',
|
type: 'data',
|
||||||
|
@ -194,10 +194,10 @@ router.ws('/connect', async (ws, req) => {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
eventBus.on('stage', stage =>
|
event_bus.on('stage', stage =>
|
||||||
ws.send(JSON.stringify({ type: 'stage', stage }))
|
ws.send(JSON.stringify({ type: 'stage', stage }))
|
||||||
);
|
);
|
||||||
eventBus.on('exit', (stage, status) =>
|
event_bus.on('exit', (stage, status) =>
|
||||||
ws.send(JSON.stringify({ type: 'exit', stage, ...status }))
|
ws.send(JSON.stringify({ type: 'exit', stage, ...status }))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -220,7 +220,8 @@ router.ws('/connect', async (ws, req) => {
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
await job.execute_interactive(eventBus);
|
await job.execute(event_bus);
|
||||||
|
await job.cleanup();
|
||||||
|
|
||||||
ws.close(4999, 'Job Completed');
|
ws.close(4999, 'Job Completed');
|
||||||
} else {
|
} else {
|
||||||
|
@ -230,7 +231,7 @@ router.ws('/connect', async (ws, req) => {
|
||||||
case 'data':
|
case 'data':
|
||||||
if (job !== null) {
|
if (job !== null) {
|
||||||
if (msg.stream === 'stdin') {
|
if (msg.stream === 'stdin') {
|
||||||
eventBus.emit('stdin', msg.data);
|
event_bus.emit('stdin', msg.data);
|
||||||
} else {
|
} else {
|
||||||
ws.close(4004, 'Can only write to stdin');
|
ws.close(4004, 'Can only write to stdin');
|
||||||
}
|
}
|
||||||
|
@ -241,7 +242,7 @@ router.ws('/connect', async (ws, req) => {
|
||||||
case 'signal':
|
case 'signal':
|
||||||
if (job !== null) {
|
if (job !== null) {
|
||||||
if (SIGNALS.includes(msg.signal)) {
|
if (SIGNALS.includes(msg.signal)) {
|
||||||
eventBus.emit('signal', msg.signal);
|
event_bus.emit('signal', msg.signal);
|
||||||
} else {
|
} else {
|
||||||
ws.close(4005, 'Invalid signal');
|
ws.close(4005, 'Invalid signal');
|
||||||
}
|
}
|
||||||
|
@ -257,12 +258,6 @@ router.ws('/connect', async (ws, req) => {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
ws.on('close', async () => {
|
|
||||||
if (job !== null) {
|
|
||||||
await job.cleanup();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
//Terminate the socket after 1 second, if not initialized.
|
//Terminate the socket after 1 second, if not initialized.
|
||||||
if (job === null) ws.close(4001, 'Initialization Timeout');
|
if (job === null) ws.close(4001, 'Initialization Timeout');
|
||||||
|
@ -275,7 +270,11 @@ router.post('/execute', async (req, res) => {
|
||||||
|
|
||||||
await job.prime();
|
await job.prime();
|
||||||
|
|
||||||
const result = await job.execute();
|
let result = await job.execute();
|
||||||
|
// Backward compatibility when the run stage is not started
|
||||||
|
if (result.run === undefined) {
|
||||||
|
result.run = result.compile;
|
||||||
|
}
|
||||||
|
|
||||||
await job.cleanup();
|
await job.cleanup();
|
||||||
|
|
||||||
|
|
195
api/src/job.js
195
api/src/job.js
|
@ -19,16 +19,12 @@ let uid = 0;
|
||||||
let gid = 0;
|
let gid = 0;
|
||||||
|
|
||||||
let remaining_job_spaces = config.max_concurrent_jobs;
|
let remaining_job_spaces = config.max_concurrent_jobs;
|
||||||
let jobQueue = [];
|
let job_queue = [];
|
||||||
|
|
||||||
setInterval(() => {
|
|
||||||
// Every 10ms try resolve a new job, if there is an available slot
|
|
||||||
if (jobQueue.length > 0 && remaining_job_spaces > 0) {
|
|
||||||
jobQueue.shift()();
|
|
||||||
}
|
|
||||||
}, 10);
|
|
||||||
|
|
||||||
class Job {
|
class Job {
|
||||||
|
#active_timeouts;
|
||||||
|
#active_parent_processes;
|
||||||
|
|
||||||
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
|
constructor({ runtime, files, args, stdin, timeouts, memory_limits }) {
|
||||||
this.uuid = uuidv4();
|
this.uuid = uuidv4();
|
||||||
|
|
||||||
|
@ -45,6 +41,13 @@ class Job {
|
||||||
|
|
||||||
this.args = args;
|
this.args = args;
|
||||||
this.stdin = stdin;
|
this.stdin = stdin;
|
||||||
|
// Add a trailing newline if it doesn't exist
|
||||||
|
if (this.stdin.slice(-1) !== '\n') {
|
||||||
|
this.stdin += '\n';
|
||||||
|
}
|
||||||
|
|
||||||
|
this.#active_timeouts = [];
|
||||||
|
this.#active_parent_processes = [];
|
||||||
|
|
||||||
this.timeouts = timeouts;
|
this.timeouts = timeouts;
|
||||||
this.memory_limits = memory_limits;
|
this.memory_limits = memory_limits;
|
||||||
|
@ -72,10 +75,9 @@ class Job {
|
||||||
if (remaining_job_spaces < 1) {
|
if (remaining_job_spaces < 1) {
|
||||||
this.logger.info(`Awaiting job slot`);
|
this.logger.info(`Awaiting job slot`);
|
||||||
await new Promise(resolve => {
|
await new Promise(resolve => {
|
||||||
jobQueue.push(resolve);
|
job_queue.push(resolve);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.info(`Priming job`);
|
this.logger.info(`Priming job`);
|
||||||
remaining_job_spaces--;
|
remaining_job_spaces--;
|
||||||
this.logger.debug('Writing files to job cache');
|
this.logger.debug('Writing files to job cache');
|
||||||
|
@ -110,7 +112,31 @@ class Job {
|
||||||
this.logger.debug('Primed job');
|
this.logger.debug('Primed job');
|
||||||
}
|
}
|
||||||
|
|
||||||
async safe_call(file, args, timeout, memory_limit, eventBus = null) {
|
exit_cleanup() {
|
||||||
|
for (const timeout of this.#active_timeouts) {
|
||||||
|
clear_timeout(timeout);
|
||||||
|
}
|
||||||
|
this.#active_timeouts = [];
|
||||||
|
this.logger.debug('Cleared the active timeouts');
|
||||||
|
|
||||||
|
this.cleanup_processes();
|
||||||
|
this.logger.debug(`Finished exit cleanup`);
|
||||||
|
}
|
||||||
|
|
||||||
|
close_cleanup() {
|
||||||
|
for (const proc of this.#active_parent_processes) {
|
||||||
|
proc.stderr.destroy();
|
||||||
|
if (!proc.stdin.destroyed) {
|
||||||
|
proc.stdin.end();
|
||||||
|
proc.stdin.destroy();
|
||||||
|
}
|
||||||
|
proc.stdout.destroy();
|
||||||
|
}
|
||||||
|
this.#active_parent_processes = [];
|
||||||
|
this.logger.debug('Destroyed processes writables');
|
||||||
|
}
|
||||||
|
|
||||||
|
async safe_call(file, args, timeout, memory_limit, event_bus = null) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
const nonetwork = config.disable_networking ? ['nosocket'] : [];
|
const nonetwork = config.disable_networking ? ['nosocket'] : [];
|
||||||
|
|
||||||
|
@ -122,7 +148,10 @@ class Job {
|
||||||
];
|
];
|
||||||
|
|
||||||
const timeout_call = [
|
const timeout_call = [
|
||||||
'timeout', '-s', '9', Math.ceil(timeout / 1000),
|
'timeout',
|
||||||
|
'-s',
|
||||||
|
'9',
|
||||||
|
Math.ceil(timeout / 1000),
|
||||||
];
|
];
|
||||||
|
|
||||||
if (memory_limit >= 0) {
|
if (memory_limit >= 0) {
|
||||||
|
@ -155,16 +184,18 @@ class Job {
|
||||||
detached: true, //give this process its own process group
|
detached: true, //give this process its own process group
|
||||||
});
|
});
|
||||||
|
|
||||||
if (eventBus === null) {
|
this.#active_parent_processes.push(proc);
|
||||||
|
|
||||||
|
if (event_bus === null) {
|
||||||
proc.stdin.write(this.stdin);
|
proc.stdin.write(this.stdin);
|
||||||
proc.stdin.end();
|
proc.stdin.end();
|
||||||
proc.stdin.destroy();
|
proc.stdin.destroy();
|
||||||
} else {
|
} else {
|
||||||
eventBus.on('stdin', data => {
|
event_bus.on('stdin', data => {
|
||||||
proc.stdin.write(data);
|
proc.stdin.write(data);
|
||||||
});
|
});
|
||||||
|
|
||||||
eventBus.on('kill', signal => {
|
event_bus.on('kill', signal => {
|
||||||
proc.kill(signal);
|
proc.kill(signal);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -176,10 +207,11 @@ class Job {
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
}, timeout)) ||
|
}, timeout)) ||
|
||||||
null;
|
null;
|
||||||
|
this.#active_timeouts.push(kill_timeout);
|
||||||
|
|
||||||
proc.stderr.on('data', async data => {
|
proc.stderr.on('data', async data => {
|
||||||
if (eventBus !== null) {
|
if (event_bus !== null) {
|
||||||
eventBus.emit('stderr', data);
|
event_bus.emit('stderr', data);
|
||||||
} else if (stderr.length > this.runtime.output_max_size) {
|
} else if (stderr.length > this.runtime.output_max_size) {
|
||||||
this.logger.info(`stderr length exceeded`);
|
this.logger.info(`stderr length exceeded`);
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
|
@ -190,8 +222,8 @@ class Job {
|
||||||
});
|
});
|
||||||
|
|
||||||
proc.stdout.on('data', async data => {
|
proc.stdout.on('data', async data => {
|
||||||
if (eventBus !== null) {
|
if (event_bus !== null) {
|
||||||
eventBus.emit('stdout', data);
|
event_bus.emit('stdout', data);
|
||||||
} else if (stdout.length > this.runtime.output_max_size) {
|
} else if (stdout.length > this.runtime.output_max_size) {
|
||||||
this.logger.info(`stdout length exceeded`);
|
this.logger.info(`stdout length exceeded`);
|
||||||
process.kill(proc.pid, 'SIGKILL');
|
process.kill(proc.pid, 'SIGKILL');
|
||||||
|
@ -201,31 +233,24 @@ class Job {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
const exit_cleanup = () => {
|
proc.on('exit', () => this.exit_cleanup());
|
||||||
clear_timeout(kill_timeout);
|
|
||||||
|
|
||||||
proc.stderr.destroy();
|
proc.on('close', (code, signal) => {
|
||||||
proc.stdout.destroy();
|
this.close_cleanup();
|
||||||
|
|
||||||
this.cleanup_processes();
|
|
||||||
this.logger.debug(`Finished exit cleanup`);
|
|
||||||
};
|
|
||||||
|
|
||||||
proc.on('exit', (code, signal) => {
|
|
||||||
exit_cleanup();
|
|
||||||
|
|
||||||
resolve({ stdout, stderr, code, signal, output });
|
resolve({ stdout, stderr, code, signal, output });
|
||||||
});
|
});
|
||||||
|
|
||||||
proc.on('error', err => {
|
proc.on('error', err => {
|
||||||
exit_cleanup();
|
this.exit_cleanup();
|
||||||
|
this.close_cleanup();
|
||||||
|
|
||||||
reject({ error: err, stdout, stderr, output });
|
reject({ error: err, stdout, stderr, output });
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute() {
|
async execute(event_bus = null) {
|
||||||
if (this.state !== job_states.PRIMED) {
|
if (this.state !== job_states.PRIMED) {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
'Job must be in primed state, current state: ' +
|
'Job must be in primed state, current state: ' +
|
||||||
|
@ -242,24 +267,54 @@ class Job {
|
||||||
this.logger.debug('Compiling');
|
this.logger.debug('Compiling');
|
||||||
|
|
||||||
let compile;
|
let compile;
|
||||||
|
let compile_errored = false;
|
||||||
|
const { emit_event_bus_result, emit_event_bus_stage } =
|
||||||
|
event_bus === null
|
||||||
|
? {
|
||||||
|
emit_event_bus_result: () => {},
|
||||||
|
emit_event_bus_stage: () => {},
|
||||||
|
}
|
||||||
|
: {
|
||||||
|
emit_event_bus_result: (stage, result, event_bus) => {
|
||||||
|
const { error, code, signal } = result;
|
||||||
|
event_bus.emit('exit', stage, {
|
||||||
|
error,
|
||||||
|
code,
|
||||||
|
signal,
|
||||||
|
});
|
||||||
|
},
|
||||||
|
emit_event_bus_stage: (stage, event_bus) => {
|
||||||
|
event_bus.emit('stage', stage);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
if (this.runtime.compiled) {
|
if (this.runtime.compiled) {
|
||||||
|
this.logger.debug('Compiling');
|
||||||
|
emit_event_bus_stage('compile', event_bus);
|
||||||
compile = await this.safe_call(
|
compile = await this.safe_call(
|
||||||
path.join(this.runtime.pkgdir, 'compile'),
|
path.join(this.runtime.pkgdir, 'compile'),
|
||||||
code_files.map(x => x.name),
|
code_files.map(x => x.name),
|
||||||
this.timeouts.compile,
|
this.timeouts.compile,
|
||||||
this.memory_limits.compile
|
this.memory_limits.compile,
|
||||||
|
event_bus
|
||||||
);
|
);
|
||||||
|
emit_event_bus_result('compile', compile, event_bus);
|
||||||
|
compile_errored = compile.code !== 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.debug('Running');
|
let run;
|
||||||
|
if (!compile_errored) {
|
||||||
const run = await this.safe_call(
|
this.logger.debug('Running');
|
||||||
path.join(this.runtime.pkgdir, 'run'),
|
emit_event_bus_stage('run', event_bus);
|
||||||
[code_files[0].name, ...this.args],
|
run = await this.safe_call(
|
||||||
this.timeouts.run,
|
path.join(this.runtime.pkgdir, 'run'),
|
||||||
this.memory_limits.run
|
[code_files[0].name, ...this.args],
|
||||||
);
|
this.timeouts.run,
|
||||||
|
this.memory_limits.run,
|
||||||
|
event_bus
|
||||||
|
);
|
||||||
|
emit_event_bus_result('run', run, event_bus);
|
||||||
|
}
|
||||||
|
|
||||||
this.state = job_states.EXECUTED;
|
this.state = job_states.EXECUTED;
|
||||||
|
|
||||||
|
@ -271,50 +326,6 @@ class Job {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute_interactive(eventBus) {
|
|
||||||
if (this.state !== job_states.PRIMED) {
|
|
||||||
throw new Error(
|
|
||||||
'Job must be in primed state, current state: ' +
|
|
||||||
this.state.toString()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.info(
|
|
||||||
`Interactively executing job runtime=${this.runtime.toString()}`
|
|
||||||
);
|
|
||||||
|
|
||||||
const code_files =
|
|
||||||
(this.runtime.language === 'file' && this.files) ||
|
|
||||||
this.files.filter(file => file.encoding == 'utf8');
|
|
||||||
|
|
||||||
if (this.runtime.compiled) {
|
|
||||||
eventBus.emit('stage', 'compile');
|
|
||||||
const { error, code, signal } = await this.safe_call(
|
|
||||||
path.join(this.runtime.pkgdir, 'compile'),
|
|
||||||
code_files.map(x => x.name),
|
|
||||||
this.timeouts.compile,
|
|
||||||
this.memory_limits.compile,
|
|
||||||
eventBus
|
|
||||||
);
|
|
||||||
|
|
||||||
eventBus.emit('exit', 'compile', { error, code, signal });
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.debug('Running');
|
|
||||||
eventBus.emit('stage', 'run');
|
|
||||||
const { error, code, signal } = await this.safe_call(
|
|
||||||
path.join(this.runtime.pkgdir, 'run'),
|
|
||||||
[code_files[0].name, ...this.args],
|
|
||||||
this.timeouts.run,
|
|
||||||
this.memory_limits.run,
|
|
||||||
eventBus
|
|
||||||
);
|
|
||||||
|
|
||||||
eventBus.emit('exit', 'run', { error, code, signal });
|
|
||||||
|
|
||||||
this.state = job_states.EXECUTED;
|
|
||||||
}
|
|
||||||
|
|
||||||
cleanup_processes(dont_wait = []) {
|
cleanup_processes(dont_wait = []) {
|
||||||
let processes = [1];
|
let processes = [1];
|
||||||
const to_wait = [];
|
const to_wait = [];
|
||||||
|
@ -345,11 +356,11 @@ class Job {
|
||||||
const proc_id_int = parse_int(proc_id);
|
const proc_id_int = parse_int(proc_id);
|
||||||
|
|
||||||
// Skip over any processes that aren't ours.
|
// Skip over any processes that aren't ours.
|
||||||
if(ruid != this.uid && euid != this.uid) return -1;
|
if (ruid != this.uid && euid != this.uid) return -1;
|
||||||
|
|
||||||
if (state == 'Z'){
|
if (state == 'Z') {
|
||||||
// Zombie process, just needs to be waited, regardless of the user id
|
// Zombie process, just needs to be waited, regardless of the user id
|
||||||
if(!to_wait.includes(proc_id_int))
|
if (!to_wait.includes(proc_id_int))
|
||||||
to_wait.push(proc_id_int);
|
to_wait.push(proc_id_int);
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -386,7 +397,7 @@ class Job {
|
||||||
// Then clear them out of the process tree
|
// Then clear them out of the process tree
|
||||||
try {
|
try {
|
||||||
process.kill(proc, 'SIGKILL');
|
process.kill(proc, 'SIGKILL');
|
||||||
} catch(e) {
|
} catch {
|
||||||
// Could already be dead and just needs to be waited on
|
// Could already be dead and just needs to be waited on
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`Got error while SIGKILLing process ${proc}:`,
|
`Got error while SIGKILLing process ${proc}:`,
|
||||||
|
@ -440,10 +451,14 @@ class Job {
|
||||||
async cleanup() {
|
async cleanup() {
|
||||||
this.logger.info(`Cleaning up job`);
|
this.logger.info(`Cleaning up job`);
|
||||||
|
|
||||||
this.cleanup_processes(); // Run process janitor, just incase there are any residual processes somehow
|
this.exit_cleanup(); // Run process janitor, just incase there are any residual processes somehow
|
||||||
|
this.close_cleanup();
|
||||||
await this.cleanup_filesystem();
|
await this.cleanup_filesystem();
|
||||||
|
|
||||||
remaining_job_spaces++;
|
remaining_job_spaces++;
|
||||||
|
if (job_queue.length > 0) {
|
||||||
|
job_queue.shift()();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue