Skip to content

Commit 485d5b2

Browse files
fix: isolate Rest Stream from retry logic to avoid backpressure (#1226)
1 parent c02ea67 commit 485d5b2

File tree

4 files changed

+44
-32
lines changed

4 files changed

+44
-32
lines changed

src/streamingCalls/streamDescriptor.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import {StreamingApiCaller} from './streamingApiCaller';
2727
export class StreamDescriptor implements Descriptor {
2828
type: StreamType;
2929
streaming: boolean; // needed for browser support
30+
rest?: boolean;
3031

31-
constructor(streamType: StreamType) {
32+
constructor(streamType: StreamType, rest?: boolean) {
3233
this.type = streamType;
3334
this.streaming = true;
35+
this.rest = rest;
3436
}
3537

3638
getApiCaller(settings: CallSettings): APICaller {

src/streamingCalls/streaming.ts

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import {
2626
SimpleCallbackFunction,
2727
} from '../apitypes';
2828
import {RetryRequestOptions} from '../gax';
29-
import {StreamArrayParser} from '../streamArrayParser';
3029

3130
// eslint-disable-next-line @typescript-eslint/no-var-requires
3231
const duplexify: DuplexifyConstructor = require('duplexify');
@@ -91,6 +90,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
9190
private _isCancelCalled: boolean;
9291
stream?: CancellableStream;
9392
private _responseHasSent: boolean;
93+
rest?: boolean;
9494
/**
9595
* StreamProxy is a proxy to gRPC-streaming method.
9696
*
@@ -99,7 +99,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
9999
* @param {StreamType} type - the type of gRPC stream.
100100
* @param {ApiCallback} callback - the callback for further API call.
101101
*/
102-
constructor(type: StreamType, callback: APICallback) {
102+
constructor(type: StreamType, callback: APICallback, rest?: boolean) {
103103
super(undefined, undefined, {
104104
objectMode: true,
105105
readable: type !== StreamType.CLIENT_STREAMING,
@@ -109,6 +109,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
109109
this._callback = callback;
110110
this._isCancelCalled = false;
111111
this._responseHasSent = false;
112+
this.rest = rest;
112113
}
113114

114115
cancel() {
@@ -125,9 +126,6 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
125126
*/
126127
forwardEvents(stream: Stream) {
127128
const eventsToForward = ['metadata', 'response', 'status'];
128-
if (stream instanceof StreamArrayParser) {
129-
eventsToForward.push('data', 'end', 'error');
130-
}
131129
eventsToForward.forEach(event => {
132130
stream.on(event, this.emit.bind(this, event));
133131
});
@@ -175,26 +173,30 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
175173
retryRequestOptions: RetryRequestOptions = {}
176174
) {
177175
if (this.type === StreamType.SERVER_STREAMING) {
178-
const retryStream = retryRequest(null, {
179-
objectMode: true,
180-
request: () => {
181-
if (this._isCancelCalled) {
182-
if (this.stream) {
183-
this.stream.cancel();
176+
const stream = apiCall(argument, this._callback) as CancellableStream;
177+
this.stream = stream;
178+
if (this.rest) {
179+
this.setReadable(stream);
180+
} else {
181+
const retryStream = retryRequest(null, {
182+
objectMode: true,
183+
request: () => {
184+
if (this._isCancelCalled) {
185+
if (this.stream) {
186+
this.stream.cancel();
187+
}
188+
return;
184189
}
185-
return;
186-
}
187-
const stream = apiCall(argument, this._callback) as CancellableStream;
188-
this.stream = stream;
189-
this.forwardEvents(stream);
190-
return stream;
191-
},
192-
retries: retryRequestOptions!.retries,
193-
currentRetryAttempt: retryRequestOptions!.currentRetryAttempt,
194-
noResponseRetries: retryRequestOptions!.noResponseRetries,
195-
shouldRetryFn: retryRequestOptions!.shouldRetryFn,
196-
});
197-
this.setReadable(retryStream);
190+
this.forwardEvents(stream);
191+
return stream;
192+
},
193+
retries: retryRequestOptions!.retries,
194+
currentRetryAttempt: retryRequestOptions!.currentRetryAttempt,
195+
noResponseRetries: retryRequestOptions!.noResponseRetries,
196+
shouldRetryFn: retryRequestOptions!.shouldRetryFn,
197+
});
198+
this.setReadable(retryStream);
199+
}
198200
return;
199201
}
200202

src/streamingCalls/streamingApiCaller.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ export class StreamingApiCaller implements APICaller {
4444
}
4545

4646
init(callback: APICallback): StreamProxy {
47-
return new StreamProxy(this.descriptor.type, callback);
47+
return new StreamProxy(
48+
this.descriptor.type,
49+
callback,
50+
this.descriptor.rest
51+
);
4852
}
4953

5054
wrap(func: GRPCCall): GRPCCall {

test/unit/streaming.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,15 @@ function createApiCallStreaming(
3636
func:
3737
| Promise<GRPCCall>
3838
| sinon.SinonSpy<Array<{}>, internal.Transform | StreamArrayParser>,
39-
type: streaming.StreamType
39+
type: streaming.StreamType,
40+
rest?: boolean
4041
) {
4142
const settings = new gax.CallSettings();
4243
return createApiCall(
4344
//@ts-ignore
4445
Promise.resolve(func),
4546
settings,
46-
new StreamDescriptor(type)
47+
new StreamDescriptor(type, rest)
4748
) as GaxCallStream;
4849
}
4950

@@ -459,7 +460,7 @@ describe('streaming', () => {
459460
});
460461
});
461462

462-
describe('apiCall return StreamArrayParser', () => {
463+
describe('REST streaming apiCall return StreamArrayParser', () => {
463464
const protos_path = path.resolve(__dirname, '..', 'fixtures', 'user.proto');
464465
const root = protobuf.loadSync(protos_path);
465466
const UserService = root.lookupService('UserService');
@@ -476,7 +477,8 @@ describe('apiCall return StreamArrayParser', () => {
476477
});
477478
const apiCall = createApiCallStreaming(
478479
spy,
479-
streaming.StreamType.SERVER_STREAMING
480+
streaming.StreamType.SERVER_STREAMING,
481+
true
480482
);
481483
const s = apiCall({}, undefined);
482484
assert.strictEqual(s.readable, true);
@@ -505,7 +507,8 @@ describe('apiCall return StreamArrayParser', () => {
505507
});
506508
const apiCall = createApiCallStreaming(
507509
spy,
508-
streaming.StreamType.SERVER_STREAMING
510+
streaming.StreamType.SERVER_STREAMING,
511+
true
509512
);
510513
const s = apiCall({}, undefined);
511514
assert.strictEqual(s.readable, true);
@@ -536,7 +539,8 @@ describe('apiCall return StreamArrayParser', () => {
536539
const apiCall = createApiCallStreaming(
537540
//@ts-ignore
538541
spy,
539-
streaming.StreamType.SERVER_STREAMING
542+
streaming.StreamType.SERVER_STREAMING,
543+
true
540544
);
541545
const s = apiCall({}, undefined);
542546
let counter = 0;

0 commit comments

Comments
 (0)