Skip to content

Commit 4236fb3

Browse files
committed
Support fs2.io.toInputStream[Resource] on Scala Native and run tests
1 parent 09524d7 commit 4236fb3

File tree

5 files changed

+27
-26
lines changed

5 files changed

+27
-26
lines changed

build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,8 @@ lazy val root = tlCrossRootProject
378378

379379
lazy val commonNativeSettings = Seq[Setting[?]](
380380
tlVersionIntroduced := List("2.12", "2.13", "3").map(_ -> "3.13.0").toMap,
381-
Test / nativeBrewFormulas += "openssl"
381+
Test / nativeBrewFormulas += "openssl",
382+
Test / nativeConfig ~= { _.withEmbedResources(true) }
382383
)
383384

384385
lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)

io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala renamed to io/jvm-native/src/main/scala/fs2/io/JavaInputOutputStream.scala

File renamed without changes.

io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,26 @@ private[fs2] trait iojvmnative {
110110
}
111111
}
112112

113+
/** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`,
114+
* that is closed whenever the resulting stream terminates.
115+
*
116+
* If the `close` of resulting input stream is invoked manually, then this will await until the
117+
* original stream completely terminates.
118+
*
119+
* Because all `InputStream` methods block (including `close`), the resulting `InputStream`
120+
* should be consumed on a different thread pool than the one that is backing the effect.
121+
*
122+
* Note that the implementation is not thread safe -- only one thread is allowed at any time
123+
* to operate on the resulting `java.io.InputStream`.
124+
*/
125+
def toInputStream[F[_]: Async]: Pipe[F, Byte, InputStream] =
126+
source => Stream.resource(toInputStreamResource(source))
127+
128+
/** Like [[toInputStream]] but returns a `Resource` rather than a single element stream.
129+
*/
130+
def toInputStreamResource[F[_]: Async](
131+
source: Stream[F, Byte]
132+
): Resource[F, InputStream] =
133+
JavaInputOutputStream.toInputStream(source)
134+
113135
}

io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala renamed to io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ class IoPlatformSuite extends Fs2Suite {
275275
bar.assertEquals("bar")
276276
}
277277
test("classloader") {
278-
val size = readClassLoaderResource[IO]("fs2/io/foo", 8192).as(1L).compile.foldMonoid
278+
val resourcePath = if (isNative) "/fs2/io/foo" else "fs2/io/foo"
279+
val size = readClassLoaderResource[IO](resourcePath, 8192).as(1L).compile.foldMonoid
279280
size.assertEquals(3L)
280281
}
281282
}

io/jvm/src/main/scala/fs2/io/ioplatform.scala

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@ package fs2
2323
package io
2424

2525
import cats.Show
26-
import cats.effect.kernel.{Async, Resource, Sync}
26+
import cats.effect.kernel.{Async, Sync}
2727
import cats.effect.kernel.implicits._
2828
import cats.syntax.all._
2929
import fs2.internal.ThreadFactories
3030

31-
import java.io.InputStream
3231
import java.nio.charset.Charset
3332
import java.nio.charset.StandardCharsets
3433
import java.util.concurrent.Executors
@@ -92,28 +91,6 @@ private[fs2] trait ioplatform extends iojvmnative {
9291
): Pipe[F, O, Nothing] =
9392
_.map(_.show).through(text.encode(charset)).through(stdout)
9493

95-
/** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`,
96-
* that is closed whenever the resulting stream terminates.
97-
*
98-
* If the `close` of resulting input stream is invoked manually, then this will await until the
99-
* original stream completely terminates.
100-
*
101-
* Because all `InputStream` methods block (including `close`), the resulting `InputStream`
102-
* should be consumed on a different thread pool than the one that is backing the effect.
103-
*
104-
* Note that the implementation is not thread safe -- only one thread is allowed at any time
105-
* to operate on the resulting `java.io.InputStream`.
106-
*/
107-
def toInputStream[F[_]: Async]: Pipe[F, Byte, InputStream] =
108-
source => Stream.resource(toInputStreamResource(source))
109-
110-
/** Like [[toInputStream]] but returns a `Resource` rather than a single element stream.
111-
*/
112-
def toInputStreamResource[F[_]: Async](
113-
source: Stream[F, Byte]
114-
): Resource[F, InputStream] =
115-
JavaInputOutputStream.toInputStream(source)
116-
11794
// Using null instead of Option because null check is faster
11895
private lazy val vtExecutor: ExecutionContext = {
11996
val javaVersion: Int =

0 commit comments

Comments
 (0)