Skip to content

Commit 9bf73ab

Browse files
authored
Fix two stream close issues (b/73167987, b/73382103). (firebase#810)
* Fix b/73167987: Upon receiving a permanent write error when we had additional pendingWrites to send, we were restarting the stream with a new delegate and then immediately setting the delegate to nil, causing the stream to ignore all GRPC events for the stream. * Fix b/73382103: We were attempting to gracefully teardown the write stream (i.e. send an empty WriteRequest) even when the stream was already failed due to an error. This caused no harm other than log pollution, but I fixed it. * Use -[GRPCCall setResponseDispatchQueue] to dispatch GRPC callbacks directly onto the Firestore worker queue. This saves a double-dispatch and simplifies our logic. * Add stricter assertions regarding stream state now that dispatch queue / callback filter race conditions are eliminated.
1 parent 7a4a2ea commit 9bf73ab

File tree

2 files changed

+60
-53
lines changed

2 files changed

+60
-53
lines changed

Firestore/Example/Tests/Integration/FSTStreamTests.mm

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
#import <XCTest/XCTest.h>
1818

19+
#import <GRPCClient/GRPCCall.h>
20+
1921
#import <FirebaseFirestore/FIRFirestoreSettings.h>
2022

2123
#import "Firestore/Example/Tests/Util/FSTHelpers.h"
@@ -35,7 +37,7 @@
3537

3638
/** Exposes otherwise private methods for testing. */
3739
@interface FSTStream (Testing)
38-
- (void)writesFinishedWithError:(NSError *_Nullable)error;
40+
@property(nonatomic, strong, readwrite) id<GRXWriteable> callbackFilter;
3941
@end
4042

4143
/**
@@ -202,7 +204,9 @@ - (void)testWatchStreamStopBeforeHandshake {
202204
}];
203205

204206
// Simulate a final callback from GRPC
205-
[watchStream writesFinishedWithError:nil];
207+
[_workerDispatchQueue dispatchAsync:^{
208+
[watchStream.callbackFilter writesFinishedWithError:nil];
209+
}];
206210

207211
[self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
208212
}
@@ -224,7 +228,9 @@ - (void)testWriteStreamStopBeforeHandshake {
224228
}];
225229

226230
// Simulate a final callback from GRPC
227-
[writeStream writesFinishedWithError:nil];
231+
[_workerDispatchQueue dispatchAsync:^{
232+
[writeStream.callbackFilter writesFinishedWithError:nil];
233+
}];
228234

229235
[self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
230236
}

Firestore/Source/Remote/FSTStream.mm

Lines changed: 51 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,12 @@ @interface FSTStream () <GRXWriteable>
152152

153153
#pragma mark - FSTCallbackFilter
154154

155-
/** Filter class that allows disabling of GRPC callbacks. */
155+
/**
156+
* Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main
157+
* FSTStream to allow the stream to be stopped externally (either by the user or via idle timer)
158+
* and be able to completely prevent any subsequent events from gRPC from calling back into the
159+
* FSTSTream.
160+
*/
156161
@interface FSTCallbackFilter : NSObject <GRXWriteable>
157162

158163
- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
@@ -269,12 +274,12 @@ - (void)startWithDelegate:(id)delegate {
269274

270275
/** Add an access token to our RPC, after obtaining one from the credentials provider. */
271276
- (void)resumeStartWithToken:(const Token &)token error:(NSError *)error {
277+
[self.workerDispatchQueue verifyIsCurrentQueue];
278+
272279
if (self.state == FSTStreamStateStopped) {
273280
// Streams can be stopped while waiting for authorization.
274281
return;
275282
}
276-
277-
[self.workerDispatchQueue verifyIsCurrentQueue];
278283
FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
279284
(long)self.state);
280285

@@ -288,6 +293,8 @@ - (void)resumeStartWithToken:(const Token &)token error:(NSError *)error {
288293

289294
self.requestsWriter = [[FSTBufferedWriter alloc] init];
290295
_rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
296+
[_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue];
297+
291298
[FSTDatastore prepareHeadersForRPC:_rpc
292299
databaseID:&self.databaseInfo->database_id()
293300
token:(token.is_valid() ? token.token() : absl::string_view())];
@@ -369,7 +376,10 @@ - (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *
369376
[self.backoff resetToMax];
370377
}
371378

372-
[self tearDown];
379+
if (finalState != FSTStreamStateError) {
380+
FSTLog(@"%@ %p Performing stream teardown", [self class], (__bridge void *)self);
381+
[self tearDown];
382+
}
373383

374384
if (self.requestsWriter) {
375385
// Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
@@ -400,8 +410,9 @@ - (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *
400410
[self notifyStreamInterruptedWithError:error];
401411
}
402412

403-
// Clear the delegates to avoid any possible bleed through of events from GRPC.
404-
_delegate = nil;
413+
// PORTING NOTE: notifyStreamInterruptedWithError may have restarted the stream with a new
414+
// delegate so we do /not/ want to clear the delegate here. And since we've already suppressed
415+
// callbacks via our callbackFilter, there is no worry about bleed through of events from GRPC.
405416
}
406417

407418
- (void)stop {
@@ -530,11 +541,7 @@ - (void)handleStreamMessage:(id)value {
530541
*/
531542
- (void)handleStreamClose:(nullable NSError *)error {
532543
FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
533-
534-
if (![self isStarted]) { // The stream could have already been closed by the idle close timer.
535-
FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class]));
536-
return;
537-
}
544+
FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");
538545

539546
// In theory the stream could close cleanly, however, in our current model we never expect this
540547
// to happen because if we stop a stream ourselves, this callback will never be called. To
@@ -547,56 +554,50 @@ - (void)handleStreamClose:(nullable NSError *)error {
547554
// The GRXWriteable implementation defines the receive side of the RPC stream.
548555

549556
/**
550-
* Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
551-
* redispatch back onto our own worker queue.
557+
* Called by GRPC when it publishes a value.
558+
*
559+
* GRPC must be configured to use our worker queue by calling
560+
* `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
561+
* the RPC.
552562
*/
553-
- (void)writeValue:(id)value __used {
554-
// TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
555-
// Once released we can set the responseDispatchQueue property on the GRPCCall and then this
556-
// method can call handleStreamMessage directly.
557-
FSTWeakify(self);
558-
[self.workerDispatchQueue dispatchAsync:^{
559-
FSTStrongify(self);
560-
if (![self isStarted]) {
561-
FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class]));
562-
return;
563-
}
564-
565-
if (!self.messageReceived) {
566-
self.messageReceived = YES;
567-
if ([FIRFirestore isLoggingEnabled]) {
568-
FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
569-
(__bridge void *)self,
570-
[FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
571-
}
572-
}
573-
NSError *error;
574-
id proto = [self parseProto:self.responseMessageClass data:value error:&error];
575-
if (proto) {
576-
[self handleStreamMessage:proto];
577-
} else {
578-
[_rpc finishWithError:error];
563+
- (void)writeValue:(id)value {
564+
[self.workerDispatchQueue verifyIsCurrentQueue];
565+
FSTAssert([self isStarted], @"writeValue: called for stopped stream.");
566+
567+
if (!self.messageReceived) {
568+
self.messageReceived = YES;
569+
if ([FIRFirestore isLoggingEnabled]) {
570+
FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
571+
(__bridge void *)self,
572+
[FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
579573
}
580-
}];
574+
}
575+
NSError *error;
576+
id proto = [self parseProto:self.responseMessageClass data:value error:&error];
577+
if (proto) {
578+
[self handleStreamMessage:proto];
579+
} else {
580+
[_rpc finishWithError:error];
581+
}
581582
}
582583

583584
/**
584585
* Called by GRPC when it closed the stream with an error representing the final state of the
585586
* stream.
586587
*
587-
* Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
588-
* directly inform stream-specific logic, or call stop to tear down the stream.
588+
* GRPC must be configured to use our worker queue by calling
589+
* `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
590+
* the RPC.
591+
*
592+
* Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call
593+
* stop to tear down the stream.
589594
*/
590595
- (void)writesFinishedWithError:(nullable NSError *)error __used {
591596
error = [FSTDatastore firestoreErrorForError:error];
592-
FSTWeakify(self);
593-
[self.workerDispatchQueue dispatchAsync:^{
594-
FSTStrongify(self);
595-
if (!self || self.state == FSTStreamStateStopped) {
596-
return;
597-
}
598-
[self handleStreamClose:error];
599-
}];
597+
[self.workerDispatchQueue verifyIsCurrentQueue];
598+
FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");
599+
600+
[self handleStreamClose:error];
600601
}
601602

602603
@end

0 commit comments

Comments
 (0)