Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions app/pages/test.vue
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ if (import.meta.client) {
console.log('📩 Event:', payload)
},
})
useWebSocket(`${location.protocol === 'https:' ? 'wss:' : 'ws:'}//${location.host}/wsevent`, {
onConnected: (_ws) => {
console.log('🟢 EVENT: WebSocket connected')
},
onMessage: (_ws, event) => {
const payload = JSON.parse(event.data)
console.log('📩 (EVENT) Event:', payload)
},
})
}
</script>

Expand Down
1 change: 1 addition & 0 deletions bio-durable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export default {
const id = env.BioDurable.idFromName(channel)
const stub = env.BioDurable.get(id)

console.log('[Worker] Forwarding WS request to Durable Object:', channel)
const res = await stub.fetch(request)
return res
}
Expand Down
2 changes: 1 addition & 1 deletion server/routes/ws.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const connections = new WeakMap<any, any>()

export default defineWebSocketHandler({
export default defineWebsocketCustomHandler({
async upgrade(request) {
await requireUserSession(request)
},
Expand Down
77 changes: 77 additions & 0 deletions server/routes/wsevent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
const connections = new WeakMap<any, any>()

export default defineWebSocketEventHandler((event) => {

Check warning on line 3 in server/routes/wsevent.ts

View workflow job for this annotation

GitHub Actions / lint

'event' is defined but never used. Allowed unused args must match /^_/u.
console.log('[WS Bridge] New connection attempt')
return {
async upgrade(request) {
await requireUserSession(request)
},

async open(peer) {
const { user } = await requireUserSession(peer)
const channel = `user-${user.id}`
console.log(`[WS Bridge] Authenticated ${user.email}. channel: ${channel}`)

const token = useRuntimeConfig().session.password

let upstream: any

if (import.meta.dev) {
console.log('[WS Bridge] Local: Connecting to 8787')
const WS = await import('ws').then(r => r.default)
upstream = new WS(`ws://localhost:8787/?channel=${channel}&token=${token}`)

upstream.on('open', () => console.log('[WS Bridge] Upstream connected'))
upstream.on('message', (data: any) => peer.send(data.toString()))
upstream.on('error', (err: any) => {
console.error('[WS Bridge] Upstream error:', err)
peer.close()
})
upstream.on('close', () => peer.close())
}
else {
const response = await fetch(`https://bio-durable.acidjazz.workers.dev/?channel=${channel}&token=${token}`, {
headers: {
Upgrade: 'websocket',
Connection: 'Upgrade',
},
})

if (response.status !== 101) {
throw new Error(`Upstream responded with ${response.status}`)
}

upstream = response.webSocket
if (!upstream) throw new Error('No WebSocket in response')

upstream.accept()

upstream.addEventListener('message', (msg: any) => {
peer.send(typeof msg.data === 'string' ? msg.data : msg.data.toString())
})
upstream.addEventListener('close', () => peer.close())
upstream.addEventListener('error', (err: any) => {
console.error('[WS Bridge] Upstream error:', err)
peer.close()
})
}

connections.set(peer, upstream)
},

message(peer, message) {
const upstream = connections.get(peer)
if (upstream?.readyState === 1) {
upstream.send(message.text())
}
},

close(peer) {
const upstream = connections.get(peer)
if (upstream) {
upstream.close()
connections.delete(peer)
}
},
}
})
29 changes: 29 additions & 0 deletions server/utils/websocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type { Hooks as WSHooks } from 'crossws'

import { defineEventHandler } from 'h3'

/**
* Define WebSocket hooks.
*
* @see https://h3.unjs.io/guide/websocket
*/
export function defineWebsocketCustom(hooks: Partial<WSHooks>): Partial<WSHooks> {
return hooks
}

/**
* Define WebSocket event handler.
*
* @see https://h3.unjs.io/guide/websocket
*/
export function defineWebsocketCustomHandler(hooks: Partial<WSHooks>) {
return defineEventHandler({
handler() {
throw createError({
statusCode: 426,
statusMessage: 'Upgrade Required',
})
},
websocket: hooks,
})
}
48 changes: 48 additions & 0 deletions server/utils/websocketEvent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import type { H3Event, EventHandler } from 'h3'
import type { Hooks as WebSocketHooks } from 'crossws'

import { defineEventHandler } from 'h3'

export type {
Hooks as WebSocketHooks,
Message as WebSocketMessage,
Peer as WebSocketPeer,
} from 'crossws'

/**
* Define WebSocket hooks.
*
* @see https://h3.unjs.io/guide/websocket
*/
export function defineWebSocketEvent(
hooks: Partial<WebSocketHooks>,
): Partial<WebSocketHooks> {
return hooks
}

/**
* Define WebSocket event handler.
*
* @see https://h3.unjs.io/guide/websocket
*/

export function defineWebSocketEventHandler(
hooks:
| Partial<WebSocketHooks>
| ((
event: H3Event,
) => Partial<WebSocketHooks> | Promise<Partial<WebSocketHooks>>),
): EventHandler {
return defineEventHandler(function _webSocketHandler(event) {
const crossws = typeof hooks === 'function' ? hooks(event) : hooks

return Object.assign(
new Response('WebSocket upgrade is required.', {
status: 426,
}),
{
crossws,
},
)
})
}
Loading