diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 3e69daf19e0..3a684e8c543 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -97,6 +97,13 @@ const enum PersistentStreamState { */ Open, + /** + * The stream is healthy and has been connected for more than 10 seconds. We + * therefore assume that the credentials we passed were valid. Both + * isStarted() and isOpen() will return true. + */ + Healthy, + /** * The stream encountered an error. The next start attempt will back off. * While in this state isStarted() will return false. @@ -132,6 +139,9 @@ export interface PersistentStreamListener { /** The time a stream stays open after it is marked idle. */ const IDLE_TIMEOUT_MS = 60 * 1000; +/** The time a stream stays open until we consider it healthy. */ +const HEALTHY_TIMEOUT_MS = 10 * 1000; + /** * A PersistentStream is an abstract base class that represents a streaming RPC * to the Firestore backend. It's built on top of the connections own support @@ -178,6 +188,7 @@ export abstract class PersistentStream< private closeCount = 0; private idleTimer: DelayedOperation | null = null; + private healthCheck: DelayedOperation | null = null; private stream: Stream | null = null; protected backoff: ExponentialBackoff; @@ -186,6 +197,7 @@ export abstract class PersistentStream< private queue: AsyncQueue, connectionTimerId: TimerId, private idleTimerId: TimerId, + private healthTimerId: TimerId, protected connection: Connection, private credentialsProvider: CredentialsProvider, protected listener: ListenerType @@ -203,8 +215,8 @@ export abstract class PersistentStream< isStarted(): boolean { return ( this.state === PersistentStreamState.Starting || - this.state === PersistentStreamState.Open || - this.state === PersistentStreamState.Backoff + this.state === PersistentStreamState.Backoff || + this.isOpen() ); } @@ -213,7 +225,10 @@ export abstract class PersistentStream< * called) and the stream is ready for outbound requests. */ isOpen(): boolean { - return this.state === PersistentStreamState.Open; + return ( + this.state === PersistentStreamState.Open || + this.state === PersistentStreamState.Healthy + ); } /** @@ -311,6 +326,14 @@ export abstract class PersistentStream< } } + /** Cancels the health check delayed operation. */ + private cancelHealthCheck(): void { + if (this.healthCheck) { + this.healthCheck.cancel(); + this.healthCheck = null; + } + } + /** * Closes the stream and cleans up as necessary: * @@ -336,6 +359,7 @@ export abstract class PersistentStream< // Cancel any outstanding timers (they're guaranteed not to execute). this.cancelIdleCheck(); + this.cancelHealthCheck(); this.backoff.cancel(); // Invalidates any stream-related callbacks (e.g. from auth or the @@ -352,9 +376,17 @@ export abstract class PersistentStream< 'Using maximum backoff delay to prevent overloading the backend.' ); this.backoff.resetToMax(); - } else if (error && error.code === Code.UNAUTHENTICATED) { - // "unauthenticated" error means the token was rejected. Try force refreshing it in case it - // just expired. + } else if ( + error && + error.code === Code.UNAUTHENTICATED && + this.state !== PersistentStreamState.Healthy + ) { + // "unauthenticated" error means the token was rejected. This should rarely + // happen since both Auth and AppCheck ensure a sufficient TTL when we + // request a token. If a user manually resets their system clock this can + // fail, however. In this case, we should get a Code.UNAUTHENTICATED error + // before we received the first message and we need to invalidate the token + // to ensure that we fetch a new token. this.credentialsProvider.invalidateToken(); } @@ -448,6 +480,20 @@ export abstract class PersistentStream< 'Expected stream to be in state Starting, but was ' + this.state ); this.state = PersistentStreamState.Open; + debugAssert( + this.healthCheck === null, + 'Expected healthCheck to be null' + ); + this.healthCheck = this.queue.enqueueAfterDelay( + this.healthTimerId, + HEALTHY_TIMEOUT_MS, + () => { + if (this.isOpen()) { + this.state = PersistentStreamState.Healthy; + } + return Promise.resolve(); + } + ); return this.listener!.onOpen(); }); }); @@ -559,6 +605,7 @@ export class PersistentListenStream extends PersistentStream< queue, TimerId.ListenStreamConnectionBackoff, TimerId.ListenStreamIdle, + TimerId.HealthCheckTimeout, connection, credentials, listener @@ -667,6 +714,7 @@ export class PersistentWriteStream extends PersistentStream< queue, TimerId.WriteStreamConnectionBackoff, TimerId.WriteStreamIdle, + TimerId.HealthCheckTimeout, connection, credentials, listener diff --git a/packages/firestore/src/util/async_queue.ts b/packages/firestore/src/util/async_queue.ts index 24f07e77f9b..23c0159745a 100644 --- a/packages/firestore/src/util/async_queue.ts +++ b/packages/firestore/src/util/async_queue.ts @@ -38,15 +38,17 @@ export const enum TimerId { All = 'all', /** - * The following 4 timers are used in persistent_stream.ts for the listen and + * The following 5 timers are used in persistent_stream.ts for the listen and * write streams. The "Idle" timer is used to close the stream due to * inactivity. The "ConnectionBackoff" timer is used to restart a stream once - * the appropriate backoff delay has elapsed. + * the appropriate backoff delay has elapsed. The health check is used to mark + * a stream healthy if it has not received an error during its initial setup. */ ListenStreamIdle = 'listen_stream_idle', ListenStreamConnectionBackoff = 'listen_stream_connection_backoff', WriteStreamIdle = 'write_stream_idle', WriteStreamConnectionBackoff = 'write_stream_connection_backoff', + HealthCheckTimeout = 'health_check_timeout', /** * A timer used in online_state_tracker.ts to transition from diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index ae812b63c07..2793872287b 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -274,6 +274,24 @@ describe('Write Stream', () => { }); }); +it('token is not invalidated once the stream is healthy', () => { + const credentials = new MockCredentialsProvider(); + + return withTestWriteStream(async (writeStream, streamListener, queue) => { + await streamListener.awaitCallback('open'); + + await queue.runAllDelayedOperationsUntil(TimerId.HealthCheckTimeout); + + // Simulate callback from GRPC with an unauthenticated error -- this should + // NOT invalidate the token. + await writeStream.handleStreamClose( + new FirestoreError(Code.UNAUTHENTICATED, '') + ); + await streamListener.awaitCallback('close'); + expect(credentials.observedStates).to.deep.equal(['getToken']); + }, credentials); +}); + export async function withTestWriteStream( fn: ( writeStream: PersistentWriteStream,