diff --git a/examples/quickstart-chat/src/App.tsx b/examples/quickstart-chat/src/App.tsx index d6b229d7..0ba4ce9a 100644 --- a/examples/quickstart-chat/src/App.tsx +++ b/examples/quickstart-chat/src/App.tsx @@ -1,6 +1,12 @@ import React, { useEffect, useState } from 'react'; import './App.css'; -import { DBConnection, EventContext, Message, User } from './module_bindings'; +import { + DBConnection, + ErrorContext, + EventContext, + Message, + User, +} from './module_bindings'; import { Identity } from '@clockworklabs/spacetimedb-sdk'; export type PrettyMessage = { @@ -85,6 +91,21 @@ function App() { const [conn, setConn] = useState(null); useEffect(() => { + const subscribeToQueries = (conn: DBConnection, queries: string[]) => { + let count = 0; + for (const query of queries) { + conn + ?.subscriptionBuilder() + .onApplied(() => { + count++; + if (count === queries.length) { + console.log('SDK client cache initialized.'); + } + }) + .subscribe(query); + } + }; + const onConnect = ( conn: DBConnection, identity: Identity, @@ -100,12 +121,8 @@ function App() { conn.reducers.onSendMessage(() => { console.log('Message sent.'); }); - conn - .subscriptionBuilder() - .onApplied(() => { - console.log('SDK client cache initialized.'); - }) - .subscribe(['SELECT * FROM message', 'SELECT * FROM user']); + + subscribeToQueries(conn, ['SELECT * FROM message', 'SELECT * FROM user']); }; const onDisconnect = () => { @@ -113,7 +130,7 @@ function App() { setConnected(false); }; - const onConnectError = (_conn: DBConnection, err: Error) => { + const onConnectError = (_ctx: ErrorContext, err: Error) => { console.log('Error connecting to SpacetimeDB:', err); }; diff --git a/examples/quickstart-chat/src/module_bindings/index.ts b/examples/quickstart-chat/src/module_bindings/index.ts index 59a762c3..b582de95 100644 --- a/examples/quickstart-chat/src/module_bindings/index.ts +++ b/examples/quickstart-chat/src/module_bindings/index.ts @@ -224,11 +224,16 @@ export class DBConnection extends DBConnectionImpl< RemoteReducers, SetReducerFlags > { - static builder = (): DBConnectionBuilder => { - return new DBConnectionBuilder( - REMOTE_MODULE, - (imp: DBConnectionImpl) => imp as DBConnection - ); + static builder = (): DBConnectionBuilder< + DBConnection, + ErrorContext, + SubscriptionEventContex + > => { + return new DBConnectionBuilder< + DBConnection, + ErrorContext, + SubscriptionEventContex + >(REMOTE_MODULE, (imp: DBConnectionImpl) => imp as DBConnection); }; subscriptionBuilder = (): SubscriptionBuilder => { return new SubscriptionBuilder(this); diff --git a/packages/sdk/README.md b/packages/sdk/README.md index 8650abaa..15a39a5c 100644 --- a/packages/sdk/README.md +++ b/packages/sdk/README.md @@ -40,7 +40,7 @@ const connection = DBConnection.builder() identity.toHexString() ); - connection.subscriptionBuilder().subscribe(['SELECT * FROM player']); + connection.subscriptionBuilder().subscribe('SELECT * FROM player'); }) .withToken('TOKEN') .build(); diff --git a/packages/sdk/src/algebraic_type.ts b/packages/sdk/src/algebraic_type.ts index d18115bb..f93a487d 100644 --- a/packages/sdk/src/algebraic_type.ts +++ b/packages/sdk/src/algebraic_type.ts @@ -1,3 +1,4 @@ +import { TimeDuration, Timestamp } from '.'; import { ConnectionId } from './connection_id'; import type BinaryReader from './binary_reader'; import type BinaryWriter from './binary_writer'; @@ -162,9 +163,17 @@ export class ProductType { } }; - deserialize = (reader: BinaryReader): object => { + deserialize = (reader: BinaryReader): any => { let result: { [key: string]: any } = {}; if (this.elements.length === 1) { + if (this.elements[0].name === '__time_duration_micros__') { + return new TimeDuration(reader.readI64()); + } + + if (this.elements[0].name === '__timestamp_micros_since_unix_epoch__') { + return new Timestamp(reader.readI64()); + } + if (this.elements[0].name === '__identity__') { return new Identity(reader.readU256()); } @@ -347,28 +356,29 @@ export class AlgebraicType { new ProductTypeElement('__identity__', this.createU256Type()), ]); } + static createConnectionIdType(): AlgebraicType { return this.createProductType([ new ProductTypeElement('__connection_id__', this.createU128Type()), ]); } + static createScheduleAtType(): AlgebraicType { return ScheduleAt.getAlgebraicType(); } + static createTimestampType(): AlgebraicType { - return AlgebraicType.createProductType([ + return this.createProductType([ new ProductTypeElement( '__timestamp_micros_since_unix_epoch__', - AlgebraicType.createI64Type() + this.createI64Type() ), ]); } + static createTimeDurationType(): AlgebraicType { - return AlgebraicType.createProductType([ - new ProductTypeElement( - '__time_duration_micros__', - AlgebraicType.createI64Type() - ), + return this.createProductType([ + new ProductTypeElement('__time_duration_micros__', this.createI64Type()), ]); } diff --git a/packages/sdk/src/client_api/index.ts b/packages/sdk/src/client_api/index.ts index 94411da5..17026556 100644 --- a/packages/sdk/src/client_api/index.ts +++ b/packages/sdk/src/client_api/index.ts @@ -89,7 +89,7 @@ export { Unsubscribe }; import { UnsubscribeApplied } from './unsubscribe_applied_type.ts'; export { UnsubscribeApplied }; import { UpdateStatus } from './update_status_type.ts'; -export { UpdateStatus }; +export { UpdateStatus, Timestamp }; const REMOTE_MODULE = { tables: {}, diff --git a/packages/sdk/src/db_connection_builder.ts b/packages/sdk/src/db_connection_builder.ts index 73e1c393..13b7ab54 100644 --- a/packages/sdk/src/db_connection_builder.ts +++ b/packages/sdk/src/db_connection_builder.ts @@ -1,22 +1,25 @@ -import { DBConnectionImpl } from './db_connection_impl'; +import { DBConnectionImpl, type ConnectionEvent } from './db_connection_impl'; import { EventEmitter } from './event_emitter'; import type { Identity } from './identity'; -import { stdbLogger } from './logger'; -import type SpacetimeModule from './spacetime_module'; +import type RemoteModule from './spacetime_module'; import { WebsocketDecompressAdapter } from './websocket_decompress_adapter'; /** * The database client connection to a SpacetimeDB server. */ -export class DBConnectionBuilder { +export class DBConnectionBuilder< + DBConnection, + ErrorContext, + SubscriptionEventContext, +> { #uri?: URL; #nameOrAddress?: string; #identity?: Identity; #token?: string; - #emitter: EventEmitter = new EventEmitter(); - #createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn; + #emitter: EventEmitter = new EventEmitter(); #compression: 'gzip' | 'none' = 'gzip'; - #light_mode: boolean = false; + #lightMode: boolean = false; + #createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn; /** * Creates a new `SpacetimeDBClient` database client and set the initial parameters. @@ -36,23 +39,23 @@ export class DBConnectionBuilder { * ``` */ constructor( - private spacetimeModule: SpacetimeModule, + private remoteModule: RemoteModule, private dbConnectionConstructor: (imp: DBConnectionImpl) => DBConnection ) { this.#createWSFn = WebsocketDecompressAdapter.createWebSocketFn; } - withUri(uri: string | URL): DBConnectionBuilder { + withUri(uri: string | URL): this { this.#uri = new URL(uri); return this; } - withModuleName(nameOrAddress: string): DBConnectionBuilder { + withModuleName(nameOrAddress: string): this { this.#nameOrAddress = nameOrAddress; return this; } - withToken(token?: string): DBConnectionBuilder { + withToken(token?: string): this { this.#token = token; return this; } @@ -63,99 +66,21 @@ export class DBConnectionBuilder { wsProtocol: string; authToken?: string; }) => Promise - ): DBConnectionBuilder { + ): this { this.#createWSFn = createWSFn; return this; } - withCompression( - compression: 'gzip' | 'none' - ): DBConnectionBuilder { + withCompression(compression: 'gzip' | 'none'): this { this.#compression = compression; return this; } - withLightMode(light_mode: boolean): DBConnectionBuilder { - this.#light_mode = light_mode; + withLightMode(lightMode: boolean): this { + this.#lightMode = lightMode; return this; } - /** - * Connect to The SpacetimeDB Websocket For Your Module. By default, this will use a secure websocket connection. The parameters are optional, and if not provided, will use the values provided on construction of the client. - * - * @param host The hostname of the SpacetimeDB server. Defaults to the value passed to the `constructor`. - * @param nameOrAddress The name or address of the SpacetimeDB module. Defaults to the value passed to the `constructor`. - * @param authToken The credentials to use to authenticate with SpacetimeDB. Defaults to the value passed to the `constructor`. - * - * @example - * - * ```ts - * const host = "ws://localhost:3000"; - * const name_or_address = "database_name" - * const auth_token = undefined; - * - * var spacetimeDBClient = new SpacetimeDBClient(host, name_or_address, auth_token); - * // Connect with the initial parameters - * spacetimeDBClient.connect(); - * //Set the `auth_token` - * spacetimeDBClient.connect(undefined, undefined, NEW_TOKEN); - * ``` - */ - build(): DBConnection { - stdbLogger('info', 'Connecting to SpacetimeDB WS...'); - - let url = new URL(`database/subscribe/${this.#nameOrAddress}`, this.#uri); - - if (!this.#uri) { - throw new Error('URI is required to connect to SpacetimeDB'); - } - - if (!/^wss?:/.test(this.#uri.protocol)) { - url.protocol = 'ws:'; - } - - const connection = new DBConnectionImpl( - this.spacetimeModule, - this.#emitter - ); - connection.identity = this.#identity; - connection.token = this.#token; - - let connectionId = connection.connectionId.toHexString(); - url.searchParams.set('connection_id', connectionId); - - connection.wsPromise = this.#createWSFn({ - url, - wsProtocol: 'v1.bsatn.spacetimedb', - authToken: connection.token, - compression: this.#compression, - light_mode: this.#light_mode, - }) - .then(v => { - connection.ws = v; - - connection.ws.onclose = () => { - this.#emitter.emit('disconnect', connection); - }; - connection.ws.onerror = (e: ErrorEvent) => { - this.#emitter.emit('connectError', connection, e); - }; - connection.ws.onopen = connection.handleOnOpen.bind(connection); - connection.ws.onmessage = connection.handleOnMessage.bind(connection); - - return v; - }) - .catch(e => { - stdbLogger('error', 'Error connecting to SpacetimeDB WS'); - connection.on('connectError', e); - // TODO(cloutiertyler): I don't know but this makes it compile and - // I don't have time to investigate how to do this properly. - throw e; - }); - - return this.dbConnectionConstructor(connection); - } - /** * Register a callback to be invoked upon authentication with the database. * @@ -182,11 +107,11 @@ export class DBConnectionBuilder { */ onConnect( callback: ( - connection: DBConnection, + ctx: SubscriptionEventContext, identity: Identity, token: string ) => void - ): DBConnectionBuilder { + ): this { this.#emitter.on('connect', callback); return this; } @@ -202,9 +127,7 @@ export class DBConnectionBuilder { * }); * ``` */ - onConnectError( - callback: (connection: DBConnection, error: Error) => void - ): DBConnectionBuilder { + onConnectError(callback: (ctx: ErrorContext, error: Error) => void): this { this.#emitter.on('connectError', callback); return this; } @@ -236,9 +159,56 @@ export class DBConnectionBuilder { * @throws {Error} Throws an error if called multiple times on the same `DbConnectionBuilder`. */ onDisconnect( - callback: (connection: DBConnection, error?: Error | undefined) => void - ): DBConnectionBuilder { + callback: (ctx: ErrorContext, error?: Error | undefined) => void + ): this { this.#emitter.on('disconnect', callback); return this; } + + /** + * Connect to The SpacetimeDB Websocket For Your Module. By default, this will use a secure websocket connection. The parameters are optional, and if not provided, will use the values provided on construction of the client. + * + * @param host The hostname of the SpacetimeDB server. Defaults to the value passed to the `constructor`. + * @param nameOrAddress The name or address of the SpacetimeDB module. Defaults to the value passed to the `constructor`. + * @param authToken The credentials to use to authenticate with SpacetimeDB. Defaults to the value passed to the `constructor`. + * + * @example + * + * ```ts + * const host = "ws://localhost:3000"; + * const name_or_address = "database_name" + * const auth_token = undefined; + * + * var spacetimeDBClient = new SpacetimeDBClient(host, name_or_address, auth_token); + * // Connect with the initial parameters + * spacetimeDBClient.connect(); + * //Set the `auth_token` + * spacetimeDBClient.connect(undefined, undefined, NEW_TOKEN); + * ``` + */ + build(): DBConnection { + if (!this.#uri) { + throw new Error('URI is required to connect to SpacetimeDB'); + } + + if (!this.#nameOrAddress) { + throw new Error( + 'Database name or address is required to connect to SpacetimeDB' + ); + } + + return this.dbConnectionConstructor( + new DBConnectionImpl({ + uri: this.#uri, + nameOrAddress: this.#nameOrAddress, + identity: this.#identity, + token: this.#token, + emitter: this.#emitter, + compression: this.#compression, + lightMode: this.#lightMode, + createWSFn: this.#createWSFn, + remoteModule: this.remoteModule, + }) + ); + } } diff --git a/packages/sdk/src/db_connection_impl.ts b/packages/sdk/src/db_connection_impl.ts index a2a47a16..316c29b0 100644 --- a/packages/sdk/src/db_connection_impl.ts +++ b/packages/sdk/src/db_connection_impl.ts @@ -29,14 +29,25 @@ import { import { EventEmitter } from './event_emitter.ts'; import { decompress } from './decompress.ts'; import type { Identity } from './identity.ts'; -import type { IdentityTokenMessage, Message } from './message_types.ts'; +import type { + IdentityTokenMessage, + Message, + SubscribeAppliedMessage, + UnsubscribeAppliedMessage, +} from './message_types.ts'; import type { ReducerEvent } from './reducer_event.ts'; -import type SpacetimeModule from './spacetime_module.ts'; +import type RemoteModule from './spacetime_module.ts'; import { TableCache, type Operation, type TableUpdate } from './table_cache.ts'; import { deepEqual, toPascalCase } from './utils.ts'; import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts'; import type { WebsocketTestAdapter } from './websocket_test_adapter.ts'; -import { SubscriptionBuilderImpl } from './subscription_builder_impl.ts'; +import { + SubscriptionBuilderImpl, + SubscriptionHandleImpl, + SubscriptionManager, + type SubscribeEvent, +} from './subscription_builder_impl.ts'; +import { stdbLogger } from './logger.ts'; import type { ReducerRuntimeTypeInfo } from './spacetime_module.ts'; export { @@ -68,7 +79,6 @@ export type { }; export type ConnectionEvent = 'connect' | 'disconnect' | 'connectError'; - export type CallReducerFlags = 'FullUpdate' | 'NoSuccessNotify'; type ReducerEventCallback = ( @@ -89,73 +99,202 @@ function callReducerFlagsToNumber(flags: CallReducerFlags): number { } } +type DBConnectionConfig = { + uri: URL; + nameOrAddress: string; + identity?: Identity; + token?: string; + emitter: EventEmitter; + remoteModule: RemoteModule; + createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn; + compression: 'gzip' | 'none'; + lightMode: boolean; +}; + export class DBConnectionImpl< DBView = any, Reducers = any, SetReducerFlags = any, > implements DBContext { + /** + * Whether or not the connection is active. + */ isActive = false; + /** - * The user's public identity. + * This connection's public identity. */ identity?: Identity = undefined; + /** - * The user's private authentication token. + * This connection's private authentication token. */ token?: string = undefined; /** - * Reference to the database of the client. + * The accessor field to access the tables in the database and associated + * callback functions. */ - clientCache: ClientCache; - remoteModule: SpacetimeModule; - #emitter: EventEmitter; - #reducerEmitter: EventEmitter = new EventEmitter(); - #onApplied?: SubscriptionEventCallback; - - wsPromise!: Promise; - ws?: WebsocketDecompressAdapter | WebsocketTestAdapter; db: DBView; + + /** + * The accessor field to access the reducers in the database and associated + * callback functions. + */ reducers: Reducers; + + /** + * The accessor field to access functions related to setting flags on + * reducers regarding how the server should handle the reducer call and + * the events that it sends back to the client. + */ setReducerFlags: SetReducerFlags; + /** + * The `ConnectionId` of the connection to to the database. + */ connectionId: ConnectionId = ConnectionId.random(); + // These fields are meant to be strictly private. + #queryId = 0; + #emitter: EventEmitter; + #reducerEmitter: EventEmitter = + new EventEmitter(); + #onApplied?: SubscriptionEventCallback; + #remoteModule: RemoteModule; #messageQueue = Promise.resolve(); + #subscriptionManager = new SubscriptionManager(); + + // These fields are not part of the public API, but in a pinch you + // could use JavaScript to access them by bypassing TypeScript's + // private fields. + // We use them in testing. + private clientCache: ClientCache; + private ws?: WebsocketDecompressAdapter | WebsocketTestAdapter; + private wsPromise: Promise; + + constructor({ + uri, + nameOrAddress, + identity, + token, + emitter, + remoteModule, + createWSFn, + compression, + lightMode, + }: DBConnectionConfig) { + stdbLogger('info', 'Connecting to SpacetimeDB WS...'); + + let url = new URL(`database/subscribe/${nameOrAddress}`, uri); + + if (!/^wss?:/.test(uri.protocol)) { + url.protocol = 'ws:'; + } - constructor(remoteModule: SpacetimeModule, emitter: EventEmitter) { - this.clientCache = new ClientCache(); + this.identity = identity; + this.token = token; + + this.#remoteModule = remoteModule; this.#emitter = emitter; - this.remoteModule = remoteModule; - this.db = this.remoteModule.dbViewConstructor(this); - this.setReducerFlags = this.remoteModule.setReducerFlagsConstructor(); - this.reducers = this.remoteModule.reducersConstructor( + + let connectionId = this.connectionId.toHexString(); + url.searchParams.set('connection_id', connectionId); + + this.clientCache = new ClientCache(); + this.db = this.#remoteModule.dbViewConstructor(this); + this.setReducerFlags = this.#remoteModule.setReducerFlagsConstructor(); + this.reducers = this.#remoteModule.reducersConstructor( this, this.setReducerFlags ); + + this.wsPromise = createWSFn({ + url, + wsProtocol: 'v1.bsatn.spacetimedb', + authToken: token, + compression: compression, + lightMode: lightMode, + }) + .then(v => { + this.ws = v; + + this.ws.onclose = () => { + this.#emitter.emit('disconnect', this); + }; + this.ws.onerror = (e: ErrorEvent) => { + this.#emitter.emit('connectError', this, e); + }; + this.ws.onopen = this.#handleOnOpen.bind(this); + this.ws.onmessage = this.#handleOnMessage.bind(this); + + return v; + }) + .catch(e => { + stdbLogger('error', 'Error connecting to SpacetimeDB WS'); + this.#on('connectError', e); + // TODO(cloutiertyler): I don't know but this makes it compile and + // I don't have time to investigate how to do this properly. + // Otherwise `.catch` returns void. + throw e; + }); } - /** - * Close the current connection. - * - * @example - * - * ```ts - * const connection = DBConnection.builder().build(); - * connection.disconnect() - * ``` - */ - disconnect(): void { - this.wsPromise.then(wsResolved => { - wsResolved.close(); + #getNextQueryId = () => { + const queryId = this.#queryId; + this.#queryId += 1; + return queryId; + }; + + // NOTE: This is very important!!! This is the actual function that + // gets called when you call `connection.subscriptionBuilder()`. + // The `subscriptionBuilder` function which is generated, just shadows + // this function in the type system, but not the actual implementation! + // Do not remove this function, or shoot yourself in the foot please. + // It's not clear what would be a better way to do this at this exact + // moment. + subscriptionBuilder = (): SubscriptionBuilderImpl => { + return new SubscriptionBuilderImpl(this); + }; + + registerSubscription( + handle: SubscriptionHandleImpl, + handleEmitter: EventEmitter, + querySql: string + ): number { + const queryId = this.#getNextQueryId(); + this.#subscriptionManager.subscriptions.set(queryId, { + handle, + emitter: handleEmitter, }); + this.#sendMessage( + ws.ClientMessage.SubscribeSingle({ + query: querySql, + queryId: { id: queryId }, + // The TypeScript SDK doesn't currently track `request_id`s, + // so always use 0. + requestId: 0, + }) + ); + return queryId; } + unregisterSubscription(queryId: number): void { + this.#sendMessage( + ws.ClientMessage.Unsubscribe({ + queryId: { id: queryId }, + // The TypeScript SDK doesn't currently track `request_id`s, + // so always use 0. + requestId: 0, + }) + ); + } + + // This function is async because we decompress the message async async #processParsedMessage( - message: ws.ServerMessage, - callback: (message: Message) => void - ) { + message: ws.ServerMessage + ): Promise { const parseRowList = ( type: 'insert' | 'delete', tableName: string, @@ -167,7 +306,7 @@ export class DBConnectionImpl< const endingOffset = offset + length; const reader = new BinaryReader(buffer); const rows: any[] = []; - const rowType = this.remoteModule.tables[tableName]!.rowType; + const rowType = this.#remoteModule.tables[tableName]!.rowType; while (offset < endingOffset) { const row = rowType.deserialize(reader); const rowId = new TextDecoder('utf-8').decode(buffer); @@ -180,6 +319,7 @@ export class DBConnectionImpl< } return rows; }; + const parseTableUpdate = async ( rawTableUpdate: ws.TableUpdate ): Promise => { @@ -211,6 +351,7 @@ export class DBConnectionImpl< operations, }; }; + const parseDatabaseUpdate = async ( dbUpdate: ws.DatabaseUpdate ): Promise => { @@ -229,8 +370,7 @@ export class DBConnectionImpl< tag: 'InitialSubscription', tableUpdates, }; - callback(subscriptionUpdate); - break; + return subscriptionUpdate; } case 'TransactionUpdateLight': { @@ -240,8 +380,7 @@ export class DBConnectionImpl< tag: 'TransactionUpdateLight', tableUpdates, }; - callback(subscriptionUpdate); - break; + return subscriptionUpdate; } case 'TransactionUpdate': { @@ -275,7 +414,7 @@ export class DBConnectionImpl< if (originalReducerName === '') { let errorMessage = errMessage; console.error(`Received an error from the database: ${errorMessage}`); - break; + return; } let reducerInfo: @@ -304,8 +443,7 @@ export class DBConnectionImpl< message: errMessage, timestamp: txUpdate.timestamp, }; - callback(transactionUpdate); - break; + return transactionUpdate; } case 'IdentityToken': { @@ -315,8 +453,7 @@ export class DBConnectionImpl< token: message.value.token, connectionId: message.value.connectionId, }; - callback(identityTokenMessage); - break; + return identityTokenMessage; } case 'OneOffQueryResponse': { @@ -324,52 +461,42 @@ export class DBConnectionImpl< `TypeScript SDK never sends one-off queries, but got OneOffQueryResponse ${message}` ); } - } - } - async processMessage( - data: Uint8Array, - callback: (message: Message) => void - ): Promise { - const message = parseValue(ws.ServerMessage, data); - await this.#processParsedMessage(message, callback); - } + case 'SubscribeApplied': { + const parsedTableUpdate = await parseTableUpdate( + message.value.rows.tableRows + ); + const subscribeAppliedMessage: SubscribeAppliedMessage = { + tag: 'SubscribeApplied', + queryId: message.value.queryId.id, + tableUpdate: parsedTableUpdate, + }; + return subscribeAppliedMessage; + } - /** - * Subscribe to a set of queries, to be notified when rows which match those queries are altered. - * - * NOTE: A new call to `subscribe` will remove all previous subscriptions and replace them with the new `queries`. - * - * If any rows matched the previous subscribed queries but do not match the new queries, - * those rows will be removed from the client cache, and `{Table}.on_delete` callbacks will be invoked for them. - * - * @param queries A `SQL` query or list of queries. - * - * @example - * - * ```ts - * spacetimeDBClient.subscribe(["SELECT * FROM User","SELECT * FROM Message"]); - * ``` - */ - // This is marked private but not # because we need to use it from the builder - private subscribe( - queryOrQueries: string | string[], - onApplied?: SubscriptionEventCallback, - _onError?: ErrorCallback - ): void { - this.#onApplied = onApplied; - const queries = - typeof queryOrQueries === 'string' ? [queryOrQueries] : queryOrQueries; - const message = ws.ClientMessage.Subscribe({ - queryStrings: queries, - // The TypeScript SDK doesn't currently track `request_id`s, - // so always use 0. - requestId: 0, - }); - this.#sendMessage(message); + case 'UnsubscribeApplied': { + const parsedTableUpdate = await parseTableUpdate( + message.value.rows.tableRows + ); + const unsubscribeAppliedMessage: UnsubscribeAppliedMessage = { + tag: 'UnsubscribeApplied', + queryId: message.value.queryId.id, + tableUpdate: parsedTableUpdate, + }; + return unsubscribeAppliedMessage; + } + + case 'SubscriptionError': { + return { + tag: 'SubscriptionError', + queryId: message.value.queryId, + error: message.value.error, + }; + } + } } - #sendMessage(message: ws.ClientMessage) { + #sendMessage(message: ws.ClientMessage): void { this.wsPromise.then(wsResolved => { const writer = new BinaryWriter(1024); ws.ClientMessage.serialize(writer, message); @@ -379,31 +506,9 @@ export class DBConnectionImpl< } /** - * Call a reducer on your SpacetimeDB module. - * - * @param reducerName The name of the reducer to call - * @param argsSerializer The arguments to pass to the reducer - */ - callReducer( - reducerName: string, - argsBuffer: Uint8Array, - flags: CallReducerFlags - ): void { - const message = ws.ClientMessage.CallReducer({ - reducer: reducerName, - args: argsBuffer, - // The TypeScript SDK doesn't currently track `request_id`s, - // so always use 0. - requestId: 0, - flags: callReducerFlagsToNumber(flags), - }); - this.#sendMessage(message); - } - - /**s * Handles WebSocket onOpen event. */ - handleOnOpen(): void { + #handleOnOpen(): void { this.isActive = true; } @@ -414,182 +519,289 @@ export class DBConnectionImpl< for (let tableUpdate of tableUpdates) { // Get table information for the table being updated const tableName = tableUpdate.tableName; - const tableTypeInfo = this.remoteModule.tables[tableName]!; + const tableTypeInfo = this.#remoteModule.tables[tableName]!; const table = this.clientCache.getOrCreateTable(tableTypeInfo); table.applyOperations(tableUpdate.operations, eventContext); } } - /** - * Handles WebSocket onMessage event. - * @param wsMessage MessageEvent object. - */ - handleOnMessage(wsMessage: { data: Uint8Array }): void { - this.#messageQueue = this.#messageQueue.then(() => - this.processMessage(wsMessage.data, message => { - if (message.tag === 'InitialSubscription') { - let event: Event = { tag: 'SubscribeApplied' }; + async #processMessage(data: Uint8Array): Promise { + const serverMessage = parseValue(ws.ServerMessage, data); + const message = await this.#processParsedMessage(serverMessage); + if (!message) { + return; + } + switch (message.tag) { + case 'InitialSubscription': { + let event: Event = { tag: 'SubscribeApplied' }; - const eventContext = this.remoteModule.eventContextConstructor( - this, - event - ); - // Remove the event from the subscription event context - // It is not a field in the type narrowed SubscriptionEventContext - const { event: _, ...subscriptionEventContext } = eventContext; - this.#applyTableUpdates(message.tableUpdates, eventContext); + const eventContext = this.#remoteModule.eventContextConstructor( + this, + event + ); + // Remove the event from the subscription event context + // It is not a field in the type narrowed SubscriptionEventContext + const { event: _, ...subscriptionEventContext } = eventContext; + this.#applyTableUpdates(message.tableUpdates, eventContext); - if (this.#emitter) { - this.#onApplied?.(subscriptionEventContext); + if (this.#emitter) { + this.#onApplied?.(subscriptionEventContext); + } + break; + } + case 'TransactionUpdateLight': { + let event: Event = { tag: 'UnknownTransaction' }; + const eventContext = this.#remoteModule.eventContextConstructor( + this, + event + ); + this.#applyTableUpdates(message.tableUpdates, eventContext); + break; + } + case 'TransactionUpdate': { + let reducerInfo = message.reducerInfo; + let unknownTransaction = false; + let reducerArgs: any | undefined; + let reducerTypeInfo: ReducerRuntimeTypeInfo | undefined; + if (!reducerInfo) { + unknownTransaction = true; + } else { + const reducerTypeInfo = + this.#remoteModule.reducers[reducerInfo.originalReducerName]; + try { + const reader = new BinaryReader(reducerInfo.args as Uint8Array); + reducerArgs = reducerTypeInfo.argsType.deserialize(reader); + } catch { + // This should only be printed in development, since it's + // possible for clients to receive new reducers that they don't + // know about. + console.debug('Failed to deserialize reducer arguments'); + unknownTransaction = true; } - } else if (message.tag === 'TransactionUpdateLight') { + } + + if (unknownTransaction) { const event: Event = { tag: 'UnknownTransaction' }; - const eventContext = this.remoteModule.eventContextConstructor( + const eventContext = this.#remoteModule.eventContextConstructor( this, event ); this.#applyTableUpdates(message.tableUpdates, eventContext); - } else if (message.tag === 'TransactionUpdate') { - let reducerInfo = message.reducerInfo; - let unknownTransaction = false; - let reducerArgs: any | undefined; - let reducerTypeInfo: ReducerRuntimeTypeInfo | undefined; - if (!reducerInfo) { - unknownTransaction = true; - } else { - const reducerTypeInfo = - this.remoteModule.reducers[reducerInfo.originalReducerName]; - try { - const reader = new BinaryReader(reducerInfo.args as Uint8Array); - reducerArgs = reducerTypeInfo.argsType.deserialize(reader); - } catch { - // This should only be printed in development, since it's - // possible for clients to receive new reducers that they don't - // know about. - console.debug('Failed to deserialize reducer arguments'); - unknownTransaction = true; - } - } - - if (unknownTransaction) { - const event: Event = { tag: 'UnknownTransaction' }; - const eventContext = this.remoteModule.eventContextConstructor( - this, - event - ); - this.#applyTableUpdates(message.tableUpdates, eventContext); - return; - } + return; + } - // At this point, we know that `reducerInfo` is not null because - // we return if `unknownTransaction` is true. - reducerInfo = reducerInfo!; - reducerTypeInfo = reducerTypeInfo!; - - // Thus this must be a reducer event create it and emit it. - const reducerEvent = { - callerIdentity: message.identity, - status: message.status, - callerConnectionId: message.connectionId as ConnectionId, - timestamp: message.timestamp, - energyConsumed: message.energyConsumed, - reducer: { - name: reducerInfo.reducerName, - args: reducerArgs, - }, - }; - const event: Event = { - tag: 'Reducer', - value: reducerEvent, - }; - const eventContext = this.remoteModule.eventContextConstructor( - this, - event - ); - const reducerEventContext = { - ...eventContext, - event: reducerEvent, - }; + // At this point, we know that `reducerInfo` is not null because + // we return if `unknownTransaction` is true. + reducerInfo = reducerInfo!; + reducerTypeInfo = reducerTypeInfo!; + + // Thus this must be a reducer event create it and emit it. + const reducerEvent = { + callerIdentity: message.identity, + status: message.status, + callerConnectionId: message.connectionId as ConnectionId, + timestamp: message.timestamp, + energyConsumed: message.energyConsumed, + reducer: { + name: reducerInfo.reducerName, + args: reducerArgs, + }, + }; + const event: Event = { + tag: 'Reducer', + value: reducerEvent, + }; + const eventContext = this.#remoteModule.eventContextConstructor( + this, + event + ); + const reducerEventContext = { + ...eventContext, + event: reducerEvent, + }; - this.#applyTableUpdates(message.tableUpdates, eventContext); + this.#applyTableUpdates(message.tableUpdates, eventContext); - const argsArray: any[] = []; - reducerTypeInfo.argsType.product.elements.forEach( - (element, index) => { - argsArray.push(reducerArgs[element.name]); - } - ); - this.#reducerEmitter.emit( - reducerInfo.reducerName, - reducerEventContext, - ...argsArray - ); - } else if (message.tag === 'IdentityToken') { - this.identity = message.identity; - if (!this.token && message.token) { - this.token = message.token; - } - this.connectionId = message.connectionId; - this.#emitter.emit('connect', this, this.identity, this.token); + const argsArray: any[] = []; + reducerTypeInfo.argsType.product.elements.forEach((element, index) => { + argsArray.push(reducerArgs[element.name]); + }); + this.#reducerEmitter.emit( + reducerInfo.reducerName, + reducerEventContext, + ...argsArray + ); + break; + } + case 'IdentityToken': { + this.identity = message.identity; + if (!this.token && message.token) { + this.token = message.token; } - }) - ); + this.connectionId = message.connectionId; + this.#emitter.emit('connect', this, this.identity, this.token); + break; + } + case 'SubscribeApplied': { + const event: Event = { tag: 'SubscribeApplied' }; + const eventContext = this.#remoteModule.eventContextConstructor( + this, + event + ); + const { event: _, ...subscriptionEventContext } = eventContext; + this.#applyTableUpdates([message.tableUpdate], eventContext); + this.#subscriptionManager.subscriptions + .get(message.queryId) + ?.emitter.emit('applied', subscriptionEventContext); + break; + } + case 'UnsubscribeApplied': { + const event: Event = { tag: 'UnsubscribeApplied' }; + const eventContext = this.#remoteModule.eventContextConstructor( + this, + event + ); + const { event: _, ...subscriptionEventContext } = eventContext; + this.#applyTableUpdates([message.tableUpdate], eventContext); + this.#subscriptionManager.subscriptions + .get(message.queryId) + ?.emitter.emit('end', subscriptionEventContext); + break; + } + case 'SubscriptionError': { + const error = Error(message.error); + const event: Event = { tag: 'Error', value: error }; + const eventContext = this.#remoteModule.eventContextConstructor( + this, + event + ); + const errorContext = { + ...eventContext, + event: error, + }; + if (message.queryId) { + this.#subscriptionManager.subscriptions + .get(message.queryId) + ?.emitter.emit('error', errorContext, error); + } else { + console.error('Received an error message without a queryId: ', error); + // Send it to all of them: + this.#subscriptionManager.subscriptions.forEach(({ emitter }) => { + emitter.emit('error', errorContext, error); + }); + } + } + } + } + + /** + * Handles WebSocket onMessage event. + * @param wsMessage MessageEvent object. + */ + #handleOnMessage(wsMessage: { data: Uint8Array }): void { + // Utilize promise chaining to ensure that we process messages in order + // even though we are processing them asyncronously. This will not begin + // processing the next message until we await the processing of the + // current message. + this.#messageQueue = this.#messageQueue.then(() => { + return this.#processMessage(wsMessage.data); + }); + } + + /** + * Call a reducer on your SpacetimeDB module. + * + * @param reducerName The name of the reducer to call + * @param argsSerializer The arguments to pass to the reducer + */ + callReducer( + reducerName: string, + argsBuffer: Uint8Array, + flags: CallReducerFlags + ): void { + const message = ws.ClientMessage.CallReducer({ + reducer: reducerName, + args: argsBuffer, + // The TypeScript SDK doesn't currently track `request_id`s, + // so always use 0. + requestId: 0, + flags: callReducerFlagsToNumber(flags), + }); + this.#sendMessage(message); } - on( + /** + * Close the current connection. + * + * @example + * + * ```ts + * const connection = DBConnection.builder().build(); + * connection.disconnect() + * ``` + */ + disconnect(): void { + this.wsPromise.then(wsResolved => { + wsResolved.close(); + }); + } + + #on( eventName: ConnectionEvent, - callback: (connection: DBConnectionImpl, ...args: any[]) => void + callback: (ctx: DBConnectionImpl, ...args: any[]) => void ): void { this.#emitter.on(eventName, callback); } - off( + #off( eventName: ConnectionEvent, - callback: (connection: DBConnectionImpl, ...args: any[]) => void + callback: (ctx: DBConnectionImpl, ...args: any[]) => void ): void { this.#emitter.off(eventName, callback); } - onConnect( - callback: (connection: DBConnectionImpl, ...args: any[]) => void - ): void { + #onConnect(callback: (ctx: DBConnectionImpl, ...args: any[]) => void): void { this.#emitter.on('connect', callback); } - onDisconnect( - callback: (connection: DBConnectionImpl, ...args: any[]) => void + #onDisconnect( + callback: (ctx: DBConnectionImpl, ...args: any[]) => void ): void { this.#emitter.on('disconnect', callback); } - onConnectError( - callback: (connection: DBConnectionImpl, ...args: any[]) => void + #onConnectError( + callback: (ctx: DBConnectionImpl, ...args: any[]) => void ): void { this.#emitter.on('connectError', callback); } - removeOnConnect( - callback: (connection: DBConnectionImpl, ...args: any[]) => void + #removeOnConnect( + callback: (ctx: DBConnectionImpl, ...args: any[]) => void ): void { this.#emitter.off('connect', callback); } - removeOnDisconnect( - callback: (connection: DBConnectionImpl, ...args: any[]) => void + #removeOnDisconnect( + callback: (ctx: DBConnectionImpl, ...args: any[]) => void ): void { this.#emitter.off('disconnect', callback); } - removeOnConnectError( - callback: (connection: DBConnectionImpl, ...args: any[]) => void + #removeOnConnectError( + callback: (ctx: DBConnectionImpl, ...args: any[]) => void ): void { this.#emitter.off('connectError', callback); } + // Note: This is required to be public because it needs to be + // called from the `RemoteReducers` class. onReducer(reducerName: string, callback: ReducerEventCallback): void { this.#reducerEmitter.on(reducerName, callback); } + // Note: This is required to be public because it needs to be + // called from the `RemoteReducers` class. offReducer(reducerName: string, callback: ReducerEventCallback): void { this.#reducerEmitter.off(reducerName, callback); } diff --git a/packages/sdk/src/db_context.ts b/packages/sdk/src/db_context.ts index e2410d0f..42700913 100644 --- a/packages/sdk/src/db_context.ts +++ b/packages/sdk/src/db_context.ts @@ -1,54 +1,4 @@ -type Result = - | { - tag: 'Ok'; - value: T; - } - | { - tag: 'Err'; - value: Error; - }; - -/** - * Interface representing a subscription handle for managing queries. - */ -interface SubscriptionHandle { - /** - * Consumes self and issues an `Unsubscribe` message, - * removing this query from the client's set of subscribed queries. - * It is only valid to call this method if `is_active()` is `true`. - */ - unsubscribe(): Result; - - /** - * Unsubscribes and also registers a callback to run upon success. - * I.e. when an `UnsubscribeApplied` message is received. - * - * If `Unsubscribe` returns an error, - * or if the `on_error` callback(s) are invoked before this subscription would end normally, - * the `on_end` callback is not invoked. - * - * @param onEnd - Callback to run upon successful unsubscribe. - */ - unsubscribeThen(onEnd: () => void): Result; - - /** - * True if this `SubscriptionHandle` has ended, - * either due to an error or a call to `unsubscribe`. - * - * This is initially false, and becomes true when either the `on_end` or `on_error` callback is invoked. - * A subscription which has not yet been applied is not active, but is also not ended. - */ - isEnded(): boolean; - - /** - * True if this `SubscriptionHandle` is active, meaning it has been successfully applied - * and has not since ended, either due to an error or a complete `unsubscribe` request-response pair. - * - * This corresponds exactly to the interval bounded at the start by the `on_applied` callback - * and at the end by either the `on_end` or `on_error` callback. - */ - isActive(): boolean; -} +import type { SubscriptionBuilderImpl } from './subscription_builder_impl'; /** * Interface representing a database context. @@ -67,6 +17,17 @@ export interface DBContext< setReducerFlags: SetReducerFlags; isActive: boolean; + /** + * Creates a new subscription builder. + * + * @returns The subscription builder. + */ + subscriptionBuilder(): SubscriptionBuilderImpl< + DBView, + Reducers, + SetReducerFlags + >; + /** * Disconnects from the database. */ diff --git a/packages/sdk/src/event_context.ts b/packages/sdk/src/event_context.ts index 2cf39cee..0c6f4a2d 100644 --- a/packages/sdk/src/event_context.ts +++ b/packages/sdk/src/event_context.ts @@ -36,5 +36,5 @@ export interface ErrorContextInterface< SetReducerFlags = any, > extends DBContext { /** Enum with variants for all possible events. */ - event: Error; + event?: Error; } diff --git a/packages/sdk/src/event_emitter.ts b/packages/sdk/src/event_emitter.ts index 3bf048cc..26c702a5 100644 --- a/packages/sdk/src/event_emitter.ts +++ b/packages/sdk/src/event_emitter.ts @@ -1,7 +1,7 @@ -export class EventEmitter { - #events: Map> = new Map(); +export class EventEmitter { + #events: Map> = new Map(); - on(event: string, callback: Callback): void { + on(event: Key, callback: Callback): void { let callbacks = this.#events.get(event); if (!callbacks) { callbacks = new Set(); @@ -10,7 +10,7 @@ export class EventEmitter { callbacks.add(callback); } - off(event: string, callback: Callback): void { + off(event: Key, callback: Callback): void { let callbacks = this.#events.get(event); if (!callbacks) { return; @@ -18,7 +18,7 @@ export class EventEmitter { callbacks.delete(callback); } - emit(event: string, ...args: any[]): void { + emit(event: Key, ...args: any[]): void { let callbacks = this.#events.get(event); if (!callbacks) { return; diff --git a/packages/sdk/src/message_types.ts b/packages/sdk/src/message_types.ts index d9b710dc..b6284369 100644 --- a/packages/sdk/src/message_types.ts +++ b/packages/sdk/src/message_types.ts @@ -37,8 +37,29 @@ export type IdentityTokenMessage = { connectionId: ConnectionId; }; +export type SubscribeAppliedMessage = { + tag: 'SubscribeApplied'; + queryId: number; + tableUpdate: TableUpdate; +}; + +export type UnsubscribeAppliedMessage = { + tag: 'UnsubscribeApplied'; + queryId: number; + tableUpdate: TableUpdate; +}; + +export type SubscriptionError = { + tag: 'SubscriptionError'; + queryId?: number; + error: string; +}; + export type Message = | InitialSubscriptionMessage | TransactionUpdateMessage | TransactionUpdateLightMessage - | IdentityTokenMessage; + | IdentityTokenMessage + | SubscribeAppliedMessage + | UnsubscribeAppliedMessage + | SubscriptionError; diff --git a/packages/sdk/src/spacetime_module.ts b/packages/sdk/src/spacetime_module.ts index 18882956..10f43795 100644 --- a/packages/sdk/src/spacetime_module.ts +++ b/packages/sdk/src/spacetime_module.ts @@ -12,7 +12,7 @@ export interface ReducerRuntimeTypeInfo { argsType: AlgebraicType; } -export default interface SpacetimeModule { +export default interface RemoteModule { tables: { [name: string]: TableRuntimeTypeInfo }; reducers: { [name: string]: ReducerRuntimeTypeInfo }; eventContextConstructor: (imp: DBConnectionImpl, event: any) => any; diff --git a/packages/sdk/src/subscription_builder_impl.ts b/packages/sdk/src/subscription_builder_impl.ts index 3b7171ee..1618f9d1 100644 --- a/packages/sdk/src/subscription_builder_impl.ts +++ b/packages/sdk/src/subscription_builder_impl.ts @@ -3,6 +3,7 @@ import type { ErrorContextInterface, SubscriptionEventContextInterface, } from './event_context'; +import { EventEmitter } from './event_emitter'; export class SubscriptionBuilderImpl< DBView = any, @@ -38,7 +39,7 @@ export class SubscriptionBuilderImpl< cb: ( ctx: SubscriptionEventContextInterface ) => void - ): SubscriptionBuilderImpl { + ): SubscriptionBuilderImpl { this.#onApplied = cb; return this; } @@ -65,26 +66,192 @@ export class SubscriptionBuilderImpl< */ onError( cb: (ctx: ErrorContextInterface) => void - ): SubscriptionBuilderImpl { + ): SubscriptionBuilderImpl { this.#onError = cb; return this; } /** - * Issues a new `Subscribe` message, - * adding `query` to the client's set of subscribed queries. + * Subscribe to a single query. The results of the query will be merged into the client + * cache and deduplicated on the client. * - * `query` should be a single SQL `SELECT` statement. + * @param query_sql A `SQL` query to subscribe to. * - * Installs the above callbacks into the new `SubscriptionHandle`, - * before issuing the `Subscribe` message, to avoid race conditions. + * @example * - * Consumes the `SubscriptionBuilder`, - * because the callbacks are not necessarily `Clone`. + * ```ts + * const subscription = connection.subscriptionBuilder().onApplied(() => { + * console.log("SDK client cache initialized."); + * }).subscribe("SELECT * FROM User"); * - * @param query_sql - The SQL query to subscribe to. + * subscription.unsubscribe(); + * ``` */ - subscribe(query_sql: string[]): void { - this.db['subscribe'](query_sql, this.#onApplied, this.#onError); + subscribe( + query_sql: string + ): SubscriptionHandleImpl { + return new SubscriptionHandleImpl( + this.db, + query_sql, + this.#onApplied, + this.#onError + ); + } + + /** + * Subscribes to all rows from all tables. + * + * This method is intended as a convenience + * for applications where client-side memory use and network bandwidth are not concerns. + * Applications where these resources are a constraint + * should register more precise queries via `subscribe` + * in order to replicate only the subset of data which the client needs to function. + * + * This method should not be combined with `subscribe` on the same `DbConnection`. + * A connection may either `subscribe` to particular queries, + * or `subscribeToAllTables`, but not both. + * Attempting to call `subscribe` + * on a `DbConnection` that has previously used `subscribeToAllTables`, + * or vice versa, may misbehave in any number of ways, + * including dropping subscriptions, corrupting the client cache, or throwing errors. + */ + subscribeToAllTables(): void { + this.subscribe('SELECT * FROM *'); + } +} + +export type SubscribeEvent = 'applied' | 'error' | 'end'; + +export class SubscriptionManager { + subscriptions: Map< + number, + { handle: SubscriptionHandleImpl; emitter: EventEmitter } + > = new Map(); +} + +export class SubscriptionHandleImpl< + DBView = any, + Reducers = any, + SetReducerFlags = any, +> { + #queryId: number; + #unsubscribeCalled: boolean = false; + #endedState: boolean = false; + #activeState: boolean = false; + #emitter: EventEmitter void> = + new EventEmitter(); + + constructor( + private db: DBConnectionImpl, + querySql: string, + onApplied?: ( + ctx: SubscriptionEventContextInterface + ) => void, + private onError?: ( + ctx: ErrorContextInterface, + error: Error + ) => void + ) { + this.#emitter.on( + 'applied', + ( + ctx: SubscriptionEventContextInterface< + DBView, + Reducers, + SetReducerFlags + > + ) => { + this.#activeState = true; + if (onApplied) { + onApplied(ctx); + } + } + ); + this.#emitter.on( + 'error', + ( + ctx: ErrorContextInterface, + error: Error + ) => { + if (this.onError) { + this.onError(ctx, error); + } + } + ); + this.#queryId = this.db.registerSubscription(this, this.#emitter, querySql); + } + + /** + * Consumes self and issues an `Unsubscribe` message, + * removing this query from the client's set of subscribed queries. + * It is only valid to call this method if `is_active()` is `true`. + */ + unsubscribe(): void { + if (this.#unsubscribeCalled) { + throw new Error('Unsubscribe has already been called'); + } + this.#unsubscribeCalled = true; + this.db.unregisterSubscription(this.#queryId); + this.db['unsubscribe'](this.#queryId); + } + + /** + * Unsubscribes and also registers a callback to run upon success. + * I.e. when an `UnsubscribeApplied` message is received. + * + * If `Unsubscribe` returns an error, + * or if the `on_error` callback(s) are invoked before this subscription would end normally, + * the `on_end` callback is not invoked. + * + * @param onEnd - Callback to run upon successful unsubscribe. + */ + unsubscribeThen( + onEnd: ( + ctx: SubscriptionEventContextInterface + ) => void + ): void { + if (this.#endedState) { + throw new Error('Subscription has already ended'); + } + if (this.#unsubscribeCalled) { + throw new Error('Unsubscribe has already been called'); + } + this.#unsubscribeCalled = true; + this.#emitter.on( + 'end', + ( + ctx: SubscriptionEventContextInterface< + DBView, + Reducers, + SetReducerFlags + > + ) => { + this.#endedState = true; + this.#activeState = false; + onEnd(ctx); + } + ); + } + + /** + * True if this `SubscriptionHandle` has ended, + * either due to an error or a call to `unsubscribe`. + * + * This is initially false, and becomes true when either the `on_end` or `on_error` callback is invoked. + * A subscription which has not yet been applied is not active, but is also not ended. + */ + isEnded(): boolean { + return this.#endedState; + } + + /** + * True if this `SubscriptionHandle` is active, meaning it has been successfully applied + * and has not since ended, either due to an error or a complete `unsubscribe` request-response pair. + * + * This corresponds exactly to the interval bounded at the start by the `on_applied` callback + * and at the end by either the `on_end` or `on_error` callback. + */ + isActive(): boolean { + return this.#activeState; } } diff --git a/packages/sdk/src/table_cache.ts b/packages/sdk/src/table_cache.ts index be3f394f..f70748a8 100644 --- a/packages/sdk/src/table_cache.ts +++ b/packages/sdk/src/table_cache.ts @@ -21,7 +21,7 @@ export type TableUpdate = { export class TableCache { private rows: Map; private tableTypeInfo: TableRuntimeTypeInfo; - private emitter: EventEmitter; + private emitter: EventEmitter<'insert' | 'delete' | 'update'>; /** * @param name the table name diff --git a/packages/sdk/src/websocket_decompress_adapter.ts b/packages/sdk/src/websocket_decompress_adapter.ts index d5ab7b08..6a556686 100644 --- a/packages/sdk/src/websocket_decompress_adapter.ts +++ b/packages/sdk/src/websocket_decompress_adapter.ts @@ -66,13 +66,13 @@ export class WebsocketDecompressAdapter { wsProtocol, authToken, compression, - light_mode, + lightMode, }: { url: URL; wsProtocol: string; authToken?: string; compression: 'gzip' | 'none'; - light_mode: boolean; + lightMode: boolean; }): Promise { const headers = new Headers(); if (authToken) { @@ -102,7 +102,7 @@ export class WebsocketDecompressAdapter { 'compression', compression === 'gzip' ? 'Gzip' : 'None' ); - if (light_mode) { + if (lightMode) { url.searchParams.set('light', 'true'); } } diff --git a/packages/sdk/tests/spacetimedb_client.test.ts b/packages/sdk/tests/db_connection.test.ts similarity index 85% rename from packages/sdk/tests/spacetimedb_client.test.ts rename to packages/sdk/tests/db_connection.test.ts index 3d97ed21..27321419 100644 --- a/packages/sdk/tests/spacetimedb_client.test.ts +++ b/packages/sdk/tests/db_connection.test.ts @@ -1,4 +1,10 @@ -import * as module_bindings from '@clockworklabs/test-app/src/module_bindings'; +import { + CreatePlayer, + DbConnection, + Player, + Point, + User, +} from '@clockworklabs/test-app/src/module_bindings'; import { beforeEach, describe, expect, test } from 'vitest'; import { ConnectionId } from '../src/connection_id'; import { Timestamp } from '../src/timestamp'; @@ -64,86 +70,42 @@ class Deferred { beforeEach(() => {}); -function encodePlayer(value: module_bindings.Player): Uint8Array { +function encodePlayer(value: Player): Uint8Array { const writer = new BinaryWriter(1024); - module_bindings.Player.serialize(writer, value); + Player.serialize(writer, value); return writer.getBuffer(); } -function encodeUser(value: module_bindings.User): Uint8Array { +function encodeUser(value: User): Uint8Array { const writer = new BinaryWriter(1024); - module_bindings.User.serialize(writer, value); + User.serialize(writer, value); return writer.getBuffer(); } -function encodeCreatePlayerArgs( - name: string, - location: module_bindings.Point -): Uint8Array { +function encodeCreatePlayerArgs(name: string, location: Point): Uint8Array { const writer = new BinaryWriter(1024); AlgebraicType.createStringType().serialize(writer, name); - module_bindings.Point.serialize(writer, location); + Point.serialize(writer, location); return writer.getBuffer(); } -describe('SpacetimeDBClient', () => { - test('auto subscribe on connect', async () => { - const wsAdapter = new WebsocketTestAdapter(); - const client = module_bindings.DBConnection.builder() - .withUri('ws://127.0.0.1:1234') - .withModuleName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter)) - .build(); - - client.subscriptionBuilder().subscribe(['SELECT * FROM Player']); - client - .subscriptionBuilder() - .subscribe(['SELECT * FROM Position', 'SELECT * FROM Coin']); - - let called = false; - client.onConnect(() => { - called = true; - }); - await client.wsPromise; - wsAdapter.acceptConnection(); - - const messages = wsAdapter.messageQueue; - expect(messages.length).toBe(2); - - const message: ws.ClientMessage = parseValue(ws.ClientMessage, messages[0]); - expect(message).toHaveProperty('tag', 'Subscribe'); - - const message2: ws.ClientMessage = parseValue( - ws.ClientMessage, - messages[1] - ); - expect(message2).toHaveProperty('tag', 'Subscribe'); - - const subscribeMessage = message.value as ws.Subscribe; - const expected = ['SELECT * FROM Player']; - expect(subscribeMessage.queryStrings).toEqual(expected); - - const subscribeMessage2 = message2.value as ws.Subscribe; - const expected2 = ['SELECT * FROM Position', 'SELECT * FROM Coin']; - expect(subscribeMessage2.queryStrings).toEqual(expected2); - }); - +describe('DBConnection', () => { test('call onConnect callback after getting an identity', async () => { const onConnectPromise = new Deferred(); const wsAdapter = new WebsocketTestAdapter(); - const client = module_bindings.DBConnection.builder() + let called = false; + const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withModuleName('db') .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter)) + .onConnect(() => { + called = true; + onConnectPromise.resolve(); + }) .build(); - let called = false; - client.onConnect(() => { - called = true; - onConnectPromise.resolve(); - }); - await client.wsPromise; + await client['wsPromise']; wsAdapter.acceptConnection(); const tokenMessage = ws.ServerMessage.IdentityToken({ @@ -160,17 +122,17 @@ describe('SpacetimeDBClient', () => { test('it calls onInsert callback when a record is added with a subscription update and then with a transaction update', async () => { const wsAdapter = new WebsocketTestAdapter(); - const client = module_bindings.DBConnection.builder() + let called = false; + const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withModuleName('db') .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter)) + .onConnect(() => { + called = true; + }) .build(); - let called = false; - client.onConnect(() => { - called = true; - }); - await client.wsPromise; + await client['wsPromise']; wsAdapter.acceptConnection(); const tokenMessage = ws.ServerMessage.IdentityToken({ @@ -184,10 +146,10 @@ describe('SpacetimeDBClient', () => { reducerEvent: | ReducerEvent<{ name: 'CreatePlayer'; - args: module_bindings.CreatePlayer; + args: CreatePlayer; }> | undefined; - player: module_bindings.Player; + player: Player; }[] = []; const insert1Promise = new Deferred(); @@ -210,21 +172,17 @@ describe('SpacetimeDBClient', () => { let reducerCallbackLog: { reducerEvent: ReducerEvent<{ name: 'CreatePlayer'; - args: module_bindings.CreatePlayer; + args: CreatePlayer; }>; reducerArgs: any[]; }[] = []; - client.reducers.onCreatePlayer( - (ctx, name: string, location: module_bindings.Point) => { - if (ctx.event.tag === 'Reducer') { - const reducerEvent = ctx.event.value; - reducerCallbackLog.push({ - reducerEvent, - reducerArgs: [name, location], - }); - } - } - ); + client.reducers.onCreatePlayer((ctx, name: string, location: Point) => { + const reducerEvent = ctx.event; + reducerCallbackLog.push({ + reducerEvent, + reducerArgs: [name, location], + }); + }); const subscriptionMessage: ws.ServerMessage = ws.ServerMessage.InitialSubscription({ @@ -326,17 +284,17 @@ describe('SpacetimeDBClient', () => { test('it calls onUpdate callback when a record is added with a subscription update and then with a transaction update', async () => { const wsAdapter = new WebsocketTestAdapter(); - const client = module_bindings.DBConnection.builder() + let called = false; + const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withModuleName('db') .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter)) + .onConnect(() => { + called = true; + }) .build(); - let called = false; - client.onConnect(() => { - called = true; - }); - await client.wsPromise; + await client['wsPromise']; wsAdapter.acceptConnection(); const tokenMessage = ws.ServerMessage.IdentityToken({ @@ -350,8 +308,8 @@ describe('SpacetimeDBClient', () => { const update2Promise = new Deferred(); const updates: { - oldPlayer: module_bindings.Player; - newPlayer: module_bindings.Player; + oldPlayer: Player; + newPlayer: Player; }[] = []; client.db.player.onUpdate((_ctx, oldPlayer, newPlayer) => { updates.push({ @@ -465,17 +423,17 @@ describe('SpacetimeDBClient', () => { test('a reducer callback should be called after the database callbacks', async () => { const wsAdapter = new WebsocketTestAdapter(); - const client = module_bindings.DBConnection.builder() + let called = false; + const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withModuleName('db') .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter)) + .onConnect(() => { + called = true; + }) .build(); - let called = false; - client.onConnect(() => { - called = true; - }); - await client.wsPromise; + await client['wsPromise']; wsAdapter.acceptConnection(); let callbackLog: string[] = []; @@ -545,17 +503,17 @@ describe('SpacetimeDBClient', () => { test('it calls onUpdate callback when a record is added with a subscription update and then with a transaction update when the PK is of type Identity', async () => { const wsAdapter = new WebsocketTestAdapter(); - const client = module_bindings.DBConnection.builder() + let called = false; + const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withModuleName('db') .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter)) + .onConnect(() => { + called = true; + }) .build(); - let called = false; - client.onConnect(() => { - called = true; - }); - await client.wsPromise; + await client['wsPromise']; wsAdapter.acceptConnection(); const tokenMessage = ws.ServerMessage.IdentityToken({ @@ -571,8 +529,8 @@ describe('SpacetimeDBClient', () => { const update2Promise = new Deferred(); const updates: { - oldUser: module_bindings.User; - newUser: module_bindings.User; + oldUser: User; + newUser: User; }[] = []; client.db.user.onUpdate((_ctx, oldUser, newUser) => { updates.push({ @@ -698,20 +656,19 @@ describe('SpacetimeDBClient', () => { test('Filtering works', async () => { const wsAdapter = new WebsocketTestAdapter(); - const client = module_bindings.DBConnection.builder() + const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withModuleName('db') .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter)) .build(); - await client.wsPromise; + await client['wsPromise']; const db = client.db; const user1 = { identity: bobIdentity, username: 'bob' }; const user2 = { identity: sallyIdentity, username: 'sally', }; - const users: Map = (db.user.tableCache as any) - .rows; + const users: Map = (db.user.tableCache as any).rows; users.set('abc123', user1); users.set('def456', user2); diff --git a/packages/test-app/src/App.tsx b/packages/test-app/src/App.tsx index 95055554..e9c2e95c 100644 --- a/packages/test-app/src/App.tsx +++ b/packages/test-app/src/App.tsx @@ -20,7 +20,7 @@ function App() { identity.toHexString() ); - conn.subscriptionBuilder().subscribe(['SELECT * FROM player']); + conn.subscriptionBuilder().subscribe('SELECT * FROM player'); }) .withToken( 'eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiIwMUpCQTBYRzRESFpIWUdQQk5GRFk5RDQ2SiIsImlzcyI6Imh0dHBzOi8vYXV0aC5zdGFnaW5nLnNwYWNldGltZWRiLmNvbSIsImlhdCI6MTczMDgwODUwNSwiZXhwIjoxNzkzODgwNTA1fQ.kGM4HGX0c0twL8NJoSQowzSZa8dc2Ogc-fsvaDK7otUrcdGFsZ3KsNON2eNkFh73FER0hl55_eJStr2tgoPwfTyl_v_TqkY45iUOUlLmHfB-X42cMzpE7PXbR_PKYcp-P-Wa4jGtVl4oF7CvdGKxlhIYEk3e0ElQlA9ThnZN4IEciYV0vwAXGqbaO9SOG8jbrmlmfN7oKgl02EgpodEAHTrnB2mD1qf1YyOw7_9n_EkxJxWLkJf9-nFCVRrbfSLqSJBeE6OKNAu2VLLYrSFE7GkVXNCFVugoCDM2oVJogX75AgzWimrp75QRmLsXbvB-YvvRkQ8Gfb2RZnqCj9kiYg' diff --git a/packages/test-app/src/module_bindings/create_player_reducer.ts b/packages/test-app/src/module_bindings/create_player_reducer.ts index 7636245d..89f63e8e 100644 --- a/packages/test-app/src/module_bindings/create_player_reducer.ts +++ b/packages/test-app/src/module_bindings/create_player_reducer.ts @@ -26,6 +26,8 @@ import { SumType, SumTypeVariant, TableCache, + TimeDuration, + Timestamp, deepEqual, } from '@clockworklabs/spacetimedb-sdk'; diff --git a/packages/test-app/src/module_bindings/index.ts b/packages/test-app/src/module_bindings/index.ts index 6cc09b54..305b7e9c 100644 --- a/packages/test-app/src/module_bindings/index.ts +++ b/packages/test-app/src/module_bindings/index.ts @@ -26,6 +26,8 @@ import { SumType, SumTypeVariant, TableCache, + TimeDuration, + Timestamp, deepEqual, } from '@clockworklabs/spacetimedb-sdk'; @@ -165,11 +167,16 @@ export class DBConnection extends DBConnectionImpl< RemoteReducers, SetReducerFlags > { - static builder = (): DBConnectionBuilder => { - return new DBConnectionBuilder( - REMOTE_MODULE, - (imp: DBConnectionImpl) => imp as DBConnection - ); + static builder = (): DBConnectionBuilder< + DBConnection, + ErrorContext, + SubscriptionEventContext + > => { + return new DBConnectionBuilder< + DBConnection, + ErrorContext, + SubscriptionEventContext + >(REMOTE_MODULE, (imp: DBConnectionImpl) => imp as DBConnection); }; subscriptionBuilder = (): SubscriptionBuilder => { return new SubscriptionBuilder(this); diff --git a/packages/test-app/src/module_bindings/player_table.ts b/packages/test-app/src/module_bindings/player_table.ts index ebfe85bb..c3732281 100644 --- a/packages/test-app/src/module_bindings/player_table.ts +++ b/packages/test-app/src/module_bindings/player_table.ts @@ -26,6 +26,8 @@ import { SumType, SumTypeVariant, TableCache, + TimeDuration, + Timestamp, deepEqual, } from '@clockworklabs/spacetimedb-sdk'; import { Player } from './player_type'; diff --git a/packages/test-app/src/module_bindings/player_type.ts b/packages/test-app/src/module_bindings/player_type.ts index 6fa51273..ced5d3f6 100644 --- a/packages/test-app/src/module_bindings/player_type.ts +++ b/packages/test-app/src/module_bindings/player_type.ts @@ -26,6 +26,8 @@ import { SumType, SumTypeVariant, TableCache, + TimeDuration, + Timestamp, deepEqual, } from '@clockworklabs/spacetimedb-sdk'; import { Point as __Point } from './point_type'; diff --git a/packages/test-app/src/module_bindings/point_type.ts b/packages/test-app/src/module_bindings/point_type.ts index d2454ec8..6e0ae7bb 100644 --- a/packages/test-app/src/module_bindings/point_type.ts +++ b/packages/test-app/src/module_bindings/point_type.ts @@ -26,6 +26,8 @@ import { SumType, SumTypeVariant, TableCache, + TimeDuration, + Timestamp, deepEqual, } from '@clockworklabs/spacetimedb-sdk'; export type Point = { diff --git a/packages/test-app/src/module_bindings/user_table.ts b/packages/test-app/src/module_bindings/user_table.ts index 0634ed0e..62c38851 100644 --- a/packages/test-app/src/module_bindings/user_table.ts +++ b/packages/test-app/src/module_bindings/user_table.ts @@ -26,6 +26,8 @@ import { SumType, SumTypeVariant, TableCache, + TimeDuration, + Timestamp, deepEqual, } from '@clockworklabs/spacetimedb-sdk'; import { User } from './user_type'; diff --git a/packages/test-app/src/module_bindings/user_type.ts b/packages/test-app/src/module_bindings/user_type.ts index 2e60bbbf..ba519bd4 100644 --- a/packages/test-app/src/module_bindings/user_type.ts +++ b/packages/test-app/src/module_bindings/user_type.ts @@ -26,6 +26,8 @@ import { SumType, SumTypeVariant, TableCache, + TimeDuration, + Timestamp, deepEqual, } from '@clockworklabs/spacetimedb-sdk'; export type User = {