'use strict'

const Block = require('ipfs-block')
- const pull = require('pull-stream')
const Lock = require('lock')
const base32 = require('base32.js')
const path = require('path')
- const pullWrite = require('pull-write')
const parallel = require('run-parallel')
+ const pull = require('pull-stream')
+ const pullWrite = require('pull-write')
const pullDefer = require('pull-defer/source')

const PREFIX_LENGTH = 5
+ const EXTENSION = 'data'

exports = module.exports

function multihashToPath (multihash) {
-   const extension = 'data'
  const encoder = new base32.Encoder()
  const hash = encoder.write(multihash).finalize()
-   const filename = `${hash}.${extension}`
+   const filename = `${hash}.${EXTENSION}`
  const folder = filename.slice(0, PREFIX_LENGTH)

  return path.join(folder, filename)
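For orientation, a hedged sketch of the sharded layout this produces. The filename below is an invented base32-looking string; only the PREFIX_LENGTH slicing and the path.join call mirror the code above:

const path = require('path')

const PREFIX_LENGTH = 5
// Invented name; a real filename is the base32-encoded multihash plus '.data'.
const filename = 'CIQHXYZ2B.data'
const folder = filename.slice(0, PREFIX_LENGTH) // 'CIQHX'

console.log(path.join(folder, filename)) // 'CIQHX/CIQHXYZ2B.data' on POSIX systems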
@@ -87,40 +87,31 @@ exports.setUp = (basePath, BlobStore, locks) => {
    },

    /*
-      * returns a pull-stream to write blockBlob into
+      * putStream - write multiple blocks
+      *
+      * returns a pull-stream that expects blockBlobs
+      *
     * NOTE: blockBlob is a { data: <>, key: <> } and not an
     * ipfs-block instance. This is because Block instances support
     * several types of hashing and it is up to the BlockService
     * to understand the right one to use (given the CID)
     */
-     // TODO use a more explicit name, given that getStream is just for
-     // one block, multiple blocks should have different naming
+     // TODO
+     // consider using a more explicit name, this can cause some confusion
+     // since the natural association is
+     // getStream - createReadStream - read one
+     // putStream - createWriteStream - write one
+     // where in fact it is:
+     // getStream - createReadStream - read one (the same)
+     // putStream - createFilesWriteStream - write several
+     //
    putStream () {
      let ended = false
      let written = []
      let push = null

      const sink = pullWrite((blockBlobs, cb) => {
-         const tasks = blockBlobs.map((blockBlob) => {
-           return (cb) => {
-             writeBlock(blockBlob, (err, meta) => {
-               if (err) {
-                 return cb(err)
-               }
-
-               if (push) {
-                 const read = push
-                 push = null
-                 read(null, meta)
-                 return cb()
-               }
-
-               written.push(meta)
-               cb()
-             })
-           }
-         })
-
+         const tasks = writeTasks(blockBlobs)
        parallel(tasks, cb)
      }, null, 100, (err) => {
        ended = err || true
@@ -129,7 +120,6 @@ exports.setUp = (basePath, BlobStore, locks) => {
        }
      })

-       // TODO ??Why does a putStream need to be a source as well??
      const source = (end, cb) => {
        if (end) {
          ended = end
@@ -145,7 +135,36 @@ exports.setUp = (basePath, BlobStore, locks) => {
        push = cb
      }

-       return { source: source, sink: sink }
+       /*
+        * Creates individual tasks to write each block blob that can be
+        * executed in parallel
+        */
+       function writeTasks (blockBlobs) {
+         return blockBlobs.map((blockBlob) => {
+           return (cb) => {
+             writeBlock(blockBlob, (err, meta) => {
+               if (err) {
+                 return cb(err)
+               }
+
+               if (push) {
+                 const read = push
+                 push = null
+                 read(null, meta)
+                 return cb()
+               }
+
+               written.push(meta)
+               cb()
+             })
+           }
+         })
+       }
+
+       return {
+         source: source,
+         sink: sink
+       }
    },

    has (key, callback) {