123123 the consumer sets its cell to a request with a dummy callback that rejects
124124 all values and continues immediately.
125125
126+ Close
127+
128+ The LSB of the balance atomic is used to indicate that the stream has been closed.
129+ When closed, the balance is always zero and no new consumers or producers can be added.
130+ The closing thread is responsible for cancelling all pre-existing users.
131+
126132 The exchange
127133
128134 Once a producer and consumer have been paired off (and so their cell is now Finished),
@@ -137,9 +143,14 @@ module Fiber_context = Eio__core.Private.Fiber_context
137143module Suspend = Eio__core.Private. Suspend
138144module Cancel = Eio__core. Cancel
139145
146+ type producer_result =
147+ | Sent (* Consumer accepted item. *)
148+ | Rejected (* Consumer rejected the item. Retry. *)
149+ | Failed of exn (* Cancelled or closed. *)
150+
140151type 'a item = {
141- v : 'a ;
142- kp : ( bool , exn ) result -> unit ; (* [Ok false] means consumer refused the item; retry. *)
152+ v : ( 'a , [ `Closed ]) result ;
153+ kp : producer_result -> unit ;
143154 cancel : [
144155 | `Resuming (* In the process of resuming, so can't cancel. *)
145156 | `Suspended of (unit -> bool ) (* Call this function to attempt to leave the queue. *)
@@ -149,7 +160,7 @@ type 'a item = {
149160
150161type 'a cell =
151162 | In_transition
152- | Slot of ('a -> bool )
163+ | Slot of (( 'a , [ `Closed ]) result -> bool )
153164 | Item of 'a item
154165 | Finished
155166
169180
170181module Q = Cells. Make (Cell )
171182
183+ type update_result =
184+ | Updated
185+ | Update_refused
186+ | Balance_closed
187+
188+ module Balance : sig
189+ type t
190+
191+ val make : unit -> t
192+ val close : t -> (int , [> `Closed ]) result
193+
194+ val get : t -> (int , [> `Closed ]) result
195+ (* * [get t] is [None] if [t] is closed. Otherwise, it is
196+ the number of items available (if non-negative) or the
197+ number of consumers waiting for an item. *)
198+
199+ val fetch_and_add : t -> int -> (int , [> `Closed ]) result
200+ (* * [fetch_and_add t diff] increases the value by [diff] and returns the old value. *)
201+
202+ val incr_if_negative : t -> update_result
203+ val decr_if_positive : t -> update_result
204+
205+ val pp : t Fmt .t
206+ end = struct
207+ type t = int Atomic .t
208+
209+ let closed = 1
210+ let counter x = x asr 1
211+ let is_closed x = (x land 1 ) <> 0
212+
213+ let value x =
214+ if is_closed x then Error `Closed else Ok (x asr 1 )
215+
216+ let fetch_and_add x diff =
217+ value (Atomic. fetch_and_add x (diff lsl 1 ))
218+
219+ let rec decr_if_positive t =
220+ let x = Atomic. get t in
221+ if is_closed x then Balance_closed
222+ else if counter x > 0 then (
223+ if Atomic. compare_and_set t x (x - 2 ) then Updated
224+ else decr_if_positive t
225+ ) else Update_refused
226+
227+ let rec incr_if_negative t =
228+ let x = Atomic. get t in
229+ if is_closed x then Balance_closed
230+ else if counter x < 0 then (
231+ if Atomic. compare_and_set t x (x + 2 ) then Updated
232+ else incr_if_negative t
233+ ) else Update_refused
234+
235+ let make () = Atomic. make 0
236+
237+ let close t =
238+ value (Atomic. exchange t closed)
239+
240+ let get t = value (Atomic. get t)
241+
242+ let pp f t =
243+ match get t with
244+ | Ok x -> Fmt. int f x
245+ | Error `Closed -> Fmt. string f " (closed)"
246+ end
247+
172248type 'a t = {
173- balance : int Atomic .t ;
249+ balance : Balance .t ;
174250 consumers : 'a Q .t ;
175251 producers : 'a Q .t ;
176252}
@@ -180,13 +256,14 @@ type 'a loc =
180256 | Long of ('a Q .segment * 'a Cell .t Atomic .t ) (* Acting as suspender of cell; can cancel *)
181257
182258let dump f t =
183- Fmt. pf f " @[<v2>Sync (balance=%d )@,@[<v2>Consumers:@,%a@]@,@[<v2>Producers:@,%a@]@]"
184- ( Atomic. get t.balance)
259+ Fmt. pf f " @[<v2>Sync (balance=%a )@,@[<v2>Consumers:@,%a@]@,@[<v2>Producers:@,%a@]@]"
260+ Balance. pp t.balance
185261 Q. dump t.consumers
186262 Q. dump t.producers
187263
188264(* Give [item] to consumer [kc]. [item]'s cell is now Finished. *)
189- let exchange item kc = item.kp (Ok (kc item.v))
265+ let exchange item kc =
266+ item.kp (if kc item.v then Sent else Rejected )
190267
191268(* Add [value] to [cell].
192269 If the cell is in transition, place [value] there and let the other party handle it later.
@@ -209,20 +286,6 @@ let rec add_to_cell queue value cell =
209286
210287(* Cancelling *)
211288
212- let rec decr_balance_if_positive t =
213- let cur = Atomic. get t.balance in
214- if cur > 0 then (
215- if Atomic. compare_and_set t.balance cur (cur - 1 ) then true
216- else decr_balance_if_positive t
217- ) else false
218-
219- let rec incr_balance_if_negative t =
220- let cur = Atomic. get t.balance in
221- if cur < 0 then (
222- if Atomic. compare_and_set t.balance cur (cur + 1 ) then true
223- else incr_balance_if_negative t
224- ) else false
225-
226289(* Cancel [cell] on our suspend queue.
227290 This function works for both consumers and producers, as we can tell from
228291 the value what our role is (and if there isn't a value, we're finished anyway).
@@ -232,7 +295,8 @@ let rec incr_balance_if_negative t =
232295let cancel t (segment , cell ) =
233296 let cancel2 update_balance ~old =
234297 if Atomic. compare_and_set cell old In_transition then (
235- if update_balance t then (
298+ match update_balance t.balance with
299+ | Updated ->
236300 (* At this point, we are committed to cancelling. *)
237301 begin match Atomic. exchange cell Finished with
238302 | Finished -> assert false
@@ -241,7 +305,7 @@ let cancel t (segment, cell) =
241305 | Slot kc -> add_to_cell t.producers (Slot kc) (Q. next_resume t.producers)
242306 end ;
243307 true
244- ) else (
308+ | Update_refused | Balance_closed ->
245309 (* We decided not to cancel. We know a resume is coming. *)
246310 if Atomic. compare_and_set cell In_transition old then false
247311 else (
@@ -253,13 +317,12 @@ let cancel t (segment, cell) =
253317 false
254318 | _ -> assert false
255319 )
256- )
257320 ) else false (* The peer resumed us first *)
258321 in
259322 match Atomic. get cell with
260323 | Finished -> false (* The peer resumed us first *)
261- | Slot _ as old -> cancel2 incr_balance_if_negative ~old (* We are a consumer *)
262- | Item _ as old -> cancel2 decr_balance_if_positive ~old (* We are a producer *)
324+ | Slot _ as old -> cancel2 Balance. incr_if_negative ~old (* We are a consumer *)
325+ | Item _ as old -> cancel2 Balance. decr_if_positive ~old (* We are a producer *)
263326 | In_transition ->
264327 (* Either we're initialising the cell, in which case we haven't told the
265328 application how to cancel this location yet, or we're already
@@ -292,16 +355,20 @@ let rec producer_resume_cell t ~success ~in_transition cell =
292355
293356(* This is essentially the main [put] function, but parameterised so it can be shared with
294357 the rejoin-after-rejection case. *)
295- let producer_join (t : _ t ) ~success ~suspend =
296- let old = Atomic. fetch_and_add t.balance (+ 1 ) in
297- if old < 0 then (
298- let cell = Q. next_resume t.consumers in
299- producer_resume_cell t cell
300- ~success
301- ~in_transition: (fun cell -> suspend (Short cell))
302- ) else (
303- suspend (Long (Q. next_suspend t.producers))
304- )
358+ let producer_join (t : _ t ) ~success ~suspend ~closed =
359+ match Balance. fetch_and_add t.balance (+ 1 ) with
360+ | Error `Closed -> closed ()
361+ | Ok old ->
362+ if old < 0 then (
363+ let cell = Q. next_resume t.consumers in
364+ producer_resume_cell t cell
365+ ~success
366+ ~in_transition: (fun cell -> suspend (Short cell))
367+ ) else (
368+ suspend (Long (Q. next_suspend t.producers))
369+ )
370+
371+ let put_closed_err = Invalid_argument " Stream closed"
305372
306373(* Called when a consumer took our value but then rejected it.
307374 We start the put operation again, except that our fiber is already suspended
@@ -310,6 +377,7 @@ let producer_join (t : _ t) ~success ~suspend =
310377let put_already_suspended t request =
311378 producer_join t
312379 ~success: (exchange request)
380+ ~closed: (fun () -> request.kp (Failed put_closed_err))
313381 ~suspend: (fun loc ->
314382 let Short cell | Long (_, cell) = loc in
315383 add_to_cell t.consumers (Item request) cell;
@@ -323,7 +391,7 @@ let put_already_suspended t request =
323391 (* We got cancelled after the peer removed our cell and before we updated the
324392 cancel function with the new location, or we were cancelled while doing a
325393 (non-cancellable) resume. Deal with it now. *)
326- if cancel t loc then request.kp (Error ex);
394+ if cancel t loc then request.kp (Failed ex);
327395 (* else we got resumed first *)
328396 | _ , Short _ ->
329397 (* We can't cancel while in the process of resuming a cell on the [consumers] queue.
@@ -346,12 +414,12 @@ let put_suspend t v loc =
346414 | Long loc -> `Suspended (fun () -> cancel t loc)
347415 in
348416 let rec item = {
349- v;
417+ v = Ok v ;
350418 cancel = Atomic. make cancel;
351419 kp = function
352- | Error _ as e -> enqueue e (* Cancelled by [put_already_suspended]. * )
353- | Ok true -> enqueue (Ok () ) (* Success! *)
354- | Ok false -> put_already_suspended t item (* Consumer rejected value. Restart. *)
420+ | Failed e -> enqueue ( Error e )
421+ | Sent -> enqueue (Ok () ) (* Success! *)
422+ | Rejected -> put_already_suspended t item (* Consumer rejected value. Restart. *)
355423 } in
356424 let Short cell | Long (_, cell) = loc in
357425 add_to_cell t.consumers (Item item) cell;
@@ -368,8 +436,9 @@ let put_suspend t v loc =
368436
369437let rec put (t : _ t ) v =
370438 producer_join t
371- ~success: (fun kc -> if kc v then () else put t v)
439+ ~success: (fun kc -> if kc ( Ok v) then () else put t v)
372440 ~suspend: (put_suspend t v)
441+ ~closed: (fun () -> raise put_closed_err)
373442
374443(* Taking. *)
375444
@@ -402,25 +471,35 @@ let take_suspend t loc =
402471 )
403472
404473let take (t : _ t ) =
405- let old = Atomic. fetch_and_add t.balance (- 1 ) in
406- if old > 0 then (
407- let cell = Q. next_resume t.producers in
408- consumer_resume_cell t cell
409- ~success: (fun item -> item.kp (Ok true ); item.v)
410- ~in_transition: (fun cell -> take_suspend t (Short cell))
411- ) else (
412- take_suspend t (Long (Q. next_suspend t.consumers))
413- )
474+ match Balance. fetch_and_add t.balance (- 1 ) with
475+ | Error _ as e -> e
476+ | Ok old ->
477+ if old > 0 then (
478+ let cell = Q. next_resume t.producers in
479+ consumer_resume_cell t cell
480+ ~success: (fun item -> item.kp Sent ; item.v)
481+ ~in_transition: (fun cell -> take_suspend t (Short cell))
482+ ) else (
483+ take_suspend t (Long (Q. next_suspend t.consumers))
484+ )
485+
486+ let take t =
487+ (take t
488+ : (_, [ `Closed ]) result
489+ :> (_, [> `Closed ]) result)
414490
415491let reject = Slot (fun _ -> false )
416492
417493let take_nonblocking (t : _ t ) =
418- if decr_balance_if_positive t then (
494+ match Balance. decr_if_positive t.balance with
495+ | Balance_closed -> Error `Closed
496+ | Update_refused -> Error `Would_block (* No waiting producers for us *)
497+ | Updated ->
419498 let rec aux cell =
420499 consumer_resume_cell t cell
421500 ~success: (fun item ->
422- item.kp ( Ok true ) ; (* Always accept the item *)
423- Some item.v
501+ item.kp Sent ; (* Always accept the item *)
502+ ( item.v :> (_, [`Closed | `Would_block] ) result)
424503 )
425504 ~in_transition: (fun cell ->
426505 (* Our producer is still in the process of writing its [Item], but
@@ -430,19 +509,43 @@ let take_nonblocking (t : _ t) =
430509 todo: could spin for a bit here first - the Item will probably arrive soon,
431510 and that would avoid making the producer start again. *)
432511 Domain. cpu_relax () ; (* Brief wait to encourage producer to finish *)
433- if Atomic. compare_and_set cell In_transition reject then None
512+ if Atomic. compare_and_set cell In_transition reject then Error `Would_block
434513 else aux cell
435514 )
436515 in aux (Q. next_resume t.producers)
437- ) else None (* No waiting producers for us *)
516+
517+ let take_nonblocking t =
518+ (take_nonblocking t
519+ : (_, [ `Would_block | `Closed ]) result
520+ :> (_, [> `Would_block | `Closed ]) result)
438521
439522(* Creation and status. *)
440523
441524let create () =
442525 {
443526 consumers = Q. make () ;
444527 producers = Q. make () ;
445- balance = Atomic . make 0 ;
528+ balance = Balance . make () ;
446529 }
447530
448- let balance t = Atomic. get t.balance
531+ let close t =
532+ match Balance. close t.balance with
533+ | Error `Closed -> ()
534+ | Ok old ->
535+ if old > 0 then (
536+ (* Reject each waiting producer. They will try to restart and then discover the stream is closed. *)
537+ for _ = 1 to old do
538+ let cell = Q. next_resume t.producers in
539+ add_to_cell t.consumers reject cell;
540+ done
541+ ) else (
542+ let reject_consumer = Item { v = Error `Closed ; kp = ignore; cancel = Atomic. make `Resuming } in
543+ (* Reject each waiting consumer. *)
544+ for _ = 1 to - old do
545+ let cell = Q. next_resume t.consumers in
546+ add_to_cell t.consumers reject_consumer cell
547+ done
548+ )
549+
550+ let balance t =
551+ Balance. get t.balance
0 commit comments