@@ -15,8 +15,28 @@ import (
15
15
16
16
// chatServer enables broadcasting to a set of subscribers.
17
17
type chatServer struct {
18
+ registerOnce sync.Once
19
+ m http.ServeMux
20
+
18
21
subscribersMu sync.RWMutex
19
- subscribers map [chan <- []byte ]struct {}
22
+ subscribers map [* subscriber ]struct {}
23
+ }
24
+
25
+ // subscriber represents a subscriber.
26
+ // Messages are sent on the msgs channel and if the client
27
+ // cannot keep up with the messages, closeSlow is called.
28
+ type subscriber struct {
29
+ msgs chan []byte
30
+ closeSlow func ()
31
+ }
32
+
33
+ func (cs * chatServer ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
34
+ cs .registerOnce .Do (func () {
35
+ cs .m .Handle ("/" , http .FileServer (http .Dir ("." )))
36
+ cs .m .HandleFunc ("/subscribe" , cs .subscribeHandler )
37
+ cs .m .HandleFunc ("/publish" , cs .publishHandler )
38
+ })
39
+ cs .m .ServeHTTP (w , r )
20
40
}
21
41
22
42
// subscribeHandler accepts the WebSocket connection and then subscribes
@@ -57,11 +77,13 @@ func (cs *chatServer) publishHandler(w http.ResponseWriter, r *http.Request) {
57
77
}
58
78
59
79
cs .publish (msg )
80
+
81
+ w .WriteHeader (http .StatusAccepted )
60
82
}
61
83
62
84
// subscribe subscribes the given WebSocket to all broadcast messages.
63
- // It creates a msgs chan with a buffer of 16 to give some room to slower
64
- // connections and then registers it . It then listens for all messages
85
+ // It creates a subscriber with a buffered msgs chan to give some room to slower
86
+ // connections and then registers the subscriber . It then listens for all messages
65
87
// and writes them to the WebSocket. If the context is cancelled or
66
88
// an error occurs, it returns and deletes the subscription.
67
89
//
@@ -70,13 +92,18 @@ func (cs *chatServer) publishHandler(w http.ResponseWriter, r *http.Request) {
70
92
func (cs * chatServer ) subscribe (ctx context.Context , c * websocket.Conn ) error {
71
93
ctx = c .CloseRead (ctx )
72
94
73
- msgs := make (chan []byte , 16 )
74
- cs .addSubscriber (msgs )
75
- defer cs .deleteSubscriber (msgs )
95
+ s := & subscriber {
96
+ msgs : make (chan []byte , 16 ),
97
+ closeSlow : func () {
98
+ c .Close (websocket .StatusPolicyViolation , "connection too slow to keep up with messages" )
99
+ },
100
+ }
101
+ cs .addSubscriber (s )
102
+ defer cs .deleteSubscriber (s )
76
103
77
104
for {
78
105
select {
79
- case msg := <- msgs :
106
+ case msg := <- s . msgs :
80
107
err := writeTimeout (ctx , time .Second * 5 , c , msg )
81
108
if err != nil {
82
109
return err
@@ -94,29 +121,29 @@ func (cs *chatServer) publish(msg []byte) {
94
121
cs .subscribersMu .RLock ()
95
122
defer cs .subscribersMu .RUnlock ()
96
123
97
- for c := range cs .subscribers {
124
+ for s := range cs .subscribers {
98
125
select {
99
- case c <- msg :
126
+ case s . msgs <- msg :
100
127
default :
128
+ go s .closeSlow ()
101
129
}
102
130
}
103
131
}
104
132
105
- // addSubscriber registers a subscriber with a channel
106
- // on which to send messages.
107
- func (cs * chatServer ) addSubscriber (msgs chan <- []byte ) {
133
+ // addSubscriber registers a subscriber.
134
+ func (cs * chatServer ) addSubscriber (s * subscriber ) {
108
135
cs .subscribersMu .Lock ()
109
136
if cs .subscribers == nil {
110
- cs .subscribers = make (map [chan <- [] byte ]struct {})
137
+ cs .subscribers = make (map [* subscriber ]struct {})
111
138
}
112
- cs .subscribers [msgs ] = struct {}{}
139
+ cs .subscribers [s ] = struct {}{}
113
140
cs .subscribersMu .Unlock ()
114
141
}
115
142
116
- // deleteSubscriber deletes the subscriber with the given msgs channel .
117
- func (cs * chatServer ) deleteSubscriber (msgs chan [] byte ) {
143
+ // deleteSubscriber deletes the given subscriber .
144
+ func (cs * chatServer ) deleteSubscriber (s * subscriber ) {
118
145
cs .subscribersMu .Lock ()
119
- delete (cs .subscribers , msgs )
146
+ delete (cs .subscribers , s )
120
147
cs .subscribersMu .Unlock ()
121
148
}
122
149
0 commit comments