Skip to content

Commit fc6f466

Browse files
committed
Events for sync status
1 parent d5c9923 commit fc6f466

3 files changed

Lines changed: 55 additions & 4 deletions

File tree

crates/y-sweet-core/src/doc_connection.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ type Callback = Arc<dyn Fn(&[u8]) + 'static>;
2222
#[cfg(feature = "sync")]
2323
type Callback = Arc<dyn Fn(&[u8]) + 'static + Send + Sync>;
2424

25+
const SYNC_STATUS_MESSAGE: u8 = 102;
26+
2527
pub struct DocConnection {
2628
awareness: Arc<RwLock<Awareness>>,
2729
#[allow(unused)] // acts as RAII guard
@@ -184,6 +186,10 @@ impl DocConnection {
184186
let mut awareness = a.write().unwrap();
185187
protocol.handle_awareness_update(&mut awareness, update)
186188
}
189+
Message::Custom(SYNC_STATUS_MESSAGE, data) => {
190+
// Respond to the client with the same payload it sent.
191+
Ok(Some(Message::Custom(SYNC_STATUS_MESSAGE, data)))
192+
}
187193
Message::Custom(tag, data) => {
188194
let mut awareness = a.write().unwrap();
189195
protocol.missing_handle(&mut awareness, tag, data)

crates/y-sweet/src/server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use futures::{SinkExt, StreamExt};
1717
use serde::Deserialize;
1818
use serde_json::{json, Value};
1919
use std::{
20-
net::SocketAddr,
2120
sync::{Arc, RwLock},
2221
time::Duration,
2322
};

js-pkg/client/src/provider.ts

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@ import * as Y from 'yjs'
88
const messageSync = 0
99
const messageQueryAwareness = 3
1010
const messageAwareness = 1
11+
const messageSyncStatus = 102
1112

1213
const EVENT_STATUS = 'status'
14+
15+
/** Fired when the _initial_ sync is complete. Only refired after that if there is a reconnection. */
1316
const EVENT_SYNC = 'sync'
1417
const EVENT_CONNECTION_CLOSE = 'connection-close'
1518
const EVENT_CONNECTION_ERROR = 'connection-error'
19+
/** Fired every time the sync status changes. */
20+
const EVENT_SYNC_STATUS = 'sync-status'
1621

1722
/**
1823
* Note: this should always be a superset of y-websocket's valid events.
@@ -23,6 +28,7 @@ type YSweetEvent =
2328
| typeof EVENT_SYNC
2429
| typeof EVENT_CONNECTION_CLOSE
2530
| typeof EVENT_CONNECTION_ERROR
31+
| typeof EVENT_SYNC_STATUS
2632

2733
const STATUS_CONNECTED = 'connected'
2834
const STATUS_DISCONNECTED = 'disconnected'
@@ -101,12 +107,16 @@ async function getClientToken(authEndpoint: AuthEndpoint, roomname: string): Pro
101107
export class YSweetProvider {
102108
private websocket: WebSocket | null = null
103109
public clientToken: ClientToken | null = null
104-
public synced: boolean = false
110+
private initialSync: boolean = false
111+
private synced: boolean = false
105112
private status: YSweetStatus = { status: STATUS_DISCONNECTED }
106113
public awareness: awarenessProtocol.Awareness
107114
private WebSocketPolyfill: WebSocketPolyfillType
108115
private listeners: Map<YSweetEvent, Set<EventListener>> = new Map()
109116

117+
private lastSyncSent: number = 0
118+
private lastSyncAcked: number = 0
119+
110120
/** Whether we should attempt to connect if we are in a disconnected state. */
111121
private shouldConnect: boolean
112122

@@ -135,6 +145,16 @@ export class YSweetProvider {
135145
doc.on('update', this.update.bind(this))
136146
}
137147

148+
private updateSyncedState() {
149+
if (this.lastSyncAcked === this.lastSyncSent) {
150+
this.synced = true
151+
this.emit(EVENT_SYNC_STATUS, true)
152+
} else {
153+
this.synced = false
154+
this.emit(EVENT_SYNC_STATUS, false)
155+
}
156+
}
157+
138158
private update(update: Uint8Array, origin: YSweetProvider) {
139159
if (!this.websocket) {
140160
console.warn('Websocket not connected')
@@ -146,9 +166,30 @@ export class YSweetProvider {
146166
encoding.writeVarUint(encoder, messageSync)
147167
syncProtocol.writeUpdate(encoder, update)
148168
this.websocket.send(encoding.toUint8Array(encoder))
169+
170+
this.checkSync()
149171
}
150172
}
151173

174+
private checkSync() {
175+
if (!this.websocket) {
176+
console.warn('Websocket not connected')
177+
return
178+
}
179+
180+
this.lastSyncSent += 1
181+
const encoder = encoding.createEncoder()
182+
encoding.writeVarUint(encoder, messageSyncStatus)
183+
184+
const versionEncoder = encoding.createEncoder()
185+
encoding.writeVarUint(versionEncoder, this.lastSyncSent)
186+
187+
encoding.writeVarUint8Array(encoder, encoding.toUint8Array(versionEncoder))
188+
this.websocket.send(encoding.toUint8Array(encoder))
189+
190+
this.updateSyncedState()
191+
}
192+
152193
private async ensureClientToken(): Promise<ClientToken> {
153194
if (this.clientToken) {
154195
return this.clientToken
@@ -319,6 +360,10 @@ export class YSweetProvider {
319360
case messageQueryAwareness:
320361
this.queryAwareness()
321362
break
363+
case messageSyncStatus:
364+
this.lastSyncAcked = decoding.readVarUint(decoder)
365+
this.updateSyncedState()
366+
break
322367
default:
323368
break
324369
}
@@ -356,6 +401,7 @@ export class YSweetProvider {
356401
}
357402

358403
protected emit(eventName: YSweetEvent, data: any = null): void {
404+
console.log('Emitting event', eventName, data)
359405
const listeners = this.listeners.get(eventName)
360406
if (listeners) {
361407
for (const listener of listeners) {
@@ -365,8 +411,8 @@ export class YSweetProvider {
365411
}
366412

367413
private setSynced(state: boolean) {
368-
if (this.synced !== state) {
369-
this.synced = state
414+
if (this.initialSync !== state) {
415+
this.initialSync = state
370416
this.emit('sync', state)
371417
}
372418
}

0 commit comments

Comments
 (0)