Skip to content

Commit a9c48e4

Browse files
fix(datastore): keep locks on writes
1 parent a8ce3a6 commit a9c48e4

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
"bl": "^1.1.2",
5050
"concat-stream": "^1.5.1",
5151
"level-js": "^2.2.3",
52+
"lock": "^0.1.2",
5253
"lockfile": "^1.0.1",
5354
"multihashes": "^0.2.1",
5455
"xtend": "^4.0.1"
@@ -63,4 +64,4 @@
6364
"dignifiedquire <[email protected]>",
6465
"greenkeeperio-bot <[email protected]>"
6566
]
66-
}
67+
}

src/stores/datastore.js

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
'use strict'
22

3+
const Lock = require('lock')
4+
const stream = require('stream')
5+
36
const PREFIX_LENGTH = 8
47

58
exports = module.exports
@@ -15,6 +18,7 @@ function multihashToPath (multihash, extension) {
1518

1619
exports.setUp = (basePath, blobStore, locks) => {
1720
const store = blobStore(basePath + '/blocks')
21+
const lock = new Lock()
1822

1923
return {
2024
createReadStream: (multihash, extension) => {
@@ -29,8 +33,16 @@ exports.setUp = (basePath, blobStore, locks) => {
2933
}
3034

3135
const path = multihashToPath(multihash, extension)
32-
return store.createWriteStream(path, cb)
36+
const through = stream.PassThrough()
37+
38+
lock(path, (release) => {
39+
const ws = store.createWriteStream(path, release(cb))
40+
through.pipe(ws)
41+
})
42+
43+
return through
3344
},
45+
3446
exists: (multihash, extension, cb) => {
3547
if (typeof extension === 'function') {
3648
cb = extension
@@ -40,6 +52,7 @@ exports.setUp = (basePath, blobStore, locks) => {
4052
const path = multihashToPath(multihash, extension)
4153
return store.exists(path, cb)
4254
},
55+
4356
remove: (multihash, extension, cb) => {
4457
if (typeof extension === 'function') {
4558
cb = extension

test/repo-test.js

+24
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,30 @@ module.exports = function (repo) {
185185
}).end(data)
186186
})
187187

188+
it('write locks', (done) => {
189+
const rnd = 'QmVtU7ths96fMgZ8YSZAbKghyieq7AjxNdcqyVtesthash'
190+
const mh = new Buffer(base58.decode(rnd))
191+
const data = new Buffer('Oh the data')
192+
193+
let i = 0
194+
const finish = () => {
195+
i++
196+
if (i === 2) done()
197+
}
198+
199+
repo.datastore.createWriteStream(mh, (err, metadata) => {
200+
expect(err).to.not.exist
201+
expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data')
202+
finish()
203+
}).end(data)
204+
205+
repo.datastore.createWriteStream(mh, (err, metadata) => {
206+
expect(err).to.not.exist
207+
expect(metadata.key).to.equal('12207028/122070286b9afa6620a66f715c7020d68af3d10e1a497971629c07605f55537ce990.data')
208+
finish()
209+
}).end(data)
210+
})
211+
188212
it('block exists', function (done) {
189213
const buf = new Buffer(base58.decode(baseFileHash))
190214

0 commit comments

Comments
 (0)