Skip to content

Commit e1c60a4

Browse files
committed
Use CanFilter type to distinguish FilterableChannels
1 parent 9736783 commit e1c60a4

File tree

3 files changed

+110
-57
lines changed

3 files changed

+110
-57
lines changed

tests/pos/suspend-strawman-2/Async.scala

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ trait Async extends AsyncConfig:
3232

3333
object Async:
3434

35+
/** A marker type for Source#CanFilter */
36+
opaque type Yes = Unit
37+
3538
abstract class Impl(val root: Cancellable, val scheduler: Scheduler)
3639
(using boundary.Label[Unit]) extends Async:
3740

@@ -83,6 +86,8 @@ object Async:
8386
*/
8487
trait Source[+T]:
8588

89+
type CanFilter
90+
8691
/** If data is available at present, pass it to function `k`
8792
* and return the result if this call.
8893
* `k` returns true iff the data was consumed in an async block.
@@ -105,27 +110,9 @@ object Async:
105110

106111
end Source
107112

108-
/** A source that can be mapped, filtered, or raced. Only ComposableSources
109-
* can pass `false` to the `Listener` in `poll` or `onComplete`. They do
110-
* that if the data is rejected by a filter or did not come first in a race.
111-
*/
112-
trait ComposableSource[+T] extends Source[T]:
113-
114-
/** Pass on data transformed by `f` */
115-
def map[U](f: T => U): ComposableSource[U] =
116-
new DerivedSource[T, U](this):
117-
def listen(x: T, k: Listener[U]) = k(f(x))
118-
119-
/** Pass on only data matching the predicate `p` */
120-
def filter(p: T => Boolean): ComposableSource[T] =
121-
new DerivedSource[T, T](this):
122-
def listen(x: T, k: Listener[T]) = p(x) && k(x)
123-
124-
end ComposableSource
125-
126113
/** As source that transforms an original source in some way */
127114

128-
abstract class DerivedSource[T, U](src: Source[T]) extends ComposableSource[U]:
115+
abstract class DerivedSource[T, U](val original: Source[T]) extends Source[U]:
129116

130117
/** Handle a value `x` passed to the original source by possibly
131118
* invokiong the continuation for this source.
@@ -137,16 +124,34 @@ object Async:
137124
def apply(x: T): Boolean = listen(x, k)
138125

139126
def poll(k: Listener[U]): Boolean =
140-
src.poll(transform(k))
127+
original.poll(transform(k))
141128
def onComplete(k: Listener[U]): Unit =
142-
src.onComplete(transform(k))
129+
original.onComplete(transform(k))
143130
def dropListener(k: Listener[U]): Unit =
144-
src.dropListener(transform(k))
131+
original.dropListener(transform(k))
145132
end DerivedSource
146133

134+
extension [T](src: Source[T])
135+
136+
/** Pass on data transformed by `f` */
137+
def map[U](f: T => U): Source[U] { type CanFilter = src.CanFilter } =
138+
new DerivedSource[T, U](src):
139+
type CanFilter = src.CanFilter
140+
def listen(x: T, k: Listener[U]) = k(f(x))
141+
142+
extension [T](src: Source[T] { type CanFilter = Yes })
143+
144+
/** Pass on only data matching the predicate `p` */
145+
def filter(p: T => Boolean): Source[T] { type CanFilter = src.CanFilter } =
146+
new DerivedSource[T, T](src):
147+
type CanFilter = src.CanFilter
148+
def listen(x: T, k: Listener[T]) = p(x) && k(x)
149+
150+
147151
/** Pass first result from any of `sources` to the continuation */
148-
def race[T](sources: ComposableSource[T]*): ComposableSource[T] =
149-
new ComposableSource:
152+
def race[T, CF](sources: Source[T] { type CanFilter <: CF} *): Source[T] { type CanFilter <: CF } =
153+
new Source[T]:
154+
type CanFilter <: CF
150155

