Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/y-sweet-core/src/doc_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Callback = Arc<dyn Fn(&[u8]) + 'static>;
#[cfg(feature = "sync")]
type Callback = Arc<dyn Fn(&[u8]) + 'static + Send + Sync>;

const SYNC_STATUS_MESSAGE: u8 = 102;

pub struct DocConnection {
awareness: Arc<RwLock<Awareness>>,
#[allow(unused)] // acts as RAII guard
Expand Down Expand Up @@ -184,6 +186,10 @@ impl DocConnection {
let mut awareness = a.write().unwrap();
protocol.handle_awareness_update(&mut awareness, update)
}
Message::Custom(SYNC_STATUS_MESSAGE, data) => {
// Respond to the client with the same payload it sent.
Ok(Some(Message::Custom(SYNC_STATUS_MESSAGE, data)))
}
Message::Custom(tag, data) => {
let mut awareness = a.write().unwrap();
protocol.missing_handle(&mut awareness, tag, data)
Expand Down
1 change: 0 additions & 1 deletion crates/y-sweet/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use futures::{SinkExt, StreamExt};
use serde::Deserialize;
use serde_json::{json, Value};
use std::{
net::SocketAddr,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated change that snuck in, but it's just removing an unnecessary import

sync::{Arc, RwLock},
time::Duration,
};
Expand Down
57 changes: 51 additions & 6 deletions js-pkg/client/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ import * as Y from 'yjs'
const MESSAGE_SYNC = 0
const MESSAGE_QUERY_AWARENESS = 3
const MESSAGE_AWARENESS = 1
const MESSAGE_SYNC_STATUS = 102

const EVENT_STATUS = 'status'

/** Fired when the _initial_ sync is complete. Only refired after that if there is a reconnection. */
const EVENT_SYNC = 'sync'
const EVENT_CONNECTION_CLOSE = 'connection-close'
const EVENT_CONNECTION_ERROR = 'connection-error'
/** Fired every time the sync status changes. */
const EVENT_SYNC_STATUS = 'sync-status'

/**
* Note: this should always be a superset of y-websocket's valid events.
Expand All @@ -23,6 +28,7 @@ type YSweetEvent =
| typeof EVENT_SYNC
| typeof EVENT_CONNECTION_CLOSE
| typeof EVENT_CONNECTION_ERROR
| typeof EVENT_SYNC_STATUS

const STATUS_CONNECTED = 'connected'
const STATUS_DISCONNECTED = 'disconnected'
Expand Down Expand Up @@ -107,6 +113,9 @@ export class YSweetProvider {
private WebSocketPolyfill: WebSocketPolyfillType
private listeners: Map<YSweetEvent, Set<EventListener>> = new Map()

private lastSyncSent: number = 0
private lastSyncAcked: number = 0

/** Whether we should attempt to connect if we are in a disconnected state. */
private shouldConnect: boolean

Expand Down Expand Up @@ -136,6 +145,16 @@ export class YSweetProvider {
}
}

private updateSyncedState() {
if (this.lastSyncAcked === this.lastSyncSent) {
this.synced = true
this.emit(EVENT_SYNC_STATUS, true)
} else {
this.synced = false
this.emit(EVENT_SYNC_STATUS, false)
}
}

private update(update: Uint8Array, origin: YSweetProvider) {
if (!this.websocket) {
console.warn('Websocket not connected')
Expand All @@ -147,7 +166,28 @@ export class YSweetProvider {
encoding.writeVarUint(encoder, MESSAGE_SYNC)
syncProtocol.writeUpdate(encoder, update)
this.websocket.send(encoding.toUint8Array(encoder))

this.checkSync()
}
}

private checkSync() {
if (!this.websocket) {
console.warn('Websocket not connected')
return
}

this.lastSyncSent += 1
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, MESSAGE_SYNC_STATUS)

const versionEncoder = encoding.createEncoder()
encoding.writeVarUint(versionEncoder, this.lastSyncSent)

encoding.writeVarUint8Array(encoder, encoding.toUint8Array(versionEncoder))
this.websocket.send(encoding.toUint8Array(encoder))

this.updateSyncedState()
Comment thread
paulgb marked this conversation as resolved.
}

private async ensureClientToken(): Promise<ClientToken> {
Expand Down Expand Up @@ -332,13 +372,17 @@ export class YSweetProvider {
case MESSAGE_QUERY_AWARENESS:
this.queryAwareness()
break
case MESSAGE_SYNC_STATUS:
this.lastSyncAcked = decoding.readVarUint(decoder)
this.updateSyncedState()
break
default:
break
}
}

private websocketClose(event: CloseEvent) {
this.emit('connection-close', event)
this.emit(EVENT_CONNECTION_CLOSE, event)
this.setSynced(false)
this.setStatus({ status: STATUS_DISCONNECTED })

Expand All @@ -365,17 +409,18 @@ export class YSweetProvider {
}

protected emit(eventName: YSweetEvent, data: any = null): void {
const listeners = this.listeners.get(eventName) || new Set()

for (const listener of listeners) {
listener(data)
const listeners = this.listeners.get(eventName)
if (listeners) {
for (const listener of listeners) {
listener(data)
}
}
}

private setSynced(state: boolean) {
if (this.synced !== state) {
this.synced = state
this.emit('sync', state)
this.emit(EVENT_SYNC, state)
}
}

Expand Down