From e9d8ab0dd6f2112a49e87f59a9a895fb4e591f00 Mon Sep 17 00:00:00 2001
From: Richard Schneider <makaretu@gmail.com>
Date: Mon, 20 Nov 2017 20:05:30 +1300
Subject: [PATCH 1/3] fix: allow topicCIDs from older peers

---
 src/pubsub.js                     | 1 +
 src/utils/pubsub-message-utils.js | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/pubsub.js b/src/pubsub.js
index ced2b2c32..ec61daf61 100644
--- a/src/pubsub.js
+++ b/src/pubsub.js
@@ -121,6 +121,7 @@ module.exports = (arg) => {
   function subscribe (topic, options, handler, callback) {
     ps.on(topic, handler)
     if (subscriptions[topic]) {
+      // TODO: should a callback error be returned?
       return callback()
     }
 
diff --git a/src/utils/pubsub-message-utils.js b/src/utils/pubsub-message-utils.js
index 2b3bbbabe..53d1e397a 100644
--- a/src/utils/pubsub-message-utils.js
+++ b/src/utils/pubsub-message-utils.js
@@ -30,10 +30,10 @@ function deserializeFromBase64 (obj) {
     from: bs58.encode(Buffer.from(obj.from, 'base64')).toString(),
     seqno: Buffer.from(obj.seqno, 'base64'),
     data: Buffer.from(obj.data, 'base64'),
-    topicIDs: obj.topicIDs
+    topicIDs: obj.topicIDs || obj.topicCIDs
   }
 }
 
 function isPubsubMessage (obj) {
-  return obj && obj.from && obj.seqno && obj.data && obj.topicIDs
+  return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs)
 }

From a7edf7676b27cd9f2189b3c3272124d52807baf3 Mon Sep 17 00:00:00 2001
From: Richard Schneider <makaretu@gmail.com>
Date: Wed, 22 Nov 2017 11:38:03 +1300
Subject: [PATCH 2/3] fix: do not eat pubsub message error

---
 src/pubsub.js                      |  2 +-
 src/utils/pubsub-message-stream.js | 16 +++++++++-------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/src/pubsub.js b/src/pubsub.js
index ec61daf61..aaa3f18dd 100644
--- a/src/pubsub.js
+++ b/src/pubsub.js
@@ -120,8 +120,8 @@ module.exports = (arg) => {
 
   function subscribe (topic, options, handler, callback) {
     ps.on(topic, handler)
+    
     if (subscriptions[topic]) {
-      // TODO: should a callback error be returned?
       return callback()
     }
 
diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js
index f994cc401..ae335a12f 100644
--- a/src/utils/pubsub-message-stream.js
+++ b/src/utils/pubsub-message-stream.js
@@ -16,17 +16,19 @@ class PubsubMessageStream extends TransformStream {
   }
 
   _transform (obj, enc, callback) {
-    let msg
+    // go-ipfs returns '{}' as the very first object atm, we skip that
+    if (Object.keys(obj).length === 0) {
+      return callback()
+    }
+    
     try {
-      msg = PubsubMessage.deserialize(obj, 'base64')
+      const msg = PubsubMessage.deserialize(obj, 'base64')
+      this.push(msg)
+      callback()
     } catch (err) {
-      // Not a valid pubsub message
-      // go-ipfs returns '{}' as the very first object atm, we skip that
-      return callback()
+      return callback(err)
     }
 
-    this.push(msg)
-    callback()
   }
 }
 

From b99958d3147c5fd1889b80d24a988d5d76c5a37b Mon Sep 17 00:00:00 2001
From: Richard Schneider <makaretu@gmail.com>
Date: Wed, 22 Nov 2017 14:44:01 +1300
Subject: [PATCH 3/3] fix: lint errors

---
 src/pubsub.js                      | 2 +-
 src/utils/pubsub-message-stream.js | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/pubsub.js b/src/pubsub.js
index aaa3f18dd..a762be4e1 100644
--- a/src/pubsub.js
+++ b/src/pubsub.js
@@ -120,7 +120,7 @@ module.exports = (arg) => {
 
   function subscribe (topic, options, handler, callback) {
     ps.on(topic, handler)
-    
+
     if (subscriptions[topic]) {
       return callback()
     }
diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js
index ae335a12f..992529213 100644
--- a/src/utils/pubsub-message-stream.js
+++ b/src/utils/pubsub-message-stream.js
@@ -20,7 +20,7 @@ class PubsubMessageStream extends TransformStream {
     if (Object.keys(obj).length === 0) {
       return callback()
     }
-    
+
     try {
       const msg = PubsubMessage.deserialize(obj, 'base64')
       this.push(msg)
@@ -28,7 +28,6 @@ class PubsubMessageStream extends TransformStream {
     } catch (err) {
       return callback(err)
     }
-
   }
 }