diff --git a/src/pubsub.js b/src/pubsub.js index ced2b2c32..a762be4e1 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -120,6 +120,7 @@ module.exports = (arg) => { function subscribe (topic, options, handler, callback) { ps.on(topic, handler) + if (subscriptions[topic]) { return callback() } diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js index f994cc401..992529213 100644 --- a/src/utils/pubsub-message-stream.js +++ b/src/utils/pubsub-message-stream.js @@ -16,17 +16,18 @@ class PubsubMessageStream extends TransformStream { } _transform (obj, enc, callback) { - let msg - try { - msg = PubsubMessage.deserialize(obj, 'base64') - } catch (err) { - // Not a valid pubsub message - // go-ipfs returns '{}' as the very first object atm, we skip that + // go-ipfs returns '{}' as the very first object atm, we skip that + if (Object.keys(obj).length === 0) { return callback() } - this.push(msg) - callback() + try { + const msg = PubsubMessage.deserialize(obj, 'base64') + this.push(msg) + callback() + } catch (err) { + return callback(err) + } } } diff --git a/src/utils/pubsub-message-utils.js b/src/utils/pubsub-message-utils.js index 2b3bbbabe..53d1e397a 100644 --- a/src/utils/pubsub-message-utils.js +++ b/src/utils/pubsub-message-utils.js @@ -30,10 +30,10 @@ function deserializeFromBase64 (obj) { from: bs58.encode(Buffer.from(obj.from, 'base64')).toString(), seqno: Buffer.from(obj.seqno, 'base64'), data: Buffer.from(obj.data, 'base64'), - topicIDs: obj.topicIDs + topicIDs: obj.topicIDs || obj.topicCIDs } } function isPubsubMessage (obj) { - return obj && obj.from && obj.seqno && obj.data && obj.topicIDs + return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) }