151156
def poll(k: Listener[T]): Boolean =
152157
val it = sources.iterator
@@ -180,8 +185,11 @@ object Async:
180185
/** If left (respectively, right) source succeeds with `x`, pass `Left(x)`,
181186
* (respectively, Right(x)) on to the continuation.
182187
*/
183-
def either[T, U](src1: ComposableSource[T], src2: ComposableSource[U]): ComposableSource[Either[T, U]] =
184-
race[Either[T, U]](src1.map(Left(_)), src2.map(Right(_)))
188+
def either[T, U, CF](
189+
src1: Source[T] { type CanFilter <: CF },
190+
src2: Source[U] { type CanFilter <: CF })
191+
: Source[Either[T, U]] { type CanFilter <: CF } =
192+
race[Either[T, U], CF](src1.map(Left(_)), src2.map(Right(_)))
185193

186194
end Async
187195

tests/pos/suspend-strawman-2/channels.scala

Lines changed: 73 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ package concurrent
22
import scala.collection.mutable, mutable.ListBuffer
33
import scala.util.boundary.Label
44
import runtime.suspend
5-
import Async.{Listener, await}
5+
import Async.{Listener, await, Yes}
6+
7+
/** An unbounded channel
8+
* Unbounded channels are composable async sources.
9+
*/
10+
class UnboundedChannel[T] extends Async.Source[T]:
11+
type CanFilter = Yes
612

7-
/** An unbounded channel */
8-
class UnboundedChannel[T] extends Async.ComposableSource[T]:
913
private val pending = ListBuffer[T]()
1014
private val waiting = mutable.Set[Listener[T]]()
1115

@@ -45,69 +49,108 @@ class UnboundedChannel[T] extends Async.ComposableSource[T]:
4549

4650
end UnboundedChannel
4751

52+
/** An unbuffered, synchronous channel. Senders and readers both block
53+
* until a communication between them happens.
54+
* The channel provides two async sources, one for reading and one for
55+
* sending. The two sources are not composable. This allows a simple
56+
* implementation strategy where at each point either some senders
57+
* are waiting for matching readers, or some readers are waiting for matching
58+
* senders, or the channel is idle, i.e. there are no waiting readers or senders.
59+
* If a send operation encounters some waiting readers, or a read operation
60+
* encounters some waiting sender the data is transmitted directly. Otherwise
61+
* we add the operation to the corresponding waiting pending set.
62+
*/
4863
trait SyncChannel[T]:
49-
def canRead: Async.Source[T]
50-
def canSend: Async.Source[Listener[T]]
64+
thisCannel =>
65+
66+
type CanFilter
67+
68+
val canRead: Async.Source[T] { type CanFilter = thisCannel.CanFilter }
69+
val canSend: Async.Source[Listener[T]] { type CanFilter = thisCannel.CanFilter }
5170

5271
def send(x: T)(using Async): Unit = await(canSend)(x)
5372

5473
def read()(using Async): T = await(canRead)
5574

5675
object SyncChannel:
57-
def apply[T](): SyncChannel[T] = new Impl[T]:
58-
val canRead = new ReadSource
59-
val canSend = new SendSource
76+
def apply[T](): SyncChannel[T] = Impl[T]()
77+
78+
class Impl[T] extends SyncChannel[T]:
6079

61-
abstract class Impl[T] extends SyncChannel[T]:
62-
protected val pendingReads = mutable.Set[Listener[T]]()
63-
protected val pendingSends = mutable.Set[Listener[Listener[T]]]()
80+
private val pendingReads = mutable.Set[Listener[T]]()
81+
private val pendingSends = mutable.Set[Listener[Listener[T]]]()
6482

6583
protected def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
6684
pending.headOption match
67-
case Some(elem) => op(elem); true
85+
case Some(elem) =>
86+
val ok = op(elem)
87+
if !ok then
88+
// Since sources are not filterable, we can be here only if a race
89+
// was lost and the entry was not yet removed. In that case, remove
90+
// it here.
91+
pending -= pending.head
92+
link(pending, op)
93+
ok
6894
case None => false
6995

