This repository was archived by the owner on Oct 19, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathstreams.ts
More file actions
139 lines (129 loc) · 4.29 KB
/
streams.ts
File metadata and controls
139 lines (129 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import {
StreamAction,
Feeds,
FeedItem
} from '../types'
import * as _ from 'highland'
/**
* The createStreams function creates Highland Streams from other objects such as Arrays and Promises.
*
* @private
* @param {Feeds<FeedItem>} feeds A Feeds object mapping names to stream-like objects.
* @return {Feeds<FeedItem>} A Feeds object mapping names to actual streams.
*/
function createStreams(feeds: any): Feeds<FeedItem> {
const streams: Feeds<FeedItem> = {}
Object.keys(feeds).forEach((key) => {
const feed = feeds[key]
if (_.isStream(feed)) {
streams[key] = <Highland.Stream<FeedItem>> feed
} else {
streams[key] = _(feed)
}
})
return streams
}
/**
* The createStreamMerged function simply merges all of the streams together, pushing values as
* they are received.
*
* @private
* @param {Feeds<FeedItem>} feeds A Feeds object containing multiple streams.
* @return {Highland.Stream<StreamAction>} A single Highland Stream made from merged streams.
*/
export function createStreamMerged(feeds: Feeds<FeedItem>): Highland.Stream<StreamAction> {
const streams: Feeds<FeedItem> = createStreams(feeds)
return _(Object.keys(streams)
.map(key => streams[key].map((item: FeedItem) => ({
type: key,
payload: item
})))).merge()
}
/**
* The createStreamSorted function is a little more advanced than its sibling createStreamMerged.
* This function pulls one item at a time from all of the streams and places each item in a Map.
* Then it compares items and pushes the one with the smallest timestamp. Finally, it pulls a new item
* and repats it all over again.
*
* @private
* @param {Feeds<FeedItem>} feeds A Feeds object containing multiple streams.
* @return {Highland.Stream<StreamAction>} A single Highland Stream made from sorted streams.
*/
export function createStreamSorted(feeds: Feeds<FeedItem>): Highland.Stream<StreamAction> {
const buffer = new Map()
const streams: Feeds<FeedItem> = createStreams(feeds)
const pred = (x1: any, x2: any) => {
const t1 = x1.payload && x1.payload.timestamp
const t2 = x2.payload && x2.payload.timestamp
if (typeof t1 === 'undefined' && typeof t2 === 'undefined') {
return false
}
if (typeof t2 === 'undefined') {
return true
}
if (typeof t1 === 'undefined') {
return false
}
return t1 < t2
}
// Highland typings does not like streams of streams, so we solve it using "as any"
const sources: Highland.Stream<Highland.Stream<StreamAction>> = _(Object.keys(streams)
.map(key => streams[key].map<StreamAction>((item: FeedItem) => ({
type: key,
payload: item
})))) as any
return sources.collect().flatMap((srcs: Highland.Stream<StreamAction>[]) => {
const nextValue = (src: Highland.Stream<StreamAction>, push: Function, next: Function) => {
src.pull((err, x) => {
if (err) {
push(err)
nextValue(src, push, next)
} else if (x === _.nil as any) {
// push last element in buffer
if (buffer.get(src)) {
push(null, buffer.get(src))
}
// must be final stream
if (buffer.size <= 1) {
push(null, _.nil)
} else {
// remove stream from map of streams and
// from array of source streams
buffer.delete(src)
srcs.splice(srcs.indexOf(src), 1)
next()
}
} else {
if (buffer.size === srcs.length) {
push(null, buffer.get(src))
}
// replace old buffer key/value with new one
buffer.set(src, x)
next()
}
})
}
if (!srcs.length) {
return _([])
}
let first = true
return _((push, next) => {
// need to buffer first element of all streams first before beginning
// comparisons
if (first) {
for (const src of srcs) {
nextValue(src, push, next)
}
first = false
}
let srcToPull
if (buffer.size === srcs.length) {
for (const pair of Array.from(buffer.entries())) {
srcToPull = srcToPull === undefined || pred(pair[1], srcToPull[1]) ? pair : srcToPull
}
// @ts-ignore
nextValue(srcToPull[0], push, next)
}
}) as Highland.Stream<StreamAction>
})
}