diff --git a/CHANGELOG.md b/CHANGELOG.md index 8604882..1ab09a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,8 +42,21 @@ Breaking changes: New features: - Added event handlers for `Writeable` streams (#49 by @JordanMartinez) +- Added missing APIs (#51 by @JordanMartinez) + + - readable, readableEnded, readableFlowing, readableHighWaterMark, readableLength + - pipe' + - writeable, writeableEnded, writeableCorked, errored, writeableFinished, writeableHighWaterMark, writeableLength, writeableNeedDrain + - closed, destroyed + - allowHalfOpen + - pipeline + - fromString, fromBuffer + - newPassThrough Bugfixes: +- Drop misleading comment for `setEncoding` (#51 by @JordanMartinez) + + For context, see #37. Other improvements: - Bumped CI's node version to `lts/*` (#48 by @JordanMartinez) @@ -51,6 +64,7 @@ Other improvements: - Format code via purs-tidy; enforce formatting via CI (#48 by @JordanMartinez) - Refactor tests using `passThrough` streams (#49 by @JordanMartinez) - Updated FFI to use uncurried functions (#50 by @JordanMartinez) +- Relocated `setEncoding`, `Read`, and `Write` for better locality in docs (#51 by @JordanMartinez) ## [v7.0.0](https://github.com/purescript-node/purescript-node-streams/releases/tag/v7.0.0) - 2022-04-29 diff --git a/src/Node/Stream.js b/src/Node/Stream.js index ce5b069..f5775b0 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -1,3 +1,5 @@ +import stream from "node:stream"; + export const setEncodingImpl = (s, enc) => s.setEncoding(enc); export const readChunkImpl = (useBuffer, useString, chunk) => { @@ -14,6 +16,16 @@ export const readChunkImpl = (useBuffer, useString, chunk) => { } }; +export const readableImpl = (r) => r.readable; + +export const readableEndedImpl = (r) => r.readableEnded; + +export const readableFlowingImpl = (r) => r.readableFlowing; + +export const readableHighWaterMarkImpl = (r) => r.readableHighWaterMark; + +export const readableLengthImpl = (r) => r.readableLength; + export const resumeImpl = (r) => r.resume(); export const pauseImpl = (r) => r.pause; @@ -22,6 +34,8 @@ export const isPausedImpl = (r) => r.isPaused; export const pipeImpl = (r, w) => r.pipe(w); +export const pipeCbImpl = (r, w, cb) => r.pipe(w, cb); + export const unpipeAllImpl = (r) => r.unpipe(); export const unpipeImpl = (r, w) => r.unpipe(w); @@ -48,6 +62,36 @@ export const endCbImpl = (w, cb) => w.end(cb); export const endImpl = (w) => w.end(); +export const writeableImpl = (w) => w.writeable; + +export const writeableEndedImpl = (w) => w.writeableEnded; + +export const writeableCorkedImpl = (w) => w.writeableCorked; + +export const erroredImpl = (w) => w.errored; + +export const writeableFinishedImpl = (w) => w.writeableFinished; + +export const writeableHighWaterMarkImpl = (w) => w.writeableHighWaterMark; + +export const writeableLengthImpl = (w) => w.writeableLength; + +export const writeableNeedDrainImpl = (w) => w.writeableNeedDrain; + export const destroyImpl = (w) => w.destroy(); export const destroyErrorImpl = (w, e) => w.destroy(e); + +export const closedImpl = (w) => w.closed; + +export const destroyedImpl = (w) => w.destroyed; + +export const allowHalfOpenImpl = (d) => d.allowHalfOpen; + +export const pipelineImpl = (src, transforms, dst, cb) => stream.pipeline([src, ...transforms, dst], cb); + +export const readableFromStrImpl = (str) => stream.Readable.from(str, { objectMode: false }); + +export const readableFromBufImpl = (buf) => stream.Readable.from(buf, { objectMode: false }); + +export const newPassThrough = () => new stream.PassThrough({ objectMode: false }); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index d78f36d..47268ea 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -1,14 +1,13 @@ --- | This module provides a low-level wrapper for the [Node Stream API](https://nodejs.org/api/stream.html). +-- | This module provides a low-level wrapper for the [Node Stream API (v18 LTS)](https://nodejs.org/docs/latest-v18.x/api/stream.html). module Node.Stream - ( Stream - , Read - , Readable + ( Read , Write + , Stream + , Readable , Writable , Duplex , toEventEmitter - , setEncoding , closeH , errorH , drainH @@ -23,10 +22,16 @@ module Node.Stream , readableH , resumeH , endH + , readable + , readableEnded + , readableFlowing + , readableHighWaterMark + , readableLength , resume , pause , isPaused , pipe + , pipe' , unpipe , unpipeAll , read @@ -35,17 +40,33 @@ module Node.Stream , readString' , readEither , readEither' + , writeable + , writeableEnded + , writeableCorked + , errored + , writeableFinished + , writeableHighWaterMark + , writeableLength + , writeableNeedDrain , write , write' , writeString , writeString' , cork , uncork + , setEncoding , setDefaultEncoding , end , end' , destroy , destroy' + , closed + , destroyed + , allowHalfOpen + , pipeline + , fromString + , fromBuffer + , newPassThrough ) where import Prelude @@ -240,9 +261,6 @@ readEither' r size = do -- | Set the encoding used to read chunks as strings from the stream. This -- | function may be useful when you are passing a readable stream to some other -- | JavaScript library, which already expects an encoding to be set. --- | --- | Where possible, you should try to use `onDataString` instead of this --- | function. setEncoding :: forall w . Readable w @@ -282,6 +300,31 @@ resumeH = EventHandle "resume" identity endH :: forall w. EventHandle0 (Readable w) endH = EventHandle "end" identity +readable :: forall w. Readable w -> Effect Boolean +readable r = runEffectFn1 readableImpl r + +foreign import readableImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableEnded :: forall w. Readable w -> Effect Boolean +readableEnded r = runEffectFn1 readableEndedImpl r + +foreign import readableEndedImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableFlowing :: forall w. Readable w -> Effect Boolean +readableFlowing r = runEffectFn1 readableFlowingImpl r + +foreign import readableFlowingImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableHighWaterMark :: forall w. Readable w -> Effect Boolean +readableHighWaterMark r = runEffectFn1 readableHighWaterMarkImpl r + +foreign import readableHighWaterMarkImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableLength :: forall w. Readable w -> Effect Boolean +readableLength r = runEffectFn1 readableLengthImpl r + +foreign import readableLengthImpl :: forall w. EffectFn1 (Readable w) (Boolean) + -- | Resume reading from the stream. resume :: forall w. Readable w -> Effect Unit resume r = runEffectFn1 resumeImpl r @@ -306,6 +349,11 @@ pipe r w = runEffectFn2 pipeImpl r w foreign import pipeImpl :: forall w r. EffectFn2 (Readable w) (Writable r) (Unit) +pipe' :: forall w r. Readable w -> Writable r -> { end :: Boolean } -> Effect Unit +pipe' r w o = runEffectFn3 pipeCbImpl r w o + +foreign import pipeCbImpl :: forall w r. EffectFn3 (Readable w) (Writable r) ({ end :: Boolean }) (Unit) + -- | Detach a Writable stream previously attached using `pipe`. unpipe :: forall w r. Readable w -> Writable r -> Effect Unit unpipe r w = runEffectFn2 unpipeImpl r w @@ -318,6 +366,46 @@ unpipeAll r = runEffectFn1 unpipeAllImpl r foreign import unpipeAllImpl :: forall w. EffectFn1 (Readable w) (Unit) +writeable :: forall r. Writable r -> Effect Boolean +writeable w = runEffectFn1 writeableImpl w + +foreign import writeableImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +writeableEnded :: forall r. Writable r -> Effect Boolean +writeableEnded w = runEffectFn1 writeableEndedImpl w + +foreign import writeableEndedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +writeableCorked :: forall r. Writable r -> Effect Boolean +writeableCorked w = runEffectFn1 writeableCorkedImpl w + +foreign import writeableCorkedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +errored :: forall rw. Stream rw -> Effect Boolean +errored rw = runEffectFn1 erroredImpl rw + +foreign import erroredImpl :: forall rw. EffectFn1 (Stream rw) (Boolean) + +writeableFinished :: forall r. Writable r -> Effect Boolean +writeableFinished w = runEffectFn1 writeableFinishedImpl w + +foreign import writeableFinishedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +writeableHighWaterMark :: forall r. Writable r -> Effect Number +writeableHighWaterMark w = runEffectFn1 writeableHighWaterMarkImpl w + +foreign import writeableHighWaterMarkImpl :: forall r. EffectFn1 (Writable r) (Number) + +writeableLength :: forall r. Writable r -> Effect Number +writeableLength w = runEffectFn1 writeableLengthImpl w + +foreign import writeableLengthImpl :: forall r. EffectFn1 (Writable r) (Number) + +writeableNeedDrain :: forall r. Writable r -> Effect Boolean +writeableNeedDrain w = runEffectFn1 writeableNeedDrainImpl w + +foreign import writeableNeedDrainImpl :: forall r. EffectFn1 (Writable r) (Boolean) + write :: forall r. Writable r -> Buffer -> Effect Boolean write w b = runEffectFn2 writeImpl w b @@ -385,3 +473,34 @@ destroy' w e = runEffectFn2 destroyErrorImpl w e foreign import destroyErrorImpl :: forall r. EffectFn2 (Stream r) (Error) Unit +closed :: forall r. Stream r -> Effect Boolean +closed w = runEffectFn1 closedImpl w + +foreign import closedImpl :: forall r. EffectFn1 (Stream r) (Boolean) + +destroyed :: forall r. Stream r -> Effect Boolean +destroyed w = runEffectFn1 destroyedImpl w + +foreign import destroyedImpl :: forall r. EffectFn1 (Stream r) (Boolean) + +allowHalfOpen :: Duplex -> Effect Boolean +allowHalfOpen d = runEffectFn1 allowHalfOpenImpl d + +foreign import allowHalfOpenImpl :: EffectFn1 (Duplex) (Boolean) + +pipeline :: forall w r. Readable w -> Array Duplex -> Writable r -> (Error -> Effect Unit) -> Effect Unit +pipeline src transforms dest cb = runEffectFn4 pipelineImpl src transforms dest cb + +foreign import pipelineImpl :: forall w r. EffectFn4 (Readable w) (Array Duplex) (Writable r) ((Error -> Effect Unit)) (Unit) + +fromString :: String -> Effect (Readable ()) +fromString str = runEffectFn1 readableFromStrImpl str + +foreign import readableFromStrImpl :: EffectFn1 (String) (Readable ()) + +fromBuffer :: Buffer -> Effect (Readable ()) +fromBuffer buf = runEffectFn1 readableFromBufImpl buf + +foreign import readableFromBufImpl :: EffectFn1 (Buffer) (Readable ()) + +foreign import newPassThrough :: Effect Duplex diff --git a/test/Main.js b/test/Main.js index 5a46c5d..ed4e635 100644 --- a/test/Main.js +++ b/test/Main.js @@ -1,6 +1 @@ export { createGzip, createGunzip } from "zlib"; -import { PassThrough } from "stream"; - -export function passThrough() { - return new PassThrough; -} diff --git a/test/Main.purs b/test/Main.purs index ebd2c39..f9efcba 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -9,7 +9,7 @@ import Effect.Exception (error) import Node.Buffer as Buffer import Node.Encoding (Encoding(..)) import Node.EventEmitter (on_) -import Node.Stream (Duplex, dataH, dataHStr, destroy', end, end', errorH, pipe, read, readString, readableH, setDefaultEncoding, setEncoding, writeString, writeString') +import Node.Stream (Duplex, dataH, dataHStr, destroy', end, end', errorH, newPassThrough, pipe, read, readString, readableH, setDefaultEncoding, setEncoding, writeString, writeString') import Partial.Unsafe (unsafePartial) import Test.Assert (assert, assert') @@ -47,7 +47,7 @@ testReads = do where testReadString = do - sIn <- passThrough + sIn <- newPassThrough v <- readString sIn UTF8 assert (isNothing v) @@ -60,7 +60,7 @@ testReads = do void $ writeString sIn UTF8 testString testReadBuf = do - sIn <- passThrough + sIn <- newPassThrough v <- read sIn assert (isNothing v) @@ -75,10 +75,10 @@ testReads = do testSetDefaultEncoding :: Effect Unit testSetDefaultEncoding = do - w1 <- passThrough + w1 <- newPassThrough check w1 - w2 <- passThrough + w2 <- newPassThrough setDefaultEncoding w2 UCS2 check w2 @@ -96,10 +96,10 @@ testSetEncoding = do check UCS2 where check enc = do - r1 <- passThrough + r1 <- newPassThrough void $ writeString r1 enc testString - r2 <- passThrough + r2 <- newPassThrough void $ writeString r1 enc testString setEncoding r2 enc @@ -110,8 +110,8 @@ testSetEncoding = do testPipe :: Effect Unit testPipe = do - sIn <- passThrough - sOut <- passThrough + sIn <- newPassThrough + sOut <- newPassThrough zip <- createGzip unzip <- createGunzip @@ -131,23 +131,20 @@ testPipe = do foreign import createGzip :: Effect Duplex foreign import createGunzip :: Effect Duplex --- | Create a PassThrough stream, which simply writes its input to its output. -foreign import passThrough :: Effect Duplex - testWrite :: Effect Unit testWrite = do hasError noError where hasError = do - w1 <- passThrough + w1 <- newPassThrough w1 # on_ errorH (const $ pure unit) end w1 void $ writeString' w1 UTF8 "msg" \err -> do assert' "writeString - should have error" $ isJust err noError = do - w1 <- passThrough + w1 <- newPassThrough void $ writeString' w1 UTF8 "msg1" \err -> do assert' "writeString - should have no error" $ isNothing err end w1 @@ -158,7 +155,7 @@ testEnd = do noError where hasError = do - w1 <- passThrough + w1 <- newPassThrough w1 # on_ errorH (const $ pure unit) void $ writeString' w1 UTF8 "msg" \_ -> do _ <- destroy' w1 $ error "Problem" @@ -166,6 +163,6 @@ testEnd = do assert' "end - should have error" $ isJust err noError = do - w1 <- passThrough + w1 <- newPassThrough end' w1 \err -> do assert' "end - should have no error" $ isNothing err