-
-
Notifications
You must be signed in to change notification settings - Fork 75
fix(dei): refresh JWT on auto-reconnect (PartySocket URL provider) #950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(dei): refresh JWT on auto-reconnect (PartySocket URL provider) #950
Conversation
The latest updates on your projects. Learn more about Vercel for GitHub.
|
WalkthroughRefactors the Durable Event Iterator plugin to use a single options object, introduces a two-phase interceptor strategy, implements token lifecycle management with initial and refresh tokens, constructs URLs dynamically, and establishes a reconnectable WebSocket flow using token-based PartySocket URLs with added plugin context validation. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Client
participant Plugin
participant ClientInterceptor as Client Interceptor (Phase 1)
participant Interceptor as Interceptor (Phase 2)
participant Upstream as Upstream next()
participant TokenStore as Token Lifecycle
participant WS as ReconnectableWebSocket
participant PartySocket as PartySocket Server
Client->>Plugin: request()
Plugin->>ClientInterceptor: validate plugin context<br/>mark DEI token header in context
ClientInterceptor-->>Plugin: proceed
Plugin->>Interceptor: execute next()
Interceptor->>Upstream: initial token request
Upstream-->>Interceptor: response (token)
Interceptor->>TokenStore: save initialToken
Note over TokenStore: buildUrl(token)<br/>refetchToken() calls next() with snapshot
Plugin->>WS: create with URL provider<br/>(uses initialToken first)
WS->>PartySocket: connect with tokenized URL
PartySocket-->>WS: events/acks
loop Reconnects / Expiry
WS->>TokenStore: need fresh token
TokenStore->>Upstream: refetchToken() via snapshot
Upstream-->>TokenStore: new token
TokenStore-->>WS: provide new tokenized URL
WS->>PartySocket: reconnect with refreshed token
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @Rhayxz, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request addresses a critical issue where clients using short-lived JSON Web Tokens (JWTs) would continuously fail to reconnect to WebSocket services due to using expired tokens. The solution implements a robust token refresh mechanism for auto-reconnections by leveraging PartySocket's URL provider, ensuring that a fresh JWT is obtained before each reconnection attempt, thereby maintaining continuous service without user intervention.
Highlights
- Problem Addressed: Resolves an issue where short-lived JWTs caused clients to endlessly attempt reconnections with stale tokens, resulting in repeated 401 Unauthorized errors.
- Dynamic URL Provisioning: The PartySocket (PartyKit's ReconnectableWebSocket) is now provided with an asynchronous URL provider function instead of a static string.
- On-Demand Token Refresh: A snapshot of the original interceptor call is taken, allowing it to be re-invoked to fetch a fresh JWT whenever a reconnection is needed.
- Resilient Connection Establishment: A new WebSocket URL is dynamically constructed with the newly acquired token for each reconnection attempt, ensuring successful authentication.
- Non-Breaking Change: The implementation maintains existing plugin behavior and API, introducing no breaking changes, and enhances security for short-lived or rotating tokens.
- Verified Functionality: Local testing confirmed that after JWT expiry, the system successfully fetches a new token and resumes RPC calls and event streaming automatically.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a mechanism to refresh JWTs on WebSocket auto-reconnects for the Durable Event Iterator. The approach of using a URL provider function with PartySocket
is well-suited for this purpose, and the implementation correctly snapshots the original call to refetch tokens. This is a great improvement for handling short-lived tokens and enhances security.
I have a couple of suggestions. One is a potential issue regarding a stale token reference in the created durableIterator
, and another is a minor style improvement for better maintainability. Additionally, it would be beneficial to add tests covering the new token refresh logic on reconnect to prevent future regressions.
if (!ctx) | ||
throw new TypeError('[DurableEventIteratorLinkPlugin] Plugin context has been corrupted or modified by another plugin or interceptor') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For improved code clarity and to prevent potential bugs, it's a good practice to always use curly braces for if
statements, even for single-line blocks. This ensures that the code's intent is clear and reduces the risk of errors if the block is expanded in the future.
if (!ctx) | |
throw new TypeError('[DurableEventIteratorLinkPlugin] Plugin context has been corrupted or modified by another plugin or interceptor') | |
if (!ctx) { | |
throw new TypeError('[DurableEventIteratorLinkPlugin] Plugin context has been corrupted or modified by another plugin or interceptor') | |
} |
const durableIterator = createClientDurableEventIterator(iterator, link, { | ||
token, | ||
token: initialToken, | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The durableIterator
is created here with the initialToken
string. This token value is passed by value and will become stale after the first token refresh on WebSocket reconnect. Consequently, any call to getClientDurableEventIteratorToken(durableIterator)
will return the original, stale token.
While the core RPC functionality remains correct because ReconnectableWebSocket
handles token refreshes independently, this behavior could be misleading for consumers of getClientDurableEventIteratorToken
.
The comment on line 115 (// keep latest for visibility
) suggests an intent to keep the token updated. If this is the case, createClientDurableEventIterator
would need to be adjusted to accept a token provider function (e.g., () => initialToken
) instead of a static string. This would allow it to always access the latest token but would require changes in packages/durable-event-iterator/src/client/event-iterator.ts
.
If returning the initial token is the intended and acceptable behavior, consider clarifying the comment on line 115 to something like // keep latest for subsequent reconnects
to better reflect its purpose and avoid confusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
packages/durable-event-iterator/src/client/plugin.ts (3)
81-83
: Type-check the token at runtime.If the server mistakenly returns a non-string while setting the DEI header, downstream will misbehave. Guard early.
- let initialToken = output as string + if (typeof output !== 'string') + throw new TypeError('[DurableEventIteratorLinkPlugin] Expected token as string from DEI endpoint') + let initialToken = output
100-104
: Support relative URLs (optional).
new URL(await value(this.url))
fails for relative paths. Provide a base when available.- const buildUrl = async (token: string): Promise<string> => { - const u = new URL(await value(this.url)) + const buildUrl = async (token: string): Promise<string> => { + const raw = await value(this.url) + const base = (globalThis as any)?.location?.href + const u = new URL(String(raw), base) u.searchParams.set(DURABLE_EVENT_ITERATOR_TOKEN_PARAM, token) return u.toString() }
123-130
: Avoid double-injecting ClientRetryPlugin.If user supplies a
ClientRetryPlugin
, adding another can cause duplicate retries. Detect and add only if absent.- const durableLink = new RPCLink<ClientRetryPluginContext>({ - ...this.linkOptions, - websocket: durableWs, - plugins: [ - ...toArray(this.linkOptions.plugins), - new ClientRetryPlugin(), - ], - }) + const userPlugins = toArray(this.linkOptions.plugins) + const hasRetry = userPlugins.some(p => p instanceof ClientRetryPlugin) + const durableLink = new RPCLink<ClientRetryPluginContext>({ + ...this.linkOptions, + websocket: durableWs, + plugins: hasRetry ? userPlugins : [...userPlugins, new ClientRetryPlugin()], + })
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
packages/durable-event-iterator/src/client/plugin.ts
(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/durable-event-iterator/src/client/plugin.ts (3)
packages/client/src/adapters/standard/plugin.ts (1)
StandardLinkPlugin
(4-7)packages/client/src/adapters/standard/link.ts (1)
StandardLinkOptions
(18-22)packages/durable-event-iterator/src/consts.ts (3)
DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_KEY
(2-2)DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_VALUE
(3-3)DURABLE_EVENT_ITERATOR_TOKEN_PARAM
(1-1)
🔇 Additional comments (4)
packages/durable-event-iterator/src/client/plugin.ts (4)
66-75
: Context forwarding pattern LGTM.Passing a dedicated plugin context symbol through
interceptorOptions.next(...)
is correct and isolates state.
106-122
: Dynamic PartySocket URL provider: solid approach.This matches the PR goal: refresh token per reconnect while using the initial token for the first dial.
149-151
: Confirm iterator token semantics.
createClientDurableEventIterator(..., { token: initialToken })
receives the initial token only. Confirm it’s used solely for the first handshake and not for subsequent operations, since refresh is handled by PartySocket URL provider.
35-49
: All instantiations use the new single-opts
signature; no legacy calls detected.
// Mark responses that carry a DEI token | ||
options.clientInterceptors.push(async (clientOptions) => { | ||
const ctx = clientOptions.context[this.CONTEXT_SYMBOL] as DurableEventIteratorLinkPluginContext | undefined | ||
if (!ctx) | ||
throw new TypeError('[DurableEventIteratorLinkPlugin] Plugin context has been corrupted or modified by another plugin or interceptor') | ||
|
||
const output = await options.next({ | ||
...options, | ||
const res = await clientOptions.next() | ||
ctx.isDurableEventIteratorResponse = res.headers[DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_KEY] === DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_VALUE | ||
return res | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Do not throw when plugin context is absent; pass-through instead (and normalize header lookup).
This client interceptor runs for all requests. Throwing when ctx
is missing will break non-DEI calls. Make it a no-op when context isn’t present and perform a case-insensitive header read.
- options.clientInterceptors.push(async (clientOptions) => {
- const ctx = clientOptions.context[this.CONTEXT_SYMBOL] as DurableEventIteratorLinkPluginContext | undefined
- if (!ctx)
- throw new TypeError('[DurableEventIteratorLinkPlugin] Plugin context has been corrupted or modified by another plugin or interceptor')
-
- const res = await clientOptions.next()
- ctx.isDurableEventIteratorResponse = res.headers[DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_KEY] === DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_VALUE
- return res
- })
+ options.clientInterceptors.push(async (clientOptions) => {
+ const ctx = clientOptions.context[this.CONTEXT_SYMBOL] as DurableEventIteratorLinkPluginContext | undefined
+ const res = await clientOptions.next()
+ if (ctx) {
+ const headers = res.headers as Record<string, string> | undefined
+ const headerValue =
+ headers?.[DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_KEY] ??
+ headers?.[String(DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_KEY).toLowerCase()]
+ ctx.isDurableEventIteratorResponse = headerValue === DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_VALUE
+ }
+ return res
+ })
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// Mark responses that carry a DEI token | |
options.clientInterceptors.push(async (clientOptions) => { | |
const ctx = clientOptions.context[this.CONTEXT_SYMBOL] as DurableEventIteratorLinkPluginContext | undefined | |
if (!ctx) | |
throw new TypeError('[DurableEventIteratorLinkPlugin] Plugin context has been corrupted or modified by another plugin or interceptor') | |
const output = await options.next({ | |
...options, | |
const res = await clientOptions.next() | |
ctx.isDurableEventIteratorResponse = res.headers[DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_KEY] === DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_VALUE | |
return res | |
}) | |
// Mark responses that carry a DEI token | |
options.clientInterceptors.push(async (clientOptions) => { | |
const ctx = clientOptions.context[this.CONTEXT_SYMBOL] as DurableEventIteratorLinkPluginContext | undefined | |
const res = await clientOptions.next() | |
if (ctx) { | |
const headers = res.headers as Record<string, string> | undefined | |
const headerValue = | |
headers?.[DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_KEY] ?? | |
headers?.[String(DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_KEY).toLowerCase()] | |
ctx.isDurableEventIteratorResponse = headerValue === DURABLE_EVENT_ITERATOR_PLUGIN_HEADER_VALUE | |
} | |
return res | |
}) |
🤖 Prompt for AI Agents
In packages/durable-event-iterator/src/client/plugin.ts around lines 55 to 64,
the client interceptor currently throws when the plugin context is missing and
checks headers case-sensitively; change it to be a no-op when ctx is undefined
(simply await and return clientOptions.next() without throwing or modifying ctx)
so it doesn’t break non-DEI requests, and when reading the response header
perform a case-insensitive lookup (e.g., normalize header keys or compare
lowercased header names/values) before setting
ctx.isDurableEventIteratorResponse.
// Save a snapshot of this exact call so we can re-fetch fresh tokens later | ||
const upstreamNext = interceptorOptions.next | ||
const snapshot = { | ||
path: interceptorOptions.path, | ||
input: interceptorOptions.input, | ||
context: { [this.CONTEXT_SYMBOL]: pluginContext, ...interceptorOptions.context }, | ||
signal: interceptorOptions.signal, | ||
lastEventId: interceptorOptions.lastEventId, | ||
} | ||
|
||
const durableWs = new ReconnectableWebSocket(url.toString(), undefined, { | ||
WebSocket: this.WebSocket, | ||
}) | ||
const refetchToken = async (): Promise<string> => { | ||
const fresh = await upstreamNext(snapshot) | ||
// Server sets the header + returns the token string again. | ||
return fresh as string | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid reusing the original AbortSignal; also dedupe concurrent refreshes.
Reusing interceptorOptions.signal
can cause immediate aborts on refresh. Build fresh options per call. Also, coalesce parallel refresh attempts.
- const upstreamNext = interceptorOptions.next
- const snapshot = {
- path: interceptorOptions.path,
- input: interceptorOptions.input,
- context: { [this.CONTEXT_SYMBOL]: pluginContext, ...interceptorOptions.context },
- signal: interceptorOptions.signal,
- lastEventId: interceptorOptions.lastEventId,
- }
-
- const refetchToken = async (): Promise<string> => {
- const fresh = await upstreamNext(snapshot)
- // Server sets the header + returns the token string again.
- return fresh as string
- }
+ const upstreamNext = interceptorOptions.next
+ const makeSnapshot = () => ({
+ path: interceptorOptions.path,
+ input: interceptorOptions.input,
+ // Fresh context on each refresh; do not reuse potentially-aborted signals.
+ context: { [this.CONTEXT_SYMBOL]: pluginContext, ...interceptorOptions.context },
+ })
+
+ let inflightToken: Promise<string> | null = null
+ const refetchToken = async (): Promise<string> => {
+ if (!inflightToken) {
+ inflightToken = upstreamNext(makeSnapshot()) as Promise<string>
+ try {
+ const fresh = await inflightToken
+ return fresh
+ } finally {
+ inflightToken = null
+ }
+ }
+ return inflightToken
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// Save a snapshot of this exact call so we can re-fetch fresh tokens later | |
const upstreamNext = interceptorOptions.next | |
const snapshot = { | |
path: interceptorOptions.path, | |
input: interceptorOptions.input, | |
context: { [this.CONTEXT_SYMBOL]: pluginContext, ...interceptorOptions.context }, | |
signal: interceptorOptions.signal, | |
lastEventId: interceptorOptions.lastEventId, | |
} | |
const durableWs = new ReconnectableWebSocket(url.toString(), undefined, { | |
WebSocket: this.WebSocket, | |
}) | |
const refetchToken = async (): Promise<string> => { | |
const fresh = await upstreamNext(snapshot) | |
// Server sets the header + returns the token string again. | |
return fresh as string | |
} | |
// Save a snapshot of this exact call so we can re-fetch fresh tokens later | |
const upstreamNext = interceptorOptions.next | |
const makeSnapshot = () => ({ | |
path: interceptorOptions.path, | |
input: interceptorOptions.input, | |
// Fresh context on each refresh; do not reuse potentially-aborted signals. | |
context: { [this.CONTEXT_SYMBOL]: pluginContext, ...interceptorOptions.context }, | |
}) | |
let inflightToken: Promise<string> | null = null | |
const refetchToken = async (): Promise<string> => { | |
if (!inflightToken) { | |
inflightToken = upstreamNext(makeSnapshot()) as Promise<string> | |
try { | |
const fresh = await inflightToken | |
return fresh | |
} finally { | |
inflightToken = null | |
} | |
} | |
return inflightToken | |
} |
🤖 Prompt for AI Agents
In packages/durable-event-iterator/src/client/plugin.ts around lines 84 to 99,
the refetchToken function reuses interceptorOptions.signal and replays the
original options object, which can cause immediate aborts and duplicate
concurrent refreshes; fix by creating fresh options for each refetch call (clone
path, input, context, lastEventId but set signal to a new
AbortController().signal or undefined) and call upstreamNext with that fresh
options object, and implement a simple dedupe/coalesce so parallel refetchToken
invocations share a single in-flight Promise (store the Promise on the plugin
instance or closure, return it if present, and clear it once resolved or
rejected).
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
More templates
@orpc/arktype
@orpc/client
@orpc/contract
@orpc/experimental-durable-event-iterator
@orpc/hey-api
@orpc/interop
@orpc/json-schema
@orpc/nest
@orpc/openapi
@orpc/openapi-client
@orpc/otel
@orpc/react
@orpc/react-query
@orpc/experimental-react-swr
@orpc/server
@orpc/shared
@orpc/solid-query
@orpc/standard-server
@orpc/standard-server-aws-lambda
@orpc/standard-server-fetch
@orpc/standard-server-node
@orpc/standard-server-peer
@orpc/svelte-query
@orpc/tanstack-query
@orpc/trpc
@orpc/valibot
@orpc/vue-colada
@orpc/vue-query
@orpc/zod
commit: |
@@ -101,24 +147,10 @@ export class DurableEventIteratorLinkPlugin<T extends ClientContext> implements | |||
} | |||
|
|||
const durableIterator = createClientDurableEventIterator(iterator, link, { | |||
token, | |||
token: initialToken, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We require change createClientDurableEventIterator
to reflect exactly what is current token.
WebSocket: this.WebSocket, | ||
}) | ||
const refetchToken = async (): Promise<string> => { | ||
const fresh = await upstreamNext(snapshot) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just call interceptorOptions.next()
here no need snapshot
I believe
// Turn the token into a resilient iterator (PartySocket-powered) | ||
options.interceptors.push(async (interceptorOptions) => { | ||
const pluginContext: DurableEventIteratorLinkPluginContext = {} | ||
const output = await interceptorOptions.next({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can group first call .next
inside refetchToken
(maybe rename refetchToken
too)
I've created #965 with a different approach that better addresses this issue. |
Why
With short-lived JWTs, the client would reconnect endlessly using a stale token embedded in the ws URL query param, causing repeated 401s.
What
How
Testing
Notes
ClientRetryPlugin
.Summary by CodeRabbit
New Features
Bug Fixes
Refactor