From 55833029cdec23fc8f6cd0e7543e8eccc553c3a2 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 09:10:21 -0500 Subject: [PATCH 01/11] Add dep on event-emitter --- bower.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bower.json b/bower.json index 81d651d..c33fcac 100644 --- a/bower.json +++ b/bower.json @@ -22,6 +22,7 @@ "purescript-exceptions": "^6.0.0", "purescript-node-buffer": "^9.0.0", "purescript-nullable": "^6.0.0", - "purescript-prelude": "^6.0.0" + "purescript-prelude": "^6.0.0", + "purescript-node-event-emitter": "https://github.com/purescript-node/purescript-node-event-emitter.git#^3.0.0" } } From 0255bfc7ad0ea220ffb4ed72c2564862232d8692 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 09:12:01 -0500 Subject: [PATCH 02/11] Add coercion to event emitter --- src/Node/Stream.purs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 03eb611..a380f57 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -7,6 +7,7 @@ module Node.Stream , Write() , Writable() , Duplex() + , toEventEmitter , onData , onDataString , onDataEither @@ -37,15 +38,17 @@ module Node.Stream import Prelude -import Effect (Effect) -import Effect.Exception (throw, Error) import Data.Either (Either(..)) import Data.Maybe (Maybe(..), fromMaybe) -import Node.Buffer (Buffer) import Data.Nullable as N +import Effect (Effect) +import Effect.Exception (throw, Error) import Effect.Uncurried (EffectFn1, mkEffectFn1) +import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding (Encoding) +import Node.EventEmitter (EventEmitter) +import Unsafe.Coerce (unsafeCoerce) -- | A stream. -- | @@ -70,6 +73,9 @@ type Writable r = Stream (write :: Write | r) -- | A duplex (readable _and_ writable stream) type Duplex = Stream (read :: Read, write :: Write) +toEventEmitter :: forall rw. Stream rw -> EventEmitter +toEventEmitter = unsafeCoerce + foreign import undefined :: forall a. a foreign import data Chunk :: Type From ffed9622c2e7c7188dcc7b30a198e55216cd44e4 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 09:14:26 -0500 Subject: [PATCH 03/11] Replace most onX fns with eventH --- src/Node/Stream.js | 32 -------------------------- src/Node/Stream.purs | 55 +++++++++++++++----------------------------- 2 files changed, 18 insertions(+), 69 deletions(-) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index cdd6748..5ed8c89 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -31,38 +31,6 @@ export function onDataEitherImpl(readChunk) { }; } -export function onEnd(s) { - return f => () => { - s.on("end", f); - }; -} - -export function onFinish(s) { - return f => () => { - s.on("finish", f); - }; -} - -export function onReadable(s) { - return f => () => { - s.on("readable", f); - }; -} - -export function onError(s) { - return f => () => { - s.on("error", e => { - f(e)(); - }); - }; -} - -export function onClose(s) { - return f => () => { - s.on("close", f); - }; -} - export function resume(s) { return () => { s.resume(); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index a380f57..45933d7 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -12,11 +12,11 @@ module Node.Stream , onDataString , onDataEither , setEncoding - , onReadable - , onEnd - , onFinish - , onClose - , onError + , readableH + , endH + , finishH + , closeH + , errorH , resume , pause , isPaused @@ -42,12 +42,13 @@ import Data.Either (Either(..)) import Data.Maybe (Maybe(..), fromMaybe) import Data.Nullable as N import Effect (Effect) -import Effect.Exception (throw, Error) +import Effect.Exception (Error, throw) import Effect.Uncurried (EffectFn1, mkEffectFn1) import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding (Encoding) -import Node.EventEmitter (EventEmitter) +import Node.EventEmitter (EventEmitter, EventHandle(..)) +import Node.EventEmitter.UtilTypes (EventHandle1, EventHandle0) import Unsafe.Coerce (unsafeCoerce) -- | A stream. @@ -194,40 +195,20 @@ setEncoding -> Effect Unit setEncoding r enc = setEncodingImpl r (show enc) --- | Listen for `readable` events. -foreign import onReadable - :: forall w - . Readable w - -> Effect Unit - -> Effect Unit +readableH :: forall w. EventHandle0 (Readable w) +readableH = EventHandle "readable" identity --- | Listen for `end` events. -foreign import onEnd - :: forall w - . Readable w - -> Effect Unit - -> Effect Unit +endH :: forall w. EventHandle0 (Readable w) +endH = EventHandle "end" identity --- | Listen for `finish` events. -foreign import onFinish - :: forall w - . Writable w - -> Effect Unit - -> Effect Unit +finishH :: forall w. EventHandle0 (Readable w) +finishH = EventHandle "finish" identity --- | Listen for `close` events. -foreign import onClose - :: forall w - . Stream w - -> Effect Unit - -> Effect Unit +closeH :: forall w. EventHandle0 (Readable w) +closeH = EventHandle "close" identity --- | Listen for `error` events. -foreign import onError - :: forall w - . Stream w - -> (Error -> Effect Unit) - -> Effect Unit +errorH :: forall w. EventHandle1 (Readable w) Error +errorH = EventHandle "error" mkEffectFn1 -- | Resume reading from the stream. foreign import resume :: forall w. Readable w -> Effect Unit From 37fd70b3fd500f3568c9f56ab51b32f1c274afc0 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 10:34:55 -0500 Subject: [PATCH 04/11] Update exports to modern conventions --- src/Node/Stream.purs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 45933d7..35fcd2e 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -1,12 +1,12 @@ -- | This module provides a low-level wrapper for the [Node Stream API](https://nodejs.org/api/stream.html). module Node.Stream - ( Stream() - , Read() - , Readable() - , Write() - , Writable() - , Duplex() + ( Stream + , Read + , Readable + , Write + , Writable + , Duplex , toEventEmitter , onData , onDataString From bbc4171ef7218e2148d4d43bfa6cc6a10727deea Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 10:50:58 -0500 Subject: [PATCH 05/11] Add missing handlers for Writable/Readable --- src/Node/Stream.purs | 44 ++++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 35fcd2e..5b7e23e 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -12,11 +12,16 @@ module Node.Stream , onDataString , onDataEither , setEncoding - , readableH - , endH - , finishH , closeH , errorH + , drainH + , finishH + , pipeH + , unpipeH + , pauseH + , readableH + , resumeH + , endH , resume , pause , isPaused @@ -195,20 +200,35 @@ setEncoding -> Effect Unit setEncoding r enc = setEncodingImpl r (show enc) -readableH :: forall w. EventHandle0 (Readable w) -readableH = EventHandle "readable" identity +closeH :: forall rw. EventHandle0 (Stream rw) +closeH = EventHandle "close" identity -endH :: forall w. EventHandle0 (Readable w) -endH = EventHandle "end" identity +errorH :: forall rw. EventHandle1 (Stream rw) Error +errorH = EventHandle "error" mkEffectFn1 -finishH :: forall w. EventHandle0 (Readable w) +drainH :: forall r. EventHandle0 (Writable r) +drainH = EventHandle "drain" identity + +finishH :: forall r. EventHandle0 (Writable r) finishH = EventHandle "finish" identity -closeH :: forall w. EventHandle0 (Readable w) -closeH = EventHandle "close" identity +pipeH :: forall r w. EventHandle1 (Writable r) (Readable w) +pipeH = EventHandle "pipe" mkEffectFn1 -errorH :: forall w. EventHandle1 (Readable w) Error -errorH = EventHandle "error" mkEffectFn1 +unpipeH :: forall r w. EventHandle1 (Writable r) (Readable w) +unpipeH = EventHandle "unpipe" mkEffectFn1 + +pauseH :: forall w. EventHandle0 (Readable w) +pauseH = EventHandle "pause" identity + +readableH :: forall w. EventHandle0 (Readable w) +readableH = EventHandle "readable" identity + +resumeH :: forall w. EventHandle0 (Readable w) +resumeH = EventHandle "resume" identity + +endH :: forall w. EventHandle0 (Readable w) +endH = EventHandle "end" identity -- | Resume reading from the stream. foreign import resume :: forall w. Readable w -> Effect Unit From 69e85900e1bffb5d411c1feae967d1ba7ce4e196 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 12:02:24 -0500 Subject: [PATCH 06/11] Update read functions / define dataH* handlers --- src/Node/Stream.js | 52 ++++-------- src/Node/Stream.purs | 196 +++++++++++++++++++++++-------------------- 2 files changed, 123 insertions(+), 125 deletions(-) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index 5ed8c89..cc74159 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -1,35 +1,22 @@ -const _undefined = undefined; -export { _undefined as undefined }; - export function setEncodingImpl(s) { return enc => () => { s.setEncoding(enc); }; } -export function readChunkImpl(Left) { - return Right => chunk => { - if (chunk instanceof Buffer) { - return Right(chunk); - } else if (typeof chunk === "string") { - return Left(chunk); - } else { - throw new Error( - "Node.Stream.readChunkImpl: Unrecognised " + - "chunk type; expected String or Buffer, got: " + - chunk - ); - } - }; -} - -export function onDataEitherImpl(readChunk) { - return r => f => () => { - r.on("data", data => { - f(readChunk(data))(); - }); - }; -} +export const readChunkImpl = (useBuffer, useString, chunk) => { + if (chunk instanceof Buffer) { + return useBuffer(chunk); + } else if (typeof chunk === "string") { + return useString(chunk); + } else { + throw new Error( + "Node.Stream.readChunkImpl: Unrecognised " + + "chunk type; expected String or Buffer, got: " + + chunk + ); + } +}; export function resume(s) { return () => { @@ -59,16 +46,9 @@ export function unpipeAll(r) { return () => r.unpipe(); } -export function readImpl(readChunk) { - return Nothing => Just => r => s => () => { - const v = r.read(s); - if (v === null) { - return Nothing; - } else { - return Just(readChunk(v)); - } - }; -} +export const readImpl = (r) => r.read(); + +export const readSizeImpl = (r, size) => r.read(size); export function writeImpl(w) { return chunk => done => () => w.write(chunk, null, done); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 5b7e23e..b41ae37 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -8,9 +8,6 @@ module Node.Stream , Writable , Duplex , toEventEmitter - , onData - , onDataString - , onDataEither , setEncoding , closeH , errorH @@ -18,6 +15,10 @@ module Node.Stream , finishH , pipeH , unpipeH + , Chunk + , dataH + , dataHStr + , dataHEither , pauseH , readableH , resumeH @@ -29,8 +30,9 @@ module Node.Stream , unpipe , unpipeAll , read + , read' , readString - , readEither + , readString' , write , writeString , cork @@ -44,11 +46,12 @@ module Node.Stream import Prelude import Data.Either (Either(..)) -import Data.Maybe (Maybe(..), fromMaybe) +import Data.Maybe (Maybe(..)) +import Data.Nullable (Nullable, toMaybe) import Data.Nullable as N import Effect (Effect) import Effect.Exception (Error, throw) -import Effect.Uncurried (EffectFn1, mkEffectFn1) +import Effect.Uncurried (EffectFn1, EffectFn2, EffectFn3, mkEffectFn1, runEffectFn1, runEffectFn2, runEffectFn3) import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding (Encoding) @@ -82,104 +85,119 @@ type Duplex = Stream (read :: Read, write :: Write) toEventEmitter :: forall rw. Stream rw -> EventEmitter toEventEmitter = unsafeCoerce -foreign import undefined :: forall a. a - +-- | Internal type. This should not be used by end-users. foreign import data Chunk :: Type foreign import readChunkImpl - :: (forall l r. l -> Either l r) - -> (forall l r. r -> Either l r) - -> Chunk - -> Either String Buffer - -readChunk :: Chunk -> Either String Buffer -readChunk = readChunkImpl Left Right + :: forall r + . EffectFn3 + (EffectFn1 Buffer r) + (EffectFn1 String r) + Chunk + r -- | Listen for `data` events, returning data in a Buffer. Note that this will fail -- | if `setEncoding` has been called on the stream. -onData - :: forall w - . Readable w - -> (Buffer -> Effect Unit) - -> Effect Unit -onData r cb = - onDataEither r (cb <=< fromEither) - where - fromEither x = - case x of - Left _ -> - throw "Stream encoding should not be set" - Right buf -> - pure buf - -read - :: forall w - . Readable w - -> Maybe Int - -> Effect (Maybe Buffer) -read r size = do - v <- readEither r size - case v of - Nothing -> pure Nothing - Just (Left _) -> throw "Stream encoding should not be set" - Just (Right b) -> pure (Just b) - +-- | +-- | This is likely the handler you want to use for converting a `Stream` into a `String`: +-- | ``` +-- | let useStringCb = ... +-- | ref <- Ref.new +-- | stream # on dataH \buf -> +-- | Ref.modify_ (\ref' -> Array.snoc ref' buf) ref +-- | stream # on endH do +-- | bufs <- Ref.read ref +-- | useStringCb $ Buffer.toString UTF8 $ Buffer.concat bufs +-- | ``` +dataH :: forall w. EventHandle (Readable w) (Buffer -> Effect Unit) (EffectFn1 Chunk Unit) +dataH = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 cb) + (mkEffectFn1 \_ -> throw "Got a String, not a Buffer. Stream encoding should not be set") + chunk + +-- | Listen for `data` events, returning data as a String. Note that this will fail +-- | if `setEncoding` has NOT been called on the stream. +dataHStr :: forall w. EventHandle (Readable w) (String -> Effect Unit) (EffectFn1 Chunk Unit) +dataHStr = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 \_ -> throw "Got a Buffer, not String. Stream encoding must be set to get a String.") + (mkEffectFn1 cb) + chunk + +-- | Listen for `data` events, returning data in a Buffer or String. This will work +-- | regardless of whether `setEncoding` has been called or not. +dataHEither :: forall w. EventHandle (Readable w) (Either Buffer String -> Effect Unit) (EffectFn1 Chunk Unit) +dataHEither = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 (cb <<< Left)) + (mkEffectFn1 (cb <<< Right)) + chunk + +-- | Note: this will fail if `setEncoding` has been called on the stream. +read :: forall w. Readable w -> Effect (Maybe Buffer) +read r = do + chunk <- runEffectFn1 readImpl r + case toMaybe chunk of + Nothing -> + pure Nothing + Just c -> + runEffectFn3 readChunkImpl + (mkEffectFn1 \buf -> pure $ Just buf) + (mkEffectFn1 \_ -> throw "Stream encoding should not be set") + c + +foreign import readImpl :: forall w. EffectFn1 (Readable w) (Nullable Chunk) + +-- | Note: this will fail if `setEncoding` has been called on the stream. +read' :: forall w. Readable w -> Int -> Effect (Maybe Buffer) +read' r size = do + chunk <- runEffectFn2 readSizeImpl r size + case toMaybe chunk of + Nothing -> + pure Nothing + Just c -> + runEffectFn3 readChunkImpl + (mkEffectFn1 \buf -> pure $ Just buf) + (mkEffectFn1 \_ -> throw "Stream encoding should not be set") + c + +foreign import readSizeImpl :: forall w. EffectFn2 (Readable w) (Int) (Nullable Chunk) + +-- | Note: this will fail if `setEncoding` has been called on the stream. readString :: forall w . Readable w - -> Maybe Int -> Encoding -> Effect (Maybe String) -readString r size enc = do - v <- readEither r size - case v of - Nothing -> pure Nothing - Just (Left _) -> throw "Stream encoding should not be set" - Just (Right buf) -> Just <$> Buffer.toString enc buf - -readEither +readString r enc = do + mbBuf <- read r + case mbBuf of + Nothing -> + pure Nothing + Just buf -> do + Just <$> Buffer.toString enc buf + +-- | Note: this will fail if `setEncoding` has been called on the stream. +readString' :: forall w . Readable w - -> Maybe Int - -> Effect (Maybe (Either String Buffer)) -readEither r size = readImpl readChunk Nothing Just r (fromMaybe undefined size) - -foreign import readImpl - :: forall r - . (Chunk -> Either String Buffer) - -> (forall a. Maybe a) - -> (forall a. a -> Maybe a) - -> Readable r -> Int - -> Effect (Maybe (Either String Buffer)) - --- | Listen for `data` events, returning data in a String, which will be --- | decoded using the given encoding. Note that this will fail if `setEncoding` --- | has been called on the stream. -onDataString - :: forall w - . Readable w -> Encoding - -> (String -> Effect Unit) - -> Effect Unit -onDataString r enc cb = onData r (cb <=< Buffer.toString enc) - --- | Listen for `data` events, returning data in an `Either String Buffer`. This --- | function is provided for the (hopefully rare) case that `setEncoding` has --- | been called on the stream. -onDataEither - :: forall r - . Readable r - -> (Either String Buffer -> Effect Unit) - -> Effect Unit -onDataEither r cb = onDataEitherImpl readChunk r cb - -foreign import onDataEitherImpl - :: forall r - . (Chunk -> Either String Buffer) - -> Readable r - -> (Either String Buffer -> Effect Unit) - -> Effect Unit + -> Effect (Maybe String) +readString' r size enc = do + mbBuf <- read' r size + case mbBuf of + Nothing -> + pure Nothing + Just buf -> do + Just <$> Buffer.toString enc buf foreign import setEncodingImpl :: forall w From 45d20da686e0d9e3890783c4f143d6f2640c67fb Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 21:31:54 -0500 Subject: [PATCH 07/11] Update tests --- test/Main.js | 22 +---------- test/Main.purs | 101 ++++++++++++++++++++++--------------------------- 2 files changed, 47 insertions(+), 76 deletions(-) diff --git a/test/Main.js b/test/Main.js index d623c00..5a46c5d 100644 --- a/test/Main.js +++ b/test/Main.js @@ -1,25 +1,5 @@ -import { WritableStreamBuffer, ReadableStreamBuffer } from "stream-buffers"; -import { PassThrough } from "stream"; - -export function writableStreamBuffer() { - return new WritableStreamBuffer; -} - -export function getContentsAsString(w) { - return () => w.getContentsAsString("utf8"); -} - -export function readableStreamBuffer() { - return new ReadableStreamBuffer; -} - -export function putImpl(str) { - return enc => r => () => { - r.put(str, enc); - }; -} - export { createGzip, createGunzip } from "zlib"; +import { PassThrough } from "stream"; export function passThrough() { return new PassThrough; diff --git a/test/Main.purs b/test/Main.purs index b93662c..6437fcd 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -2,14 +2,14 @@ module Test.Main where import Prelude -import Data.Either (Either(..)) -import Data.Maybe (Maybe(..), fromJust, isJust, isNothing) +import Data.Maybe (fromJust, isJust, isNothing) import Effect (Effect) import Effect.Console (log) import Effect.Exception (error) import Node.Buffer as Buffer import Node.Encoding (Encoding(..)) -import Node.Stream (Duplex, Readable, Writable, destroyWithError, end, onData, onDataEither, onDataString, onError, onReadable, pipe, read, readString, setDefaultEncoding, setEncoding, writeString) +import Node.EventEmitter (on_) +import Node.Stream (Duplex, dataH, dataHStr, destroyWithError, end, errorH, pipe, read, readString, readableH, setDefaultEncoding, setEncoding, writeString) import Partial.Unsafe (unsafePartial) import Test.Assert (assert, assert') @@ -17,27 +17,16 @@ assertEqual :: forall a. Show a => Eq a => a -> a -> Effect Unit assertEqual x y = assert' (show x <> " did not equal " <> show y) (x == y) -foreign import writableStreamBuffer :: Effect (Writable ()) - -foreign import getContentsAsString :: forall r. Writable r -> Effect String - -foreign import readableStreamBuffer :: Effect (Readable ()) - -foreign import putImpl :: forall r. String -> String -> Readable r -> Effect Unit - -put :: forall r. String -> Encoding -> Readable r -> Effect Unit -put str enc = putImpl str (show enc) - -main :: Effect Boolean +main :: Effect Unit main = do log "setDefaultEncoding should not affect writing" - _ <- testSetDefaultEncoding + testSetDefaultEncoding log "setEncoding should not affect reading" testSetEncoding log "test pipe" - _ <- testPipe + testPipe log "test write" testWrite @@ -51,55 +40,56 @@ main = do testString :: String testString = "üöß💡" -testReads :: Effect Boolean +testReads :: Effect Unit testReads = do - _ <- testReadString + testReadString testReadBuf where testReadString = do sIn <- passThrough - v <- readString sIn Nothing UTF8 + v <- readString sIn UTF8 assert (isNothing v) - onReadable sIn do - str <- readString sIn Nothing UTF8 + sIn # on_ readableH do + str <- readString sIn UTF8 assert (isJust str) assertEqual (unsafePartial (fromJust str)) testString pure unit - writeString sIn UTF8 testString \_ -> do + void $ writeString sIn UTF8 testString \_ -> do pure unit testReadBuf = do sIn <- passThrough - v <- read sIn Nothing + v <- read sIn assert (isNothing v) - onReadable sIn do - buf <- read sIn Nothing + sIn # on_ readableH do + buf <- read sIn assert (isJust buf) _ <- assertEqual <$> (Buffer.toString UTF8 (unsafePartial (fromJust buf))) <*> pure testString pure unit - writeString sIn UTF8 testString \_ -> do + void $ writeString sIn UTF8 testString \_ -> do pure unit -testSetDefaultEncoding :: Effect Boolean +testSetDefaultEncoding :: Effect Unit testSetDefaultEncoding = do - w1 <- writableStreamBuffer - _ <- check w1 + w1 <- passThrough + check w1 - w2 <- writableStreamBuffer + w2 <- passThrough setDefaultEncoding w2 UCS2 check w2 where check w = do - writeString w UTF8 testString \_ -> do - c <- getContentsAsString w - assertEqual testString c + w # on_ dataH \buf -> do + str <- Buffer.toString UTF8 buf + assertEqual testString str + void $ writeString w UTF8 testString mempty testSetEncoding :: Effect Unit testSetEncoding = do @@ -108,19 +98,19 @@ testSetEncoding = do check UCS2 where check enc = do - r1 <- readableStreamBuffer - put testString enc r1 + r1 <- passThrough + void $ writeString r1 enc testString mempty - r2 <- readableStreamBuffer - put testString enc r2 + r2 <- passThrough + void $ writeString r1 enc testString mempty setEncoding r2 enc - onData r1 \buf -> unsafePartial do - onDataEither r2 \(Left str) -> do - _ <- assertEqual <$> Buffer.toString enc buf <*> pure testString + r1 # on_ dataH \buf -> do + r2 # on_ dataHStr \str -> do + join $ assertEqual <$> Buffer.toString enc buf <*> pure testString assertEqual str testString -testPipe :: Effect Boolean +testPipe :: Effect Unit testPipe = do sIn <- passThrough sOut <- passThrough @@ -134,9 +124,10 @@ testPipe = do log "pipe 3" _ <- unzip `pipe` sOut - writeString sIn UTF8 testString \_ -> do - end sIn \_ -> do - onDataString sOut UTF8 \str -> do + void $ writeString sIn UTF8 testString \_ -> do + void $ end sIn \_ -> do + sOut # on_ dataH \buf -> do + str <- Buffer.toString UTF8 buf assertEqual str testString foreign import createGzip :: Effect Duplex @@ -151,17 +142,17 @@ testWrite = do noError where hasError = do - w1 <- writableStreamBuffer - _ <- onError w1 (const $ pure unit) - void $ end w1 $ const $ pure unit + w1 <- passThrough + w1 # on_ errorH (const $ pure unit) + end w1 mempty void $ writeString w1 UTF8 "msg" \err -> do assert' "writeString - should have error" $ isJust err noError = do - w1 <- writableStreamBuffer + w1 <- passThrough void $ writeString w1 UTF8 "msg1" \err -> do assert' "writeString - should have no error" $ isNothing err - void $ end w1 (const $ pure unit) + void $ end w1 mempty testEnd :: Effect Unit testEnd = do @@ -169,14 +160,14 @@ testEnd = do noError where hasError = do - w1 <- writableStreamBuffer - _ <- onError w1 (const $ pure unit) + w1 <- passThrough + w1 # on_ errorH (const $ pure unit) void $ writeString w1 UTF8 "msg" \_ -> do _ <- destroyWithError w1 $ error "Problem" - end w1 \err -> do + void $ end w1 \err -> do assert' "end - should have error" $ isJust err noError = do - w1 <- writableStreamBuffer - end w1 \err -> do + w1 <- passThrough + void $ end w1 \err -> do assert' "end - should have no error" $ isNothing err From 1da1db92b9081bd776d07838991c521471483c28 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Thu, 6 Jul 2023 08:02:01 -0500 Subject: [PATCH 08/11] Drop unneeded test dep --- package.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/package.json b/package.json index 0672c6f..9521b8a 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,6 @@ "eslint": "^7.15.0", "pulp": "16.0.0-0", "purescript-psa": "^0.8.2", - "rimraf": "^3.0.2", - "stream-buffers": "^3.0.2" + "rimraf": "^3.0.2" } } From 3d689ec60243675043bee6ba4dd485ea95de48c9 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Thu, 6 Jul 2023 08:12:43 -0500 Subject: [PATCH 09/11] Reimplement readEither This was dropped in a previous commit --- src/Node/Stream.purs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index b41ae37..f047922 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -33,6 +33,8 @@ module Node.Stream , read' , readString , readString' + , readEither + , readEither' , write , writeString , cork @@ -170,7 +172,10 @@ read' r size = do foreign import readSizeImpl :: forall w. EffectFn2 (Readable w) (Int) (Nullable Chunk) +-- | Reads the stream to get a Buffer and converts that into a String +-- | with the given encoding. -- | Note: this will fail if `setEncoding` has been called on the stream. +-- | If that is the case, use `readEither` instead. readString :: forall w . Readable w @@ -184,7 +189,10 @@ readString r enc = do Just buf -> do Just <$> Buffer.toString enc buf +-- | Reads the given number of bytes from the stream to get a Buffer +-- | and converts that into a String with the given encoding. -- | Note: this will fail if `setEncoding` has been called on the stream. +-- | If that is the case, use `readEither'` instead. readString' :: forall w . Readable w @@ -199,6 +207,32 @@ readString' r size enc = do Just buf -> do Just <$> Buffer.toString enc buf +-- | Note: this will fail if `setEncoding` has been called on the stream. +readEither :: forall w. Readable w -> Effect (Maybe (Either String Buffer)) +readEither r = do + chunk <- runEffectFn1 readImpl r + case toMaybe chunk of + Nothing -> + pure Nothing + Just c -> + runEffectFn3 readChunkImpl + (mkEffectFn1 (pure <<< Just <<< Right)) + (mkEffectFn1 (pure <<< Just <<< Left)) + c + +-- | Note: this will fail if `setEncoding` has been called on the stream. +readEither' :: forall w. Readable w -> Int -> Effect (Maybe (Either String Buffer)) +readEither' r size = do + chunk <- runEffectFn2 readSizeImpl r size + case toMaybe chunk of + Nothing -> + pure Nothing + Just c -> + runEffectFn3 readChunkImpl + (mkEffectFn1 (pure <<< Just <<< Right)) + (mkEffectFn1 (pure <<< Just <<< Left)) + c + foreign import setEncodingImpl :: forall w . Readable w From 6c85c5c8da0c74d8a5fc0dc58f6086be151920cd Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Thu, 6 Jul 2023 08:14:03 -0500 Subject: [PATCH 10/11] Add changelog entry --- CHANGELOG.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb8c299..d09074d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,8 +6,27 @@ Notable changes to this project are documented in this file. The format is based Breaking changes: - Update `node-buffer` to `v9.0.0` (#48 by @JordanMartinez) +- Reimplement event handlers using `eventNameH`-style API (#49 by @JordanMartinez) + + Previously, one would write something like the following, and be unable to remove + the resulting listener. + ```purs + Stream.onData stream \buffer -> do + ... + ``` + + Now, one writes such a thing via `on` (or a similar function) from `node-event-emitter`: + ```purs + -- if the listener should be removed later, use `on`. + removeListener <- stream # on dataH \buffer -> do + ... + -- if it doesn't need to be removed, use `on_`. + stream # on_ dataH \buffer -> do + ... + ``` New features: +- Added event handlers for `Writeable` streams (#49 by @JordanMartinez) Bugfixes: @@ -15,6 +34,7 @@ Other improvements: - Bumped CI's node version to `lts/*` (#48 by @JordanMartinez) - Updated CI `actions/checkout` and `actions/setup-nodee` to `v3` (#48 by @JordanMartinez) - Format code via purs-tidy; enforce formatting via CI (#48 by @JordanMartinez) +- Refactor tests using `passThrough` streams (#49 by @JordanMartinez) ## [v7.0.0](https://github.com/purescript-node/purescript-node-streams/releases/tag/v7.0.0) - 2022-04-29 From 15c79603c13309e5dea6c4ad4204731903e037a5 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Thu, 6 Jul 2023 08:19:33 -0500 Subject: [PATCH 11/11] Fix docs on readEither --- src/Node/Stream.purs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index f047922..0633644 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -207,7 +207,8 @@ readString' r size enc = do Just buf -> do Just <$> Buffer.toString enc buf --- | Note: this will fail if `setEncoding` has been called on the stream. +-- | Reads a chunk from the stream. This will work whether or not +-- | `setEncoding` has been called on the stream. readEither :: forall w. Readable w -> Effect (Maybe (Either String Buffer)) readEither r = do chunk <- runEffectFn1 readImpl r @@ -220,7 +221,8 @@ readEither r = do (mkEffectFn1 (pure <<< Just <<< Left)) c --- | Note: this will fail if `setEncoding` has been called on the stream. +-- | Reads the given number of bytes from the stream. This will work whether or not +-- | `setEncoding` has been called on the stream. readEither' :: forall w. Readable w -> Int -> Effect (Maybe (Either String Buffer)) readEither' r size = do chunk <- runEffectFn2 readSizeImpl r size