Skip to content

Commit 0e2eba8

Browse files
committed
refactor: cb => async
1 parent fe3796b commit 0e2eba8

29 files changed

+1386
-1592
lines changed

src/decision-engine/index.js

Lines changed: 150 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
'use strict'
22

3-
const each = require('async/each')
4-
const eachSeries = require('async/eachSeries')
5-
const waterfall = require('async/waterfall')
6-
const nextTick = require('async/nextTick')
7-
8-
const map = require('async/map')
93
const debounce = require('just-debounce-it')
104

115
const Message = require('../types/message')
@@ -32,21 +26,46 @@ class DecisionEngine {
3226
this._outbox = debounce(this._processTasks.bind(this), 100)
3327
}
3428

35-
_sendBlocks (peer, blocks, cb) {
36-
// split into messges of max 512 * 1024 bytes
29+
// _sendBlocks (peer, blocks, cb) {
30+
async _sendBlocks (peer, blocks) {
31+
// split into messages of max 512 * 1024 bytes
3732
const total = blocks.reduce((acc, b) => {
3833
return acc + b.data.byteLength
3934
}, 0)
4035

4136
if (total < MAX_MESSAGE_SIZE) {
42-
return this._sendSafeBlocks(peer, blocks, cb)
37+
await this._sendSafeBlocks(peer, blocks)
38+
return
4339
}
4440

4541
let size = 0
4642
let batch = []
4743
let outstanding = blocks.length
4844

49-
eachSeries(blocks, (b, cb) => {
45+
// eachSeries(blocks, (b, cb) => {
46+
// outstanding--
47+
// batch.push(b)
48+
// size += b.data.byteLength
49+
50+
// if (size >= MAX_MESSAGE_SIZE ||
51+
// // need to ensure the last remaining items get sent
52+
// outstanding === 0) {
53+
// size = 0
54+
// const nextBatch = batch.slice()
55+
// batch = []
56+
// this._sendSafeBlocks(peer, nextBatch, (err) => {
57+
// if (err) {
58+
// this._log('sendblock error: %s', err.message)
59+
// }
60+
// // not returning the error, so we send as much as we can
61+
// // as otherwise `eachSeries` would cancel
62+
// cb()
63+
// })
64+
// } else {
65+
// nextTick(cb)
66+
// }
67+
// }, cb)
68+
for (const b of blocks) {
5069
outstanding--
5170
batch.push(b)
5271
size += b.data.byteLength
@@ -57,28 +76,25 @@ class DecisionEngine {
5776
size = 0
5877
const nextBatch = batch.slice()
5978
batch = []
60-
this._sendSafeBlocks(peer, nextBatch, (err) => {
61-
if (err) {
62-
this._log('sendblock error: %s', err.message)
63-
}
64-
// not returning the error, so we send as much as we can
65-
// as otherwise `eachSeries` would cancel
66-
cb()
67-
})
68-
} else {
69-
nextTick(cb)
79+
try {
80+
await this._sendSafeBlocks(peer, nextBatch)
81+
} catch (err) {
82+
// catch the error so as to send as many blocks as we can
83+
this._log('sendblock error: %s', err.message)
84+
}
7085
}
71-
}, cb)
86+
}
7287
}
7388

74-
_sendSafeBlocks (peer, blocks, cb) {
89+
// _sendSafeBlocks (peer, blocks, cb) {
90+
async _sendSafeBlocks (peer, blocks) {
7591
const msg = new Message(false)
7692
blocks.forEach((b) => msg.addBlock(b))
7793

78-
this.network.sendMessage(peer, msg, cb)
94+
await this.network.sendMessage(peer, msg)
7995
}
8096

81-
_processTasks () {
97+
async _processTasks () {
8298
if (!this._running || !this._tasks.length) {
8399
return
84100
}
@@ -90,35 +106,55 @@ class DecisionEngine {
90106
const uniqCids = uniqWith((a, b) => a.equals(b), cids)
91107
const groupedTasks = groupBy(task => task.target.toB58String(), tasks)
92108

93-
waterfall([
94-
(callback) => map(uniqCids, (cid, cb) => {
95-
this.blockstore.get(cid, cb)
96-
}, callback),
97-
(blocks, callback) => each(Object.values(groupedTasks), (tasks, cb) => {
98-
// all tasks have the same target
99-
const peer = tasks[0].target
100-
const blockList = cids.map((cid) => {
101-
return blocks.find(b => b.cid.equals(cid))
102-
})
103-
104-
this._sendBlocks(peer, blockList, (err) => {
105-
if (err) {
106-
// `_sendBlocks` actually doesn't return any errors
107-
this._log.error('should never happen: ', err)
108-
} else {
109-
blockList.forEach((block) => this.messageSent(peer, block))
110-
}
111-
112-
cb()
113-
})
114-
}, callback)
115-
], (err) => {
116-
this._tasks = []
117-
118-
if (err) {
119-
this._log.error(err)
109+
// waterfall([
110+
// (callback) => map(uniqCids, (cid, cb) => {
111+
// this.blockstore.get(cid, cb)
112+
// }, callback),
113+
// (blocks, callback) => each(Object.values(groupedTasks), (tasks, cb) => {
114+
// // all tasks have the same target
115+
// const peer = tasks[0].target
116+
// const blockList = cids.map((cid) => {
117+
// return blocks.find(b => b.cid.equals(cid))
118+
// })
119+
120+
// this._sendBlocks(peer, blockList, (err) => {
121+
// if (err) {
122+
// // `_sendBlocks` actually doesn't return any errors
123+
// this._log.error('should never happen: ', err)
124+
// } else {
125+
// blockList.forEach((block) => this.messageSent(peer, block))
126+
// }
127+
128+
// cb()
129+
// })
130+
// }, callback)
131+
// ], (err) => {
132+
// this._tasks = []
133+
134+
// if (err) {
135+
// this._log.error(err)
136+
// }
137+
// })
138+
139+
const blocks = await Promise.all(uniqCids.map(cid => this.blockstore.get(cid)))
140+
await Object.values(groupedTasks).map(async (tasks) => {
141+
// all tasks in the group have the same target
142+
const peer = tasks[0].target
143+
const blockList = cids.map((cid) => blocks.find(b => b.cid.equals(cid)))
144+
145+
try {
146+
await this._sendBlocks(peer, blockList)
147+
} catch (err) {
148+
// `_sendBlocks` actually doesn't return any errors
149+
this._log.error('should never happen: ', err)
150+
return
151+
}
152+
for (const block of blockList) {
153+
this.messageSent(peer, block)
120154
}
121155
})
156+
157+
this._tasks = []
122158
}
123159

124160
wantlistForPeer (peerId) {
@@ -170,11 +206,29 @@ class DecisionEngine {
170206
}
171207

172208
// Handle incoming messages
173-
messageReceived (peerId, msg, cb) {
209+
// messageReceived (peerId, msg, cb) {
210+
async messageReceived (peerId, msg) {
211+
// const ledger = this._findOrCreate(peerId)
212+
213+
// if (msg.empty) {
214+
// return nextTick(cb)
215+
// }
216+
217+
// // If the message was a full wantlist clear the current one
218+
// if (msg.full) {
219+
// ledger.wantlist = new Wantlist()
220+
// }
221+
222+
// this._processBlocks(msg.blocks, ledger)
223+
224+
// if (msg.wantlist.size === 0) {
225+
// return nextTick(cb)
226+
// }
227+
174228
const ledger = this._findOrCreate(peerId)
175229

176230
if (msg.empty) {
177-
return nextTick(cb)
231+
return
178232
}
179233

180234
// If the message was a full wantlist clear the current one
@@ -185,7 +239,7 @@ class DecisionEngine {
185239
this._processBlocks(msg.blocks, ledger)
186240

187241
if (msg.wantlist.size === 0) {
188-
return nextTick(cb)
242+
return
189243
}
190244

191245
let cancels = []
@@ -201,7 +255,7 @@ class DecisionEngine {
201255
})
202256

203257
this._cancelWants(ledger, peerId, cancels)
204-
this._addWants(ledger, peerId, wants, cb)
258+
await this._addWants(ledger, peerId, wants)
205259
}
206260

207261
_cancelWants (ledger, peerId, entries) {
@@ -214,27 +268,48 @@ class DecisionEngine {
214268
}, this._tasks, entries)
215269
}
216270

217-
_addWants (ledger, peerId, entries, callback) {
218-
each(entries, (entry, cb) => {
271+
// _addWants (ledger, peerId, entries, callback) {
272+
async _addWants (ledger, peerId, entries) {
273+
// each(entries, (entry, cb) => {
274+
// // If we already have the block, serve it
275+
// this.blockstore.has(entry.cid, (err, exists) => {
276+
// if (err) {
277+
// this._log.error('failed existence check')
278+
// } else if (exists) {
279+
// this._tasks.push({
280+
// entry: entry.entry,
281+
// target: peerId
282+
// })
283+
// }
284+
// cb()
285+
// })
286+
// }, () => {
287+
// this._outbox()
288+
// callback()
289+
// })
290+
291+
await Promise.all(entries.map(async (entry) => {
219292
// If we already have the block, serve it
220-
this.blockstore.has(entry.cid, (err, exists) => {
221-
if (err) {
222-
this._log.error('failed existence check')
223-
} else if (exists) {
224-
this._tasks.push({
225-
entry: entry.entry,
226-
target: peerId
227-
})
228-
}
229-
cb()
230-
})
231-
}, () => {
232-
this._outbox()
233-
callback()
234-
})
293+
let exists
294+
try {
295+
exists = await this.blockstore.has(entry.cid)
296+
} catch (err) {
297+
this._log.error('failed blockstore existence check for ' + entry.cid)
298+
return
299+
}
300+
301+
if (exists) {
302+
this._tasks.push({
303+
entry: entry.entry,
304+
target: peerId
305+
})
306+
}
307+
}))
308+
309+
this._outbox()
235310
}
236311

237-
_processBlocks (blocks, ledger, callback) {
312+
_processBlocks (blocks, ledger) {
238313
const cids = []
239314
blocks.forEach((b, cidStr) => {
240315
this._log('got block (%s bytes)', b.data.length)
@@ -287,14 +362,12 @@ class DecisionEngine {
287362
return l
288363
}
289364

290-
start (callback) {
365+
start () {
291366
this._running = true
292-
nextTick(() => callback())
293367
}
294368

295-
stop (callback) {
369+
stop () {
296370
this._running = false
297-
nextTick(() => callback())
298371
}
299372
}
300373

0 commit comments

Comments
 (0)