Skip to content

Commit eef891a

Browse files
committed
Expose and validate connection.tags property
Add a read-only connection.tags property and tag handling across the partyserver Durable Object. Introduces prepareTags() to dedupe and validate tags (always prepends the connection id, max 10 tags, non-empty strings, <=256 chars). Persist tags in the connection attachment for hibernating connections (default to [] for older attachments) and expose tags on in-memory connections via a property. Update createLazyConnection, connection attachment parsing, and both connection managers to use the helper. Includes type updates, new tests covering hibernating and in-memory tag behavior, and wrangler test config additions.
1 parent 2c0f106 commit eef891a

7 files changed

Lines changed: 230 additions & 34 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"partyserver": minor
3+
---
4+
5+
Add `connection.tags` property to read back tags assigned via `getConnectionTags()`. Works in both hibernating and in-memory modes. Tags are validated and always include the connection id as the first tag.

packages/partyserver/src/connection.ts

Lines changed: 60 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type ConnectionAttachments = {
3434
// TODO: remove this once we have
3535
// durable object level setState
3636
server: string;
37+
tags: string[];
3738
};
3839
__user?: unknown;
3940
};
@@ -57,11 +58,20 @@ function tryGetPartyServerMeta(
5758
if (!pk || typeof pk !== "object") {
5859
return null;
5960
}
60-
const { id, server } = pk as { id?: unknown; server?: unknown };
61+
const { id, server, tags } = pk as {
62+
id?: unknown;
63+
server?: unknown;
64+
tags?: unknown;
65+
};
6166
if (typeof id !== "string" || typeof server !== "string") {
6267
return null;
6368
}
64-
return pk as ConnectionAttachments["__pk"];
69+
// Default tags to [] for connections created before tags were stored
70+
return {
71+
id,
72+
server,
73+
tags: Array.isArray(tags) ? tags : []
74+
} as ConnectionAttachments["__pk"];
6575
} catch {
6676
return null;
6777
}
@@ -138,6 +148,12 @@ export const createLazyConnection = (
138148
return attachments.get(ws).__pk.server;
139149
}
140150
},
151+
tags: {
152+
get() {
153+
// Default to [] for connections accepted before tags were stored
154+
return attachments.get(ws).__pk.tags ?? [];
155+
}
156+
},
141157
socket: {
142158
get() {
143159
return ws;
@@ -233,6 +249,36 @@ class HibernatingConnectionIterator<T> implements IterableIterator<
233249
}
234250
}
235251

252+
/**
253+
* Deduplicate and validate connection tags.
254+
* Returns the final tag array (always includes the connection id as the first tag).
255+
*/
256+
function prepareTags(connectionId: string, userTags: string[]): string[] {
257+
const tags = [connectionId, ...userTags.filter((t) => t !== connectionId)];
258+
259+
// validate tags against documented restrictions
260+
// https://developers.cloudflare.com/durable-objects/api/hibernatable-websockets-api/#state-methods-for-websockets
261+
if (tags.length > 10) {
262+
throw new Error(
263+
"A connection can only have 10 tags, including the default id tag."
264+
);
265+
}
266+
267+
for (const tag of tags) {
268+
if (typeof tag !== "string") {
269+
throw new Error(`A connection tag must be a string. Received: ${tag}`);
270+
}
271+
if (tag === "") {
272+
throw new Error("A connection tag must not be an empty string.");
273+
}
274+
if (tag.length > 256) {
275+
throw new Error("A connection tag must not exceed 256 characters");
276+
}
277+
}
278+
279+
return tags;
280+
}
281+
236282
export interface ConnectionManager {
237283
getCount(): number;
238284
getConnection<TState>(id: string): Connection<TState> | undefined;
@@ -280,12 +326,16 @@ export class InMemoryConnectionManager<TState> implements ConnectionManager {
280326
accept(connection: Connection, options: { tags: string[]; server: string }) {
281327
connection.accept();
282328

329+
const tags = prepareTags(connection.id, options.tags);
330+
283331
this.#connections.set(connection.id, connection);
284-
this.tags.set(connection, [
285-
// make sure we have id tag
286-
connection.id,
287-
...options.tags.filter((t) => t !== connection.id)
288-
]);
332+
this.tags.set(connection, tags);
333+
334+
// Expose tags on the connection object itself
335+
Object.defineProperty(connection, "tags", {
336+
get: () => tags,
337+
configurable: true
338+
});
289339

290340
const removeConnection = () => {
291341
this.#connections.delete(connection.id);
@@ -336,37 +386,14 @@ export class HibernatingConnectionManager<TState> implements ConnectionManager {
336386
}
337387

338388
accept(connection: Connection, options: { tags: string[]; server: string }) {
339-
// dedupe tags in case user already provided id tag
340-
const tags = [
341-
connection.id,
342-
...options.tags.filter((t) => t !== connection.id)
343-
];
344-
345-
// validate tags against documented restrictions
346-
// shttps://developers.cloudflare.com/durable-objects/api/hibernatable-websockets-api/#state-methods-for-websockets
347-
if (tags.length > 10) {
348-
throw new Error(
349-
"A connection can only have 10 tags, including the default id tag."
350-
);
351-
}
352-
353-
for (const tag of tags) {
354-
if (typeof tag !== "string") {
355-
throw new Error(`A connection tag must be a string. Received: ${tag}`);
356-
}
357-
if (tag === "") {
358-
throw new Error("A connection tag must not be an empty string.");
359-
}
360-
if (tag.length > 256) {
361-
throw new Error("A connection tag must not exceed 256 characters");
362-
}
363-
}
389+
const tags = prepareTags(connection.id, options.tags);
364390

365391
this.controller.acceptWebSocket(connection, tags);
366392
connection.serializeAttachment({
367393
__pk: {
368394
id: connection.id,
369-
server: options.server
395+
server: options.server,
396+
tags
370397
},
371398
__user: null
372399
});

packages/partyserver/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ Did you try connecting directly to this Durable Object? Try using getServerByNam
431431
let connection: Connection = Object.assign(serverWebSocket, {
432432
id: connectionId,
433433
server: this.name,
434+
tags: [] as string[],
434435
state: null as unknown as ConnectionState<unknown>,
435436
setState<T = unknown>(setState: T | ConnectionSetStateFn<T>) {
436437
let state: T;

packages/partyserver/src/tests/index.test.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,3 +658,105 @@ describe("CORS", () => {
658658
);
659659
});
660660
});
661+
662+
describe("Connection tags", () => {
663+
it("exposes tags on a hibernating connection", async () => {
664+
const ctx = createExecutionContext();
665+
const request = new Request(
666+
"http://example.com/parties/tags-server/room1",
667+
{
668+
headers: { Upgrade: "websocket" }
669+
}
670+
);
671+
const response = await worker.fetch(request, env, ctx);
672+
const ws = response.webSocket!;
673+
ws.accept();
674+
675+
const { promise, resolve, reject } = Promise.withResolvers<void>();
676+
ws.addEventListener("message", (message) => {
677+
try {
678+
const tags = JSON.parse(message.data as string) as string[];
679+
// Should include the auto-prepended connection id plus the custom tags
680+
expect(tags).toHaveLength(3);
681+
expect(tags[0]).toBeTypeOf("string"); // connection id
682+
expect(tags).toContain("role:admin");
683+
expect(tags).toContain("room:lobby");
684+
resolve();
685+
} catch (e) {
686+
reject(e);
687+
} finally {
688+
ws.close();
689+
}
690+
});
691+
692+
return promise;
693+
});
694+
695+
it("exposes tags on a hibernating connection after wake-up", async () => {
696+
const ctx = createExecutionContext();
697+
const request = new Request(
698+
"http://example.com/parties/tags-server/room2",
699+
{
700+
headers: { Upgrade: "websocket" }
701+
}
702+
);
703+
const response = await worker.fetch(request, env, ctx);
704+
const ws = response.webSocket!;
705+
ws.accept();
706+
707+
// Wait for the onConnect message
708+
const connectMessage = await new Promise<string>((resolve) => {
709+
ws.addEventListener("message", (e) => resolve(e.data as string), {
710+
once: true
711+
});
712+
});
713+
const connectTags = JSON.parse(connectMessage) as string[];
714+
expect(connectTags).toContain("role:admin");
715+
716+
// Send a message to trigger onMessage, which reads tags again
717+
ws.send("ping");
718+
const wakeMessage = await new Promise<string>((resolve) => {
719+
ws.addEventListener("message", (e) => resolve(e.data as string), {
720+
once: true
721+
});
722+
});
723+
const wakeTags = JSON.parse(wakeMessage) as string[];
724+
expect(wakeTags).toHaveLength(3);
725+
expect(wakeTags).toContain("role:admin");
726+
expect(wakeTags).toContain("room:lobby");
727+
728+
ws.close();
729+
});
730+
731+
it("exposes tags on a non-hibernating (in-memory) connection", async () => {
732+
const ctx = createExecutionContext();
733+
const request = new Request(
734+
"http://example.com/parties/tags-server-in-memory/room1",
735+
{
736+
headers: { Upgrade: "websocket" }
737+
}
738+
);
739+
const response = await worker.fetch(request, env, ctx);
740+
const ws = response.webSocket!;
741+
ws.accept();
742+
743+
const { promise, resolve, reject } = Promise.withResolvers<void>();
744+
ws.addEventListener("message", (message) => {
745+
try {
746+
const tags = JSON.parse(message.data as string) as string[];
747+
// Should include the auto-prepended connection id plus the custom tags
748+
expect(tags).toHaveLength(3);
749+
expect(tags[0]).toBeTypeOf("string"); // connection id
750+
expect(tags).toContain("role:viewer");
751+
expect(tags).toContain("room:general");
752+
resolve();
753+
} catch (e) {
754+
reject(e);
755+
} finally {
756+
ws.close();
757+
}
758+
});
759+
760+
return promise;
761+
});
762+
});

packages/partyserver/src/tests/worker.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ export type Env = {
2121
CustomCorsServer: DurableObjectNamespace<CustomCorsServer>;
2222
FailingOnStartServer: DurableObjectNamespace<FailingOnStartServer>;
2323
HibernatingNameInMessage: DurableObjectNamespace<HibernatingNameInMessage>;
24+
TagsServer: DurableObjectNamespace<TagsServer>;
25+
TagsServerInMemory: DurableObjectNamespace<TagsServerInMemory>;
2426
};
2527

2628
export class Stateful extends Server {
@@ -307,6 +309,49 @@ export class HibernatingNameInMessage extends Server {
307309
}
308310
}
309311

312+
/**
313+
* Tests that connection.tags is readable in hibernating mode.
314+
*/
315+
export class TagsServer extends Server {
316+
static options = {
317+
hibernate: true
318+
};
319+
320+
getConnectionTags(
321+
_connection: Connection,
322+
_ctx: ConnectionContext
323+
): string[] {
324+
return ["role:admin", "room:lobby"];
325+
}
326+
327+
onConnect(connection: Connection): void {
328+
connection.send(JSON.stringify(connection.tags));
329+
}
330+
331+
onMessage(connection: Connection, _message: WSMessage): void {
332+
// Also verify tags survive hibernation wake-up
333+
connection.send(JSON.stringify(connection.tags));
334+
}
335+
}
336+
337+
/**
338+
* Tests that connection.tags is readable in non-hibernating (in-memory) mode.
339+
*/
340+
export class TagsServerInMemory extends Server {
341+
// no hibernate — uses the in-memory path
342+
343+
getConnectionTags(
344+
_connection: Connection,
345+
_ctx: ConnectionContext
346+
): string[] {
347+
return ["role:viewer", "room:general"];
348+
}
349+
350+
onConnect(connection: Connection): void {
351+
connection.send(JSON.stringify(connection.tags));
352+
}
353+
}
354+
310355
export class CorsServer extends Server {
311356
onRequest(): Response | Promise<Response> {
312357
return Response.json({ cors: true });

packages/partyserver/src/tests/wrangler.jsonc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@
5858
{
5959
"name": "HibernatingNameInMessage",
6060
"class_name": "HibernatingNameInMessage"
61+
},
62+
{
63+
"name": "TagsServer",
64+
"class_name": "TagsServer"
65+
},
66+
{
67+
"name": "TagsServerInMemory",
68+
"class_name": "TagsServerInMemory"
6169
}
6270
]
6371
},
@@ -76,7 +84,9 @@
7684
"HibernatingOnStartServer",
7785
"AlarmServer",
7886
"FailingOnStartServer",
79-
"HibernatingNameInMessage"
87+
"HibernatingNameInMessage",
88+
"TagsServer",
89+
"TagsServerInMemory"
8090
]
8191
}
8292
]

packages/partyserver/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ export type Connection<TState = unknown> = WebSocket & {
7070
*/
7171
deserializeAttachment<T = unknown>(): T | null;
7272

73+
/**
74+
* Tags assigned to this connection via {@link Server.getConnectionTags}.
75+
* Always includes the connection id as the first tag.
76+
*/
77+
tags: readonly string[];
78+
7379
/**
7480
* Server's name
7581
*/

0 commit comments

Comments
 (0)