123123 the consumer sets its cell to a request with a dummy callback that rejects
124124 all values and continues immediately.
125125
126+ Close
127+
128+ The LSB of the balance atomic is used to indicate that the stream has been closed.
129+ When closed, the balance is always zero and no new consumers or producers can be added.
130+ The closing thread is responsible for cancelling all pre-existing users.
131+
126132 The exchange
127133
128134 Once a producer and consumer have been paired off (and so their cell is now Finished),
@@ -137,9 +143,14 @@ module Fiber_context = Eio__core.Private.Fiber_context
137143module Suspend = Eio__core.Private. Suspend
138144module Cancel = Eio__core. Cancel
139145
146+ type producer_result =
147+ | Sent (* Consumer accepted item. *)
148+ | Rejected (* Consumer rejected the item. Retry. *)
149+ | Failed of exn (* Cancelled or closed. *)
150+
140151type 'a item = {
141- v : 'a ;
142- kp : ( bool , exn ) result -> unit ; (* [Ok false] means consumer refused the item; retry. *)
152+ v : ( 'a , [ `Closed ]) result ;
153+ kp : producer_result -> unit ;
143154 cancel : [
144155 | `Resuming (* In the process of resuming, so can't cancel. *)
145156 | `Suspended of (unit -> bool ) (* Call this function to attempt to leave the queue. *)
@@ -149,7 +160,7 @@ type 'a item = {
149160
150161type 'a cell =
151162 | In_transition
152- | Slot of ('a -> bool )
163+ | Slot of (( 'a , [ `Closed ]) result -> bool )
153164 | Item of 'a item
154165 | Finished
155166
169180
170181module Q = Cells. Make (Cell )
171182
183+ type update_result =
184+ | Updated
185+ | Update_refused
186+ | Balance_closed
187+
188+ module Balance : sig
189+ type t
190+
191+ val make : unit -> t
192+
193+ val close : t -> (int , [> `Closed ]) result
194+ (* Mark as closed and return the previous state. *)
195+
196+ val get : t -> (int , [> `Closed ]) result
197+ (* * [get t] is the number of items available (if non-negative) or the
198+ number of consumers waiting for an item. *)
199+
200+ val fetch_and_add : t -> int -> (int , [> `Closed ]) result
201+ (* * [fetch_and_add t diff] increases the value by [diff] and returns the old value. *)
202+
203+ val incr_if_negative : t -> update_result
204+ val decr_if_positive : t -> update_result
205+
206+ val pp : t Fmt .t
207+ end = struct
208+ type t = int Atomic .t
209+
210+ let closed = 1
211+ let counter x = x asr 1
212+ let is_closed x = (x land 1 ) <> 0
213+
214+ let value x =
215+ if is_closed x then Error `Closed else Ok (counter x)
216+
217+ let fetch_and_add x diff =
218+ value (Atomic. fetch_and_add x (diff lsl 1 ))
219+
220+ let rec decr_if_positive t =
221+ let x = Atomic. get t in
222+ if is_closed x then Balance_closed
223+ else if counter x > 0 then (
224+ if Atomic. compare_and_set t x (x - 2 ) then Updated
225+ else decr_if_positive t
226+ ) else Update_refused
227+
228+ let rec incr_if_negative t =
229+ let x = Atomic. get t in
230+ if is_closed x then Balance_closed
231+ else if counter x < 0 then (
232+ if Atomic. compare_and_set t x (x + 2 ) then Updated
233+ else incr_if_negative t
234+ ) else Update_refused
235+
236+ let make () = Atomic. make 0
237+
238+ let close t =
239+ value (Atomic. exchange t closed)
240+
241+ let get t = value (Atomic. get t)
242+
243+ let pp f t =
244+ match get t with
245+ | Ok x -> Fmt. int f x
246+ | Error `Closed -> Fmt. string f " (closed)"
247+ end
248+
172249type 'a t = {
173- balance : int Atomic .t ;
250+ balance : Balance .t ;
174251 consumers : 'a Q .t ;
175252 producers : 'a Q .t ;
176253}
@@ -180,13 +257,14 @@ type 'a loc =
180257 | Long of ('a Q .segment * 'a Cell .t Atomic .t ) (* Acting as suspender of cell; can cancel *)
181258
182259let dump f t =
183- Fmt. pf f " @[<v2>Sync (balance=%d )@,@[<v2>Consumers:@,%a@]@,@[<v2>Producers:@,%a@]@]"
184- ( Atomic. get t.balance)
260+ Fmt. pf f " @[<v2>Sync (balance=%a )@,@[<v2>Consumers:@,%a@]@,@[<v2>Producers:@,%a@]@]"
261+ Balance. pp t.balance
185262 Q. dump t.consumers
186263 Q. dump t.producers
187264
188265(* Give [item] to consumer [kc]. [item]'s cell is now Finished. *)
189- let exchange item kc = item.kp (Ok (kc item.v))
266+ let exchange item kc =
267+ item.kp (if kc item.v then Sent else Rejected )
190268
191269(* Add [value] to [cell].
192270 If the cell is in transition, place [value] there and let the other party handle it later.
@@ -209,20 +287,6 @@ let rec add_to_cell queue value cell =
209287
210288(* Cancelling *)
211289
212- let rec decr_balance_if_positive t =
213- let cur = Atomic. get t.balance in
214- if cur > 0 then (
215- if Atomic. compare_and_set t.balance cur (cur - 1 ) then true
216- else decr_balance_if_positive t
217- ) else false
218-
219- let rec incr_balance_if_negative t =
220- let cur = Atomic. get t.balance in
221- if cur < 0 then (
222- if Atomic. compare_and_set t.balance cur (cur + 1 ) then true
223- else incr_balance_if_negative t
224- ) else false
225-
226290(* Cancel [cell] on our suspend queue.
227291 This function works for both consumers and producers, as we can tell from
228292 the value what our role is (and if there isn't a value, we're finished anyway).
@@ -232,7 +296,8 @@ let rec incr_balance_if_negative t =
232296let cancel t (segment , cell ) =
233297 let cancel2 update_balance ~old =
234298 if Atomic. compare_and_set cell old In_transition then (
235- if update_balance t then (
299+ match update_balance t.balance with
300+ | Updated ->
236301 (* At this point, we are committed to cancelling. *)
237302 begin match Atomic. exchange cell Finished with
238303 | Finished -> assert false
@@ -241,7 +306,7 @@ let cancel t (segment, cell) =
241306 | Slot kc -> add_to_cell t.producers (Slot kc) (Q. next_resume t.producers)
242307 end ;
243308 true
244- ) else (
309+ | Update_refused | Balance_closed ->
245310 (* We decided not to cancel. We know a resume is coming. *)
246311 if Atomic. compare_and_set cell In_transition old then false
247312 else (
@@ -253,13 +318,12 @@ let cancel t (segment, cell) =
253318 false
254319 | _ -> assert false
255320 )
256- )
257321 ) else false (* The peer resumed us first *)
258322 in
259323 match Atomic. get cell with
260324 | Finished -> false (* The peer resumed us first *)
261- | Slot _ as old -> cancel2 incr_balance_if_negative ~old (* We are a consumer *)
262- | Item _ as old -> cancel2 decr_balance_if_positive ~old (* We are a producer *)
325+ | Slot _ as old -> cancel2 Balance. incr_if_negative ~old (* We are a consumer *)
326+ | Item _ as old -> cancel2 Balance. decr_if_positive ~old (* We are a producer *)
263327 | In_transition ->
264328 (* Either we're initialising the cell, in which case we haven't told the
265329 application how to cancel this location yet, or we're already
@@ -292,16 +356,20 @@ let rec producer_resume_cell t ~success ~in_transition cell =
292356
293357(* This is essentially the main [put] function, but parameterised so it can be shared with
294358 the rejoin-after-rejection case. *)
295- let producer_join (t : _ t ) ~success ~suspend =
296- let old = Atomic. fetch_and_add t.balance (+ 1 ) in
297- if old < 0 then (
298- let cell = Q. next_resume t.consumers in
299- producer_resume_cell t cell
300- ~success
301- ~in_transition: (fun cell -> suspend (Short cell))
302- ) else (
303- suspend (Long (Q. next_suspend t.producers))
304- )
359+ let producer_join (t : _ t ) ~success ~suspend ~closed =
360+ match Balance. fetch_and_add t.balance (+ 1 ) with
361+ | Error `Closed -> closed ()
362+ | Ok old ->
363+ if old < 0 then (
364+ let cell = Q. next_resume t.consumers in
365+ producer_resume_cell t cell
366+ ~success
367+ ~in_transition: (fun cell -> suspend (Short cell))
368+ ) else (
369+ suspend (Long (Q. next_suspend t.producers))
370+ )
371+
372+ let put_closed_err = Invalid_argument " Stream closed"
305373
306374(* Called when a consumer took our value but then rejected it.
307375 We start the put operation again, except that our fiber is already suspended
@@ -310,6 +378,7 @@ let producer_join (t : _ t) ~success ~suspend =
310378let put_already_suspended t request =
311379 producer_join t
312380 ~success: (exchange request)
381+ ~closed: (fun () -> request.kp (Failed put_closed_err))
313382 ~suspend: (fun loc ->
314383 let Short cell | Long (_, cell) = loc in
315384 add_to_cell t.consumers (Item request) cell;
@@ -323,7 +392,7 @@ let put_already_suspended t request =
323392 (* We got cancelled after the peer removed our cell and before we updated the
324393 cancel function with the new location, or we were cancelled while doing a
325394 (non-cancellable) resume. Deal with it now. *)
326- if cancel t loc then request.kp (Error ex);
395+ if cancel t loc then request.kp (Failed ex);
327396 (* else we got resumed first *)
328397 | _ , Short _ ->
329398 (* We can't cancel while in the process of resuming a cell on the [consumers] queue.
@@ -346,12 +415,12 @@ let put_suspend t v loc =
346415 | Long loc -> `Suspended (fun () -> cancel t loc)
347416 in
348417 let rec item = {
349- v;
418+ v = Ok v ;
350419 cancel = Atomic. make cancel;
351420 kp = function
352- | Error _ as e -> enqueue e (* Cancelled by [put_already_suspended]. * )
353- | Ok true -> enqueue (Ok () ) (* Success! *)
354- | Ok false -> put_already_suspended t item (* Consumer rejected value. Restart. *)
421+ | Failed e -> enqueue ( Error e )
422+ | Sent -> enqueue (Ok () ) (* Success! *)
423+ | Rejected -> put_already_suspended t item (* Consumer rejected value. Restart. *)
355424 } in
356425 let Short cell | Long (_, cell) = loc in
357426 add_to_cell t.consumers (Item item) cell;
@@ -368,8 +437,9 @@ let put_suspend t v loc =
368437
369438let rec put (t : _ t ) v =
370439 producer_join t
371- ~success: (fun kc -> if kc v then () else put t v)
440+ ~success: (fun kc -> if kc ( Ok v) then () else put t v)
372441 ~suspend: (put_suspend t v)
442+ ~closed: (fun () -> raise put_closed_err)
373443
374444(* Taking. *)
375445
@@ -402,25 +472,35 @@ let take_suspend t loc =
402472 )
403473
404474let take (t : _ t ) =
405- let old = Atomic. fetch_and_add t.balance (- 1 ) in
406- if old > 0 then (
407- let cell = Q. next_resume t.producers in
408- consumer_resume_cell t cell
409- ~success: (fun item -> item.kp (Ok true ); item.v)
410- ~in_transition: (fun cell -> take_suspend t (Short cell))
411- ) else (
412- take_suspend t (Long (Q. next_suspend t.consumers))
413- )
475+ match Balance. fetch_and_add t.balance (- 1 ) with
476+ | Error `Closed as e -> e
477+ | Ok old ->
478+ if old > 0 then (
479+ let cell = Q. next_resume t.producers in
480+ consumer_resume_cell t cell
481+ ~success: (fun item -> item.kp Sent ; item.v)
482+ ~in_transition: (fun cell -> take_suspend t (Short cell))
483+ ) else (
484+ take_suspend t (Long (Q. next_suspend t.consumers))
485+ )
486+
487+ let take t =
488+ (take t
489+ : (_, [ `Closed ]) result
490+ :> (_, [> `Closed ]) result)
414491
415492let reject = Slot (fun _ -> false )
416493
417494let take_nonblocking (t : _ t ) =
418- if decr_balance_if_positive t then (
495+ match Balance. decr_if_positive t.balance with
496+ | Balance_closed -> Error `Closed
497+ | Update_refused -> Error `Would_block (* No waiting producers for us *)
498+ | Updated ->
419499 let rec aux cell =
420500 consumer_resume_cell t cell
421501 ~success: (fun item ->
422- item.kp ( Ok true ) ; (* Always accept the item *)
423- Some item.v
502+ item.kp Sent ; (* Always accept the item *)
503+ ( item.v :> (_, [`Closed | `Would_block] ) result)
424504 )
425505 ~in_transition: (fun cell ->
426506 (* Our producer is still in the process of writing its [Item], but
@@ -430,19 +510,43 @@ let take_nonblocking (t : _ t) =
430510 todo: could spin for a bit here first - the Item will probably arrive soon,
431511 and that would avoid making the producer start again. *)
432512 Domain. cpu_relax () ; (* Brief wait to encourage producer to finish *)
433- if Atomic. compare_and_set cell In_transition reject then None
513+ if Atomic. compare_and_set cell In_transition reject then Error `Would_block
434514 else aux cell
435515 )
436516 in aux (Q. next_resume t.producers)
437- ) else None (* No waiting producers for us *)
517+
518+ let take_nonblocking t =
519+ (take_nonblocking t
520+ : (_, [ `Would_block | `Closed ]) result
521+ :> (_, [> `Would_block | `Closed ]) result)
438522
439523(* Creation and status. *)
440524
441525let create () =
442526 {
443527 consumers = Q. make () ;
444528 producers = Q. make () ;
445- balance = Atomic . make 0 ;
529+ balance = Balance . make () ;
446530 }
447531
448- let balance t = Atomic. get t.balance
532+ let close t =
533+ match Balance. close t.balance with
534+ | Error `Closed -> ()
535+ | Ok old ->
536+ if old > 0 then (
537+ (* Reject each waiting producer. They will try to restart and then discover the stream is closed. *)
538+ for _ = 1 to old do
539+ let cell = Q. next_resume t.producers in
540+ add_to_cell t.consumers reject cell;
541+ done
542+ ) else (
543+ let reject_consumer = Item { v = Error `Closed ; kp = ignore; cancel = Atomic. make `Resuming } in
544+ (* Reject each waiting consumer. *)
545+ for _ = 1 to - old do
546+ let cell = Q. next_resume t.consumers in
547+ add_to_cell t.consumers reject_consumer cell
548+ done
549+ )
550+
551+ let balance t =
552+ Balance. get t.balance
0 commit comments