7096
private def collapse[T](k2: Listener[Listener[T]]): Option[T] =
7197
var r: Option[T] = None
7298
if k2 { x => r = Some(x); true } then r else None
7399

74-
protected class ReadSource extends Async.Source[T]:
100+
private class ReadSource extends Async.Source[T]:
101+
type CanFilter = Impl.this.CanFilter
75102
def poll(k: Listener[T]): Boolean =
76103
link(pendingSends, sender => collapse(sender).map(k) == Some(true))
77104
def onComplete(k: Listener[T]): Unit =
78105
if !poll(k) then pendingReads += k
79106
def dropListener(k: Listener[T]): Unit =
80107
pendingReads -= k
81108

82-
protected class SendSource extends Async.Source[Listener[T]]:
109+
private class SendSource extends Async.Source[Listener[T]]:
110+
type CanFilter = Impl.this.CanFilter
83111
def poll(k: Listener[Listener[T]]): Boolean =
84112
link(pendingReads, k(_))
85113
def onComplete(k: Listener[Listener[T]]): Unit =
86114
if !poll(k) then pendingSends += k
87115
def dropListener(k: Listener[Listener[T]]): Unit =
88116
pendingSends -= k
89-
end Impl
90117

118+
val canRead = new ReadSource
119+
val canSend = new SendSource
120+
end Impl
91121
end SyncChannel
92122

93-
trait ComposableSyncChannel[T] extends SyncChannel[T]:
94-
def canRead: Async.ComposableSource[T]
95-
def canSend: Async.ComposableSource[Listener[T]]
96-
97-
object ComposableSyncChannel:
98-
def apply[T](): ComposableSyncChannel[T] = new Impl[T]:
99-
val canRead = new ComposableReadSource
100-
val canSend = new ComposableSendSource
101-
102-
abstract class Impl[T] extends SyncChannel.Impl[T], ComposableSyncChannel[T]:
123+
object FilterableSyncChannel:
124+
def apply[T](): SyncChannel[T] { type CanFilter = Yes } = Impl[T]()
103125

126+
class Impl[T] extends SyncChannel.Impl[T]:
127+
type CanFilter = Yes
104128
override protected def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
129+
// Since sources are filterable, we have to match all pending readers or writers
130+
// against the incoming request
105131
pending.iterator.find(op) match
106132
case Some(elem) => pending -= elem; true
107133
case None => false
108134

109-
class ComposableReadSource extends ReadSource, Async.ComposableSource[T]
110-
class ComposableSendSource extends SendSource, Async.ComposableSource[Listener[T]]
111-
end Impl
112-
113-
end ComposableSyncChannel
135+
end FilterableSyncChannel
136+
137+
def TestRace =
138+
val c1, c2 = FilterableSyncChannel[Int]()
139+
val s = c1.canSend
140+
val c3 = Async.race(c1.canRead, c2.canRead)
141+
val c4 = c3.filter(_ >= 0)
142+
val d0 = SyncChannel[Int]()
143+
val d1 = Async.race(c1.canRead, c2.canRead, d0.canRead)
144+
val d2 = d1.map(_ + 1)
145+
val c5 = Async.either(c1.canRead, c2.canRead)
146+
.map:
147+
case Left(x) => -x
148+
case Right(x) => x
149+
.filter(_ >= 0)
150+
151+
//val d3bad = d1.filter(_ >= 0)
152+
val d5 = Async.either(c1.canRead, d2)
153+
.map:
154+
case Left(x) => -x
155+
case Right(x) => x
156+
//val d6bad = d5.filter(_ >= 0)

tests/pos/suspend-strawman-2/futures.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import runtime.suspend
1010

1111
/** A cancellable future that can suspend waiting for other synchronous sources
1212
*/
13-
trait Future[+T] extends Async.ComposableSource[Try[T]], Cancellable:
13+
trait Future[+T] extends Async.Source[Try[T]], Cancellable:
14+
15+
type CanFilter = Async.Yes
1416

1517
/** Wait for this future to be completed, return its value in case of success,
1618
* or rethrow exception in case of failure.

0 commit comments

Comments
 (0)