From ee638dca06ccdafc0e4d87fe91120d7f80b79755 Mon Sep 17 00:00:00 2001 From: Dominik Auf der Maur Date: Fri, 20 Mar 2015 21:43:57 +0100 Subject: [PATCH 1/5] declared mongodb 2 driver incompatibility --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 3d95b7d2..dcc0c61c 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,7 @@ "cradle": ">=0.6.7", "expect.js": ">= 0.1.2", "mocha": ">= 1.0.1", - "mongodb": ">= 0.0.1", + "mongodb": ">= 0.0.1 < 2.0.0", "redis": ">= 0.10.1", "tingodb": ">= 0.0.1", "azure-storage": ">= 0.3.0" From 83b5a2a60d418474d546e0ea550fe7eaf0b59c68 Mon Sep 17 00:00:00 2001 From: Dominik Auf der Maur Date: Sat, 21 Mar 2015 02:01:24 +0100 Subject: [PATCH 2/5] mongoDB performance improvements - dropped streamId compatibility --- lib/databases/mongodb.js | 56 ++++++---------------------------------- 1 file changed, 8 insertions(+), 48 deletions(-) diff --git a/lib/databases/mongodb.js b/lib/databases/mongodb.js index d1c1fede..e2ab22d1 100644 --- a/lib/databases/mongodb.js +++ b/lib/databases/mongodb.js @@ -82,44 +82,13 @@ _.extend(Mongo.prototype, { self.client = client; self.events = new mongo.Collection(client, options.eventsCollectionName); - self.events.ensureIndex({ streamId: 1, aggregateId: 1, aggregate: 1, context: 1, commitId: 1, commitStamp: 1, commitSequence: 1, streamRevision: 1 }, + self.events.ensureIndex({ aggregateId: 1, streamRevision: 1 }, function (err) { if (err) { debug(err); } }); - - self.events.ensureIndex({ aggregateId: 1, aggregate: 1, context: 1 }, - function (err) { if (err) { debug(err); } }); - - self.events.ensureIndex({ aggregateId: 1, aggregate: 1 }, - function (err) { if (err) { debug(err); } }); - - self.events.ensureIndex({ aggregateId: 1 }, - function (err) { if (err) { debug(err); } }); - - self.events.ensureIndex({ streamId: 1 }, - function (err) { if (err) { debug(err); } }); - self.events.ensureIndex({ dispatched: 1 }, function (err) { if (err) { debug(err); } }); - self.events.ensureIndex({ commitStamp: 1, streamRevision: 1, commitSequence: 1 }, - function (err) { if (err) { debug(err); } }); - self.snapshots = new mongo.Collection(client, options.snapshotsCollectionName); - self.snapshots.ensureIndex({ streamId: 1, aggregateId: 1, aggregate: 1, context: 1, commitStamp: 1, revision: 1, version: 1 }, - function (err) { if (err) { debug(err); } }); - - self.snapshots.ensureIndex({ revision: -1, version: -1, commitStamp: -1 }, - function (err) { if (err) { debug(err); } }); - - self.snapshots.ensureIndex({ aggregateId: 1, aggregate: 1, context: 1 }, - function (err) { if (err) { debug(err); } }); - - self.snapshots.ensureIndex({ aggregateId: 1, aggregate: 1 }, - function (err) { if (err) { debug(err); } }); - - self.snapshots.ensureIndex({ aggregateId: 1 }, - function (err) { if (err) { debug(err); } }); - - self.snapshots.ensureIndex({ streamId: 1 }, + self.snapshots.ensureIndex({ aggregateId: 1, revision: -1 }, function (err) { if (err) { debug(err); } }); self.transactions = new mongo.Collection(client, options.transactionsCollectionName); @@ -241,10 +210,7 @@ _.extend(Mongo.prototype, { } if (query.aggregateId) { - findStatement['$or'] = [ - { aggregateId: query.aggregateId }, - { streamId: query.aggregateId } // just for compatability of < 1.0.0 - ]; + findStatement.aggregateId = query.aggregateId; } if (limit === -1) { @@ -262,17 +228,14 @@ _.extend(Mongo.prototype, { return; } - var options = { '$gte': revMin, '$lt': revMax }; + var streamRevOptions = { '$gte': revMin, '$lt': revMax }; if (revMax == -1) { - options = { '$gte': revMin }; + streamRevOptions = { '$gte': revMin }; } var findStatement = { - '$or': [ - { aggregateId: query.aggregateId }, - { streamId: query.aggregateId } // just for compatability of < 1.0.0 - ], - streamRevision: options + aggregateId: query.aggregateId, + streamRevision: streamRevOptions }; if (query.aggregate) { @@ -367,10 +330,7 @@ _.extend(Mongo.prototype, { } var findStatement = { - '$or': [ - { aggregateId: query.aggregateId }, - { streamId: query.aggregateId } // just for compatability of < 1.0.0 - ] + aggregateId: query.aggregateId }; if (query.aggregate) { From fd737bbed20cee719197ea28cb35d3c8ac8072ff Mon Sep 17 00:00:00 2001 From: Dominik Auf der Maur Date: Tue, 24 Mar 2015 09:15:25 +0100 Subject: [PATCH 3/5] improved mongodb handling of dispatched/undispatched flag --- lib/databases/mongodb.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/databases/mongodb.js b/lib/databases/mongodb.js index e2ab22d1..737b16de 100644 --- a/lib/databases/mongodb.js +++ b/lib/databases/mongodb.js @@ -84,7 +84,7 @@ _.extend(Mongo.prototype, { self.events = new mongo.Collection(client, options.eventsCollectionName); self.events.ensureIndex({ aggregateId: 1, streamRevision: 1 }, function (err) { if (err) { debug(err); } }); - self.events.ensureIndex({ dispatched: 1 }, + self.events.ensureIndex({ dispatched: 1 }, { sparse: true }, function (err) { if (err) { debug(err); } }); self.snapshots = new mongo.Collection(client, options.snapshotsCollectionName); @@ -305,7 +305,7 @@ _.extend(Mongo.prototype, { }, setEventToDispatched: function (id, callback) { - var updateCommand = { '$set' : { 'dispatched': true } }; + var updateCommand = { '$unset' : { 'dispatched': null } }; this.events.update({'_id' : id}, updateCommand, callback); }, From 50536446ebc3f0553d4760e7422ed74c629d7b36 Mon Sep 17 00:00:00 2001 From: Dominik Auf der Maur Date: Tue, 24 Mar 2015 12:58:55 +0100 Subject: [PATCH 4/5] improved inmemory DB handling of uncommitted events --- lib/databases/inmemory.js | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/lib/databases/inmemory.js b/lib/databases/inmemory.js index c982d4dc..e608517a 100644 --- a/lib/databases/inmemory.js +++ b/lib/databases/inmemory.js @@ -10,6 +10,7 @@ function InMemory(options) { Store.call(this, options); this.store = {}; this.snapshots = {}; + this.undispatchedEvents = {}; } util.inherits(InMemory, Store); @@ -56,6 +57,7 @@ _.extend(InMemory.prototype, { clear: function (callback) { this.store = {}; this.snapshots = {}; + this.undispatchedEvents = {}; if (callback) callback(null); }, @@ -87,6 +89,11 @@ _.extend(InMemory.prototype, { this.store[context][aggregate][aggregateId] = this.store[context][aggregate][aggregateId].concat(events); + var self = this; + _.forEach(events, function(evt) { + self.undispatchedEvents[evt.id] = evt; + }); + callback(null); }, @@ -215,31 +222,16 @@ _.extend(InMemory.prototype, { }, getUndispatchedEvents: function (callback) { - var res = []; - - this.getEvents({}, 0, -1, function (err, evts) { - for (var ele in evts) { - var evt = evts[ele]; - if (!evt.dispatched) { - res.push(evt); - } - } - callback(null, res); + var res = _.map(this.undispatchedEvents, function(value, key) { + return value; }); + + callback(null, res); }, setEventToDispatched: function (id, callback) { - var res; - - this.getEvents({ id: id }, 0, -1, function (err, evts) { - if (evts && evts.length > 0) { - res = evts[0]; - } - - res.dispatched = true; - - callback(null); - }); + delete this.undispatchedEvents[id]; + callback(null); }, addSnapshot: function(snap, callback) { From 3fec7b3c408af6853abb9438d1b97afc35f2ad2c Mon Sep 17 00:00:00 2001 From: Dominik Auf der Maur Date: Tue, 24 Mar 2015 15:24:47 +0100 Subject: [PATCH 5/5] some alignment of tingodb impl with mongodb impl --- lib/databases/tingodb.js | 45 +++++++--------------------------------- 1 file changed, 7 insertions(+), 38 deletions(-) diff --git a/lib/databases/tingodb.js b/lib/databases/tingodb.js index a8b13917..59f40cf4 100644 --- a/lib/databases/tingodb.js +++ b/lib/databases/tingodb.js @@ -37,44 +37,13 @@ _.extend(Tingo.prototype, { // self.emit('disconnect'); // }); this.events = this.db.collection(options.eventsCollectionName + '.tingo'); - this.events.ensureIndex({ streamId: 1, aggregateId: 1, aggregate: 1, context: 1, commitId: 1, commitStamp: 1, commitSequence: 1, streamRevision: 1 }, + this.events.ensureIndex({ aggregateId: 1, streamRevision: 1 }, function (err) { if (err) { debug(err); } }); - - this.events.ensureIndex({ aggregateId: 1, aggregate: 1, context: 1 }, - function (err) { if (err) { debug(err); } }); - - this.events.ensureIndex({ aggregateId: 1, aggregate: 1 }, - function (err) { if (err) { debug(err); } }); - - this.events.ensureIndex({ aggregateId: 1 }, - function (err) { if (err) { debug(err); } }); - - this.events.ensureIndex({ streamId: 1 }, - function (err) { if (err) { debug(err); } }); - - this.events.ensureIndex({ dispatched: 1 }, - function (err) { if (err) { debug(err); } }); - - this.events.ensureIndex({ commitStamp: 1, streamRevision: 1, commitSequence: 1 }, + this.events.ensureIndex({ dispatched: 1 }, { sparse: true }, function (err) { if (err) { debug(err); } }); this.snapshots = this.db.collection(options.snapshotsCollectionName + '.tingo'); - this.snapshots.ensureIndex({ streamId: 1, aggregateId: 1, aggregate: 1, context: 1, commitStamp: 1, revision: 1, version: 1 }, - function (err) { if (err) { debug(err); } }); - - this.snapshots.ensureIndex({ revision: -1, version: -1, commitStamp: -1 }, - function (err) { if (err) { debug(err); } }); - - this.snapshots.ensureIndex({ aggregateId: 1, aggregate: 1, context: 1 }, - function (err) { if (err) { debug(err); } }); - - this.snapshots.ensureIndex({ aggregateId: 1, aggregate: 1 }, - function (err) { if (err) { debug(err); } }); - - this.snapshots.ensureIndex({ aggregateId: 1 }, - function (err) { if (err) { debug(err); } }); - - this.snapshots.ensureIndex({ streamId: 1 }, + this.snapshots.ensureIndex({ aggregateId: 1, revision: -1 }, function (err) { if (err) { debug(err); } }); this.transactions = this.db.collection(options.transactionsCollectionName + '.tingo'); @@ -205,9 +174,9 @@ _.extend(Tingo.prototype, { return; } - var options = { '$gte': revMin, '$lt': revMax }; + var streamRevOptions = { '$gte': revMin, '$lt': revMax }; if (revMax == -1) { - options = { '$gte': revMin }; + streamRevOptions = { '$gte': revMin }; } var findStatement = { @@ -215,7 +184,7 @@ _.extend(Tingo.prototype, { { aggregateId: query.aggregateId }, { streamId: query.aggregateId } // just for compatability of < 1.0.0 ], - streamRevision: options + streamRevision: streamRevOptions }; if (query.aggregate) { @@ -285,7 +254,7 @@ _.extend(Tingo.prototype, { }, setEventToDispatched: function (id, callback) { - var updateCommand = { '$set' : { 'dispatched': true } }; + var updateCommand = { '$unset' : { 'dispatched': null } }; this.events.update({'_id' : id}, updateCommand, callback); },