Skip to content

Commit 8dcd914

Browse files
mhoegerMark Wolff
andauthored
[worker v2] feature: add before/after InvocationRequest hooks (#342)
* feature: add before/after InvocationRequest hooks (#339) * feature: add before/after InvocationRequest hooks * rm return workerChannel * refactor to register cb pattern * remove double callback on promise resolve * revert changes * Re-added before / after invocation tests refactored for this branch Co-authored-by: Mark Wolff <[email protected]>
1 parent 4407c85 commit 8dcd914

File tree

2 files changed

+293
-264
lines changed

2 files changed

+293
-264
lines changed

src/WorkerChannel.ts

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@ import { toTypedData } from './converters';
77
import { augmentTriggerMetadata } from './augmenters';
88
import { systemError, systemWarn } from './utils/Logger';
99
import { InternalException } from './utils/InternalException';
10+
import { Context } from './public/Interfaces';
1011
import LogCategory = rpc.RpcLog.RpcLogCategory;
1112
import LogLevel = rpc.RpcLog.Level;
1213

14+
type InvocationRequestBefore = (context: Context, userFn: Function) => Function;
15+
type InvocationRequestAfter = (context: Context) => void;
16+
1317
/**
1418
* The worker channel should have a way to handle all incoming gRPC messages.
1519
* This includes all incoming StreamingMessage types (exclude *Response types and RpcLog type)
@@ -25,6 +29,8 @@ interface IWorkerChannel {
2529
invocationRequest(requestId: string, msg: rpc.InvocationRequest): void;
2630
invocationCancel(requestId: string, msg: rpc.InvocationCancel): void;
2731
functionEnvironmentReloadRequest(requestId: string, msg: rpc.IFunctionEnvironmentReloadRequest): void;
32+
registerBeforeInvocationRequest(beforeCb: InvocationRequestBefore): void;
33+
registerAfterInvocationRequest(afterCb: InvocationRequestAfter): void;
2834
}
2935

3036
/**
@@ -35,13 +41,17 @@ export class WorkerChannel implements IWorkerChannel {
3541
private _functionLoader: IFunctionLoader;
3642
private _workerId: string;
3743
private _v1WorkerBehavior: boolean;
44+
private _invocationRequestBefore: InvocationRequestBefore[];
45+
private _invocationRequestAfter: InvocationRequestAfter[];
3846

3947
constructor(workerId: string, eventStream: IEventStream, functionLoader: IFunctionLoader) {
4048
this._workerId = workerId;
4149
this._eventStream = eventStream;
4250
this._functionLoader = functionLoader;
4351
// default value
4452
this._v1WorkerBehavior = false;
53+
this._invocationRequestBefore = [];
54+
this._invocationRequestAfter = [];
4555

4656
// call the method with the matching 'event' name on this class, passing the requestId and event message
4757
eventStream.on('data', (msg) => {
@@ -85,6 +95,21 @@ export class WorkerChannel implements IWorkerChannel {
8595
});
8696
}
8797

98+
/**
99+
* Register a patching function to be run before User Function is executed.
100+
* Hook should return a patched version of User Function.
101+
*/
102+
public registerBeforeInvocationRequest(beforeCb: InvocationRequestBefore): void {
103+
this._invocationRequestBefore.push(beforeCb);
104+
}
105+
106+
/**
107+
* Register a function to be run after User Function resolves.
108+
*/
109+
public registerAfterInvocationRequest(afterCb: InvocationRequestAfter): void {
110+
this._invocationRequestAfter.push(afterCb);
111+
}
112+
88113
/**
89114
* Host sends capabilities/init data to worker and requests the worker to initialize itself
90115
* @param requestId gRPC message request id
@@ -252,19 +277,27 @@ export class WorkerChannel implements IWorkerChannel {
252277
requestId: requestId,
253278
invocationResponse: response
254279
});
280+
281+
this.runInvocationRequestAfter(context);
255282
}
256283

257284
let { context, inputs } = CreateContextAndInputs(info, msg, logCallback, resultCallback, this._v1WorkerBehavior);
258285
let userFunction = this._functionLoader.getFunc(<string>msg.functionId);
259286

287+
userFunction = this.runInvocationRequestBefore(context, userFunction);
288+
260289
// catch user errors from the same async context in the event loop and correlate with invocation
261290
// throws from asynchronous work (setTimeout, etc) are caught by 'unhandledException' and cannot be correlated with invocation
262291
try {
263-
let result = userFunction(context, ...inputs);
292+
let result = userFunction(context, ...inputs);
264293

265-
if (result && isFunction(result.then)) {
266-
result.then(result => (<any>context.done)(null, result, true))
267-
.catch(err => (<any>context.done)(err, null, true));
294+
if (result && isFunction(result.then)) {
295+
result.then(result => {
296+
(<any>context.done)(null, result, true)
297+
})
298+
.catch(err => {
299+
(<any>context.done)(err, null, true)
300+
});
268301
}
269302
} catch (err) {
270303
resultCallback(err);
@@ -277,7 +310,7 @@ export class WorkerChannel implements IWorkerChannel {
277310
public startStream(requestId: string, msg: rpc.StartStream): void {
278311
// Not yet implemented
279312
}
280-
313+
281314
/**
282315
* Message is empty by design - Will add more fields in future if needed
283316
*/
@@ -369,4 +402,18 @@ export class WorkerChannel implements IWorkerChannel {
369402

370403
return status;
371404
}
405+
406+
private runInvocationRequestBefore(context: Context, userFunction: Function): Function {
407+
let wrappedFunction = userFunction;
408+
for (let before of this._invocationRequestBefore) {
409+
wrappedFunction = before(context, wrappedFunction);
410+
}
411+
return wrappedFunction;
412+
}
413+
414+
private runInvocationRequestAfter(context: Context) {
415+
for (let after of this._invocationRequestAfter) {
416+
after(context);
417+
}
418+
}
372419
}

0 commit comments

Comments
 (0)