Add subscription handling to Realtime services across multiple platforms#1427
Add subscription handling to Realtime services across multiple platforms#1427ArnabChatterjee20k merged 8 commits intomasterfrom
Conversation
- Introduced TYPE_RESPONSE constant to handle subscription responses in Realtime services for Android, iOS, Flutter, React Native, and Web. - Implemented sendSubscribeMessage method to manage subscription messages and maintain pending subscription slots. - Updated socket connection logic to reuse existing connections and send updated subscription messages when necessary. - Refactored query parameter construction to streamline the subscription process, removing redundant channel and query handling from the URL. - Enhanced subscription ID mapping to accommodate zero-based server mappings, ensuring accurate subscription management across platforms.
Greptile SummaryThis PR migrates the Realtime subscription mechanism across Android, Apple/Swift, Flutter, Web, and React Native templates from URL query-parameter channel encoding to a WebSocket-level Notable improvements:
The key outstanding item is that all four modified test files ( Confidence Score: 3/5Not safe to merge — all four test files point at staging infrastructure and the PR's own checklist has uncompleted items. The subscription-logic improvements are solid and address previously identified bugs (slot-1 fallback, missing import, stale channels set, Android queue pattern). However, all four modified test files still use the hardcoded staging endpoint explicitly listed as an unchecked TODO in the PR description, which will break CI. Android examples also need regeneration per the checklist. tests/languages/flutter/tests.dart, tests/languages/apple/Tests.swift, tests/languages/android/Tests.kt, tests/languages/web/index.html — all have staging URLs that must be reverted before merging. Important Files Changed
Reviews (6): Last reviewed commit: "updated" | Re-trigger Greptile |
…size and handling it in the backend side for the empty queries
…le platforms - Updated the logic for retrieving known subscription IDs to remove fallback to the previous slot, ensuring only the current slot is considered. - This change was applied consistently across Android, iOS, Flutter, React Native, and Web implementations.
|
@copilot can you check why build is failing in the android and flutter |
| private func sendSubscribeMessage() { | ||
| guard let ws = socketClient, ws.isConnected else { | ||
| return | ||
| } | ||
|
|
||
| var rows = [[String: Any]]() | ||
| pendingSubscribeSlots.removeAll() | ||
|
|
||
| for (slot, subscription) in activeSubscriptions { | ||
| let queries = activeSubscriptionQueries[slot] ?? [] | ||
| var row: [String: Any] = [ | ||
| "channels": Array(subscription.channels), | ||
| "queries": queries | ||
| ] | ||
| if let knownSubscriptionId = slotToSubscriptionId[slot] { | ||
| row["subscriptionId"] = knownSubscriptionId | ||
| } | ||
| rows.append(row) | ||
| pendingSubscribeSlots.append(slot) | ||
| } | ||
|
|
||
| if rows.isEmpty { | ||
| return | ||
| } | ||
|
|
||
| let payload: [String: Any] = [ | ||
| "type": "subscribe", | ||
| "data": rows | ||
| ] | ||
| if let data = try? JSONSerialization.data(withJSONObject: payload), | ||
| let text = String(data: data, encoding: .utf8) { | ||
| ws.send(text: text) | ||
| } | ||
| } |
There was a problem hiding this comment.
sendSubscribeMessage() accesses shared state without synchronization
Unlike the Android template (which guards slot/subscription mutations with subscriptionLock), the Swift sendSubscribeMessage() reads and mutates pendingSubscribeSlots and iterates activeSubscriptions with no synchronization:
pendingSubscribeSlots.removeAll() // no lock
for (slot, subscription) in activeSubscriptions { ... } // no locksendSubscribeMessage() is reachable concurrently from:
handleResponseConnected()— fires on whatever thread the underlyingWebSocketClientuses for callbackssubscribe()— runs in anasync Taskcontext
A concurrent call from the WebSocket callback thread while subscribe() iterates activeSubscriptions creates a data race on Swift's non-thread-safe Dictionary and Array types (undefined behavior). Even without a crash, if two calls interleave, pendingSubscribeSlots from the first call can be overwritten before the server's response arrives, causing handleResponseAction to assign subscription IDs to the wrong slots.
The existing connectSync serial DispatchQueue is the right primitive. Running sendSubscribeMessage() synchronously on it serializes all access to the shared state:
private func sendSubscribeMessage() {
connectSync.sync {
guard let ws = socketClient, ws.isConnected else { return }
var rows = [[String: Any]]()
pendingSubscribeSlots.removeAll()
for (slot, subscription) in activeSubscriptions {
let queries = activeSubscriptionQueries[slot] ?? []
var row: [String: Any] = ["channels": Array(subscription.channels), "queries": queries]
if let sid = slotToSubscriptionId[slot] { row["subscriptionId"] = sid }
rows.append(row)
pendingSubscribeSlots.append(slot)
}
guard !rows.isEmpty,
let data = try? JSONSerialization.data(withJSONObject: ["type": "subscribe", "data": rows]),
let text = String(data: data, encoding: .utf8) else { return }
ws.send(text: text)
}
}
import toJsontoRealtime.kt.twigtemplate