@@ -8,53 +8,58 @@ const assert = require('assert');
88const { setTimeout } = require ( 'timers/promises' ) ;
99
1010{
11- // forEach works on synchronous streams with a synchronous predicate
12- const stream = Readable . from ( [ 1 , 2 , 3 ] ) ;
13- const result = [ 1 , 2 , 3 ] ;
11+ // Filter works on synchronous streams with a synchronous predicate
12+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . filter ( ( x ) => x < 3 ) ;
13+ const result = [ 1 , 2 ] ;
1414 ( async ( ) => {
15- await stream . forEach ( ( value ) => assert . strictEqual ( value , result . shift ( ) ) ) ;
15+ for await ( const item of stream ) {
16+ assert . strictEqual ( item , result . shift ( ) ) ;
17+ }
1618 } ) ( ) . then ( common . mustCall ( ) ) ;
1719}
1820
1921{
20- // forEach works an asynchronous streams
21- const stream = Readable . from ( [ 1 , 2 , 3 ] ) . filter ( async ( x ) => {
22+ // Filter works on synchronous streams with an asynchronous predicate
23+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . filter ( async ( x ) => {
2224 await Promise . resolve ( ) ;
23- return true ;
25+ return x > 3 ;
2426 } ) ;
25- const result = [ 1 , 2 , 3 ] ;
27+ const result = [ 4 , 5 ] ;
2628 ( async ( ) => {
27- await stream . forEach ( ( value ) => assert . strictEqual ( value , result . shift ( ) ) ) ;
29+ for await ( const item of stream ) {
30+ assert . strictEqual ( item , result . shift ( ) ) ;
31+ }
2832 } ) ( ) . then ( common . mustCall ( ) ) ;
2933}
3034
3135{
32- // forEach works on asynchronous streams with a asynchronous forEach fn
33- const stream = Readable . from ( [ 1 , 2 , 3 ] ) . filter ( async ( x ) => {
36+ // Map works on asynchronous streams with a asynchronous mapper
37+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
3438 await Promise . resolve ( ) ;
35- return true ;
36- } ) ;
37- const result = [ 1 , 2 , 3 ] ;
39+ return x + x ;
40+ } ) . filter ( ( x ) => x > 5 ) ;
41+ const result = [ 6 , 8 , 10 ] ;
3842 ( async ( ) => {
39- await stream . forEach ( async ( value ) => {
40- await Promise . resolve ( ) ;
41- assert . strictEqual ( value , result . shift ( ) ) ;
42- } ) ;
43+ for await ( const item of stream ) {
44+ assert . strictEqual ( item , result . shift ( ) ) ;
45+ }
4346 } ) ( ) . then ( common . mustCall ( ) ) ;
4447}
4548
4649{
4750 // Concurrency + AbortSignal
4851 const ac = new AbortController ( ) ;
4952 let calls = 0 ;
50- const forEachPromise =
51- Readable . from ( [ 1 , 2 , 3 , 4 ] ) . forEach ( async ( _ , { signal } ) => {
52- calls ++ ;
53- await setTimeout ( 100 , { signal } ) ;
54- } , { signal : ac . signal , concurrency : 2 } ) ;
53+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 ] ) . filter ( async ( _ , { signal } ) => {
54+ calls ++ ;
55+ await setTimeout ( 100 , { signal } ) ;
56+ } , { signal : ac . signal , concurrency : 2 } ) ;
5557 // pump
5658 assert . rejects ( async ( ) => {
57- await forEachPromise ;
59+ for await ( const item of stream ) {
60+ // nope
61+ console . log ( item ) ;
62+ }
5863 } , {
5964 name : 'AbortError' ,
6065 } ) . then ( common . mustCall ( ) ) ;
@@ -65,22 +70,40 @@ const { setTimeout } = require('timers/promises');
6570 } ) ;
6671}
6772
73+ {
74+ // Concurrency result order
75+ const stream = Readable . from ( [ 1 , 2 ] ) . filter ( async ( item , { signal } ) => {
76+ await setTimeout ( 10 - item , { signal } ) ;
77+ return true ;
78+ } , { concurrency : 2 } ) ;
79+
80+ ( async ( ) => {
81+ const expected = [ 1 , 2 ] ;
82+ for await ( const item of stream ) {
83+ assert . strictEqual ( item , expected . shift ( ) ) ;
84+ }
85+ } ) ( ) . then ( common . mustCall ( ) ) ;
86+ }
87+
6888{
6989 // Error cases
7090 assert . rejects ( async ( ) => {
71- Readable . from ( [ 1 ] ) . forEach ( 1 ) ;
91+ // eslint-disable-next-line no-unused-vars
92+ for await ( const unused of Readable . from ( [ 1 ] ) . filter ( 1 ) ) ;
7293 } , / E R R _ I N V A L I D _ A R G _ T Y P E / ) . then ( common . mustCall ( ) ) ;
7394 assert . rejects ( async ( ) => {
74- Readable . from ( [ 1 ] ) . forEach ( ( x ) => x , {
95+ // eslint-disable-next-line no-unused-vars
96+ for await ( const _ of Readable . from ( [ 1 ] ) . filter ( ( x ) => x , {
7597 concurrency : 'Foo'
76- } ) ;
98+ } ) ) ;
7799 } , / E R R _ O U T _ O F _ R A N G E / ) . then ( common . mustCall ( ) ) ;
78100 assert . rejects ( async ( ) => {
79- Readable . from ( [ 1 ] ) . forEach ( ( x ) => x , 1 ) ;
101+ // eslint-disable-next-line no-unused-vars
102+ for await ( const _ of Readable . from ( [ 1 ] ) . filter ( ( x ) => x , 1 ) ) ;
80103 } , / E R R _ I N V A L I D _ A R G _ T Y P E / ) . then ( common . mustCall ( ) ) ;
81104}
82105{
83- // Test result is a Promise
84- const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . forEach ( ( _ ) => true ) ;
85- assert . strictEqual ( typeof stream . then , 'function' ) ;
86- }
106+ // Test result is a Readable
107+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . filter ( ( x ) => true ) ;
108+ assert . strictEqual ( stream . readable , true ) ;
109+ }
0 commit comments