From ff6c51b34c29bfb9bcd64a623db0b4710b081ce0 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 6 Jan 2017 14:08:34 +0100 Subject: [PATCH 1/5] implement allRooms method --- index.js | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++ test/index.js | 16 +++++++++++ 2 files changed, 93 insertions(+) diff --git a/index.js b/index.js index 6b9887e..16aa476 100644 --- a/index.js +++ b/index.js @@ -23,6 +23,10 @@ module.exports = adapter; var requestTypes = { clients: 0, clientRooms: 1, + allRooms: 2, + remoteJoin: 3, + remoteLeave: 4, + customRequest: 5, }; /** @@ -212,6 +216,16 @@ function adapter(uri, opts) { }); break; + case requestTypes.allRooms: + + var response = JSON.stringify({ + requestid: request.requestid, + rooms: Object.keys(this.rooms) + }); + + pub.publish(self.responseChannel, response); + break; + default: debug('ignoring unknown request type: %s', request.type); } @@ -268,6 +282,23 @@ function adapter(uri, opts) { delete self.requests[request.requestid]; break; + case requestTypes.allRooms: + request.msgCount++; + + // ignore if response does not contain 'rooms' key + if(!response.rooms || !Array.isArray(response.rooms)) return; + + for(var i = 0; i < response.rooms.length; i++){ + request.rooms[response.rooms[i]] = true; + } + + if (request.msgCount === request.numsub) { + clearTimeout(request.timeout); + if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.rooms))); + delete self.requests[request.requestid]; + } + break; + default: debug('ignoring unknown request type: %s', request.type); } @@ -489,6 +520,52 @@ function adapter(uri, opts) { pub.publish(self.requestChannel, request); }; + /** + * Gets the list of all rooms (accross every node) + * + * @param {Function} callback + * @api public + */ + + Redis.prototype.allRooms = function(fn){ + + var self = this; + var requestid = uid2(6); + + pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){ + if (err) { + self.emit('error', err); + if (fn) fn(err); + return; + } + + numsub = parseInt(numsub[1], 10); + + var request = JSON.stringify({ + requestid : requestid, + type: requestTypes.allRooms + }); + + // if there is no response for x second, return result + var timeout = setTimeout(function() { + var request = self.requests[requestid]; + if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for allRooms response'), Object.keys(request.rooms))); + delete self.requests[requestid]; + }, self.requestsTimeout); + + self.requests[requestid] = { + type: requestTypes.allRooms, + numsub: numsub, + msgCount: 0, + rooms: {}, + callback: fn, + timeout: timeout + }; + + pub.publish(self.requestChannel, request); + }); + }; + Redis.uid = uid; Redis.pubClient = pub; Redis.subClient = sub; diff --git a/test/index.js b/test/index.js index 12b8453..b02d693 100644 --- a/test/index.js +++ b/test/index.js @@ -194,6 +194,22 @@ var socket1, socket2, socket3; }); }); }); + + describe('requests', function(){ + + it('returns all rooms accross several nodes', function(done){ + socket1.join('woot1', function () { + namespace1.adapter.allRooms(function(err, rooms){ + expect(rooms).to.have.length(4); + expect(rooms).to.contain(socket1.id); + expect(rooms).to.contain(socket2.id); + expect(rooms).to.contain(socket3.id); + expect(rooms).to.contain('woot1'); + done(); + }); + }); + }); + }); }); }); From 5b0083cbbcaa721c2bb76b7dd61856025a93cae8 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 6 Jan 2017 14:16:40 +0100 Subject: [PATCH 2/5] implement remoteJoin method --- index.js | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++ test/index.js | 9 +++++++ 2 files changed, 74 insertions(+) diff --git a/index.js b/index.js index 16aa476..737a732 100644 --- a/index.js +++ b/index.js @@ -226,6 +226,22 @@ function adapter(uri, opts) { pub.publish(self.responseChannel, response); break; + case requestTypes.remoteJoin: + + var socket = this.nsp.connected[request.sid]; + if (!socket) { return; } + + function sendAck(){ + var response = JSON.stringify({ + requestid: request.requestid + }); + + pub.publish(self.responseChannel, response); + } + + socket.join(request.room, sendAck); + break; + default: debug('ignoring unknown request type: %s', request.type); } @@ -299,6 +315,12 @@ function adapter(uri, opts) { } break; + case requestTypes.remoteJoin: + clearTimeout(request.timeout); + if (request.callback) process.nextTick(request.callback.bind(null, null)); + delete self.requests[request.requestid]; + break; + default: debug('ignoring unknown request type: %s', request.type); } @@ -566,6 +588,49 @@ function adapter(uri, opts) { }); }; + /** + * Makes the socket with the given id join the room + * + * @param {String} socket id + * @param {String} room name + * @param {Function} callback + * @api public + */ + + Redis.prototype.remoteJoin = function(id, room, fn){ + + var self = this; + var requestid = uid2(6); + + var socket = this.nsp.connected[id]; + if (socket) { + socket.join(room); + if (fn) process.nextTick(fn.bind(null, null)); + return; + } + + var request = JSON.stringify({ + requestid : requestid, + type: requestTypes.remoteJoin, + sid: id, + room: room + }); + + // if there is no response for x second, return result + var timeout = setTimeout(function() { + if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteJoin response'))); + delete self.requests[requestid]; + }, self.requestsTimeout); + + self.requests[requestid] = { + type: requestTypes.remoteJoin, + callback: fn, + timeout: timeout + }; + + pub.publish(self.requestChannel, request); + }; + Redis.uid = uid; Redis.pubClient = pub; Redis.subClient = sub; diff --git a/test/index.js b/test/index.js index b02d693..34cf0d4 100644 --- a/test/index.js +++ b/test/index.js @@ -209,6 +209,15 @@ var socket1, socket2, socket3; }); }); }); + + it('makes a given socket join a room', function(done){ + namespace3.adapter.remoteJoin(socket1.id, 'woot3', function(err){ + var rooms = Object.keys(socket1.rooms); + expect(rooms).to.have.length(2); + expect(rooms).to.contain('woot3'); + done(); + }); + }); }); }); }); From 3125aa87b339b87e79de78789af8916236be8ff8 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 6 Jan 2017 14:21:36 +0100 Subject: [PATCH 3/5] implement remoteLeave method --- index.js | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++ test/index.js | 11 ++++++++++ 2 files changed, 71 insertions(+) diff --git a/index.js b/index.js index 737a732..22cb31c 100644 --- a/index.js +++ b/index.js @@ -242,6 +242,22 @@ function adapter(uri, opts) { socket.join(request.room, sendAck); break; + case requestTypes.remoteLeave: + + var socket = this.nsp.connected[request.sid]; + if (!socket) { return; } + + function sendAck(){ + var response = JSON.stringify({ + requestid: request.requestid + }); + + pub.publish(self.responseChannel, response); + } + + socket.leave(request.room, sendAck); + break; + default: debug('ignoring unknown request type: %s', request.type); } @@ -316,6 +332,7 @@ function adapter(uri, opts) { break; case requestTypes.remoteJoin: + case requestTypes.remoteLeave: clearTimeout(request.timeout); if (request.callback) process.nextTick(request.callback.bind(null, null)); delete self.requests[request.requestid]; @@ -631,6 +648,49 @@ function adapter(uri, opts) { pub.publish(self.requestChannel, request); }; + /** + * Makes the socket with the given id leave the room + * + * @param {String} socket id + * @param {String} room name + * @param {Function} callback + * @api public + */ + + Redis.prototype.remoteLeave = function(id, room, fn){ + + var self = this; + var requestid = uid2(6); + + var socket = this.nsp.connected[id]; + if (socket) { + socket.leave(room); + if (fn) process.nextTick(fn.bind(null, null)); + return; + } + + var request = JSON.stringify({ + requestid : requestid, + type: requestTypes.remoteLeave, + sid: id, + room: room + }); + + // if there is no response for x second, return result + var timeout = setTimeout(function() { + if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteLeave response'))); + delete self.requests[requestid]; + }, self.requestsTimeout); + + self.requests[requestid] = { + type: requestTypes.remoteLeave, + callback: fn, + timeout: timeout + }; + + pub.publish(self.requestChannel, request); + }; + Redis.uid = uid; Redis.pubClient = pub; Redis.subClient = sub; diff --git a/test/index.js b/test/index.js index 34cf0d4..d99b7b3 100644 --- a/test/index.js +++ b/test/index.js @@ -218,6 +218,17 @@ var socket1, socket2, socket3; done(); }); }); + + it('makes a given socket leave a room', function(done){ + socket1.join('woot3', function(){ + namespace3.adapter.remoteLeave(socket1.id, 'woot3', function(err){ + var rooms = Object.keys(socket1.rooms); + expect(rooms).to.have.length(1); + expect(rooms).not.to.contain('woot3'); + done(); + }); + }); + }); }); }); }); From 5636ecd207563624cd16f5d65e8a24372f44d2d9 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 6 Jan 2017 15:46:00 +0100 Subject: [PATCH 4/5] implement custom request --- index.js | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++ test/index.js | 16 +++++++++++ 2 files changed, 92 insertions(+) diff --git a/index.js b/index.js index 22cb31c..37148c8 100644 --- a/index.js +++ b/index.js @@ -90,6 +90,7 @@ function adapter(uri, opts) { this.requestChannel = prefix + '-request#' + this.nsp.name + '#'; this.responseChannel = prefix + '-response#' + this.nsp.name + '#'; this.requests = {}; + this.customHook = function(){ return null; } if (String.prototype.startsWith) { this.channelMatches = function (messageChannel, subscribedChannel) { @@ -258,6 +259,17 @@ function adapter(uri, opts) { socket.leave(request.room, sendAck); break; + case requestTypes.customRequest: + var data = this.customHook(request.data); + + var response = JSON.stringify({ + requestid: request.requestid, + data: data + }); + + pub.publish(self.responseChannel, response); + break; + default: debug('ignoring unknown request type: %s', request.type); } @@ -338,6 +350,18 @@ function adapter(uri, opts) { delete self.requests[request.requestid]; break; + case requestTypes.customRequest: + request.msgCount++; + + request.replies.push(response.data); + + if (request.msgCount === request.numsub) { + clearTimeout(request.timeout); + if (request.callback) process.nextTick(request.callback.bind(null, null, request.replies)); + delete self.requests[request.requestid]; + } + break; + default: debug('ignoring unknown request type: %s', request.type); } @@ -691,6 +715,58 @@ function adapter(uri, opts) { pub.publish(self.requestChannel, request); }; + /** + * Sends a new custom request to other nodes + * + * @param {Object} data (no binary) + * @param {Function} callback + * @api public + */ + + Redis.prototype.customRequest = function(data, fn){ + if (typeof data === 'function'){ + fn = data; + data = null; + } + + var self = this; + var requestid = uid2(6); + + pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){ + if (err) { + self.emit('error', err); + if (fn) fn(err); + return; + } + + numsub = parseInt(numsub[1], 10); + + var request = JSON.stringify({ + requestid : requestid, + type: requestTypes.customRequest, + data: data + }); + + // if there is no response for x second, return result + var timeout = setTimeout(function() { + var request = self.requests[requestid]; + if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for customRequest response'), request.replies)); + delete self.requests[requestid]; + }, self.requestsTimeout); + + self.requests[requestid] = { + type: requestTypes.customRequest, + numsub: numsub, + msgCount: 0, + replies: [], + callback: fn, + timeout: timeout + }; + + pub.publish(self.requestChannel, request); + }); + }; + Redis.uid = uid; Redis.pubClient = pub; Redis.subClient = sub; diff --git a/test/index.js b/test/index.js index d99b7b3..a43fb47 100644 --- a/test/index.js +++ b/test/index.js @@ -229,6 +229,19 @@ var socket1, socket2, socket3; }); }); }); + + it('sends a custom request', function(done){ + namespace1.adapter.customHook = function myCustomHook(data){ + expect(data).to.be('hello'); + return this.uid; + } + + namespace3.adapter.customRequest('hello', function(err, replies){ + expect(replies).to.have.length(3); + expect(replies).to.contain(namespace1.adapter.uid); + done(); + }); + }); }); }); }); @@ -286,5 +299,8 @@ function cleanup(done){ namespace1.server.close(); namespace2.server.close(); namespace3.server.close(); + namespace1.adapter.subClient.end(false); + namespace2.adapter.subClient.end(false); + namespace3.adapter.subClient.end(false); done(); } From 27ceb025d44258ea945ecb012e7054b55ad65e20 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 6 Jan 2017 16:23:47 +0100 Subject: [PATCH 5/5] update README --- README.md | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/README.md b/README.md index df45002..3a890c8 100644 --- a/README.md +++ b/README.md @@ -57,10 +57,75 @@ that a regular `Adapter` does not Returns the list of client IDs connected to `rooms` across all nodes. See [Namespace#clients(fn:Function)](https://github.com/socketio/socket.io#namespaceclientsfnfunction) +```js +io.adapter.clients(function (err, clients) { + console.log(clients); // an array containing all connected socket ids +}); + +io.adapter.clients(['room1', 'room2'], function (err, clients) { + console.log(clients); // an array containing socket ids in 'room1' and/or 'room2' +}); +``` + ### RedisAdapter#clientRooms(id:String, fn:Function) Returns the list of rooms the client with the given ID has joined (even on another node). +```js +io.adapter.clientRooms('', function (err, rooms) { + if (err) { /* unknown id */ } + console.log(rooms); // an array containing every room a given id has joined. +}); +``` + +### RedisAdapter#allRooms(fn:Function) + +Returns the list of all rooms. + +```js +io.adapter.allRooms(function (err, rooms) { + console.log(rooms); // an array containing all rooms (accross every node) +}); +``` + +### RedisAdapter#remoteJoin(id:String, room:String, fn:Function) + +Makes the socket with the given id join the room. The callback will be called once the socket has joined the room, or with an `err` argument if the socket was not found. + +```js +io.adapter.remoteJoin('', 'room1', function (err) { + if (err) { /* unknown id */ } + // success +}); +``` + +### RedisAdapter#remoteLeave(id:String, room:String, fn:Function) + +Makes the socket with the given id leave the room. The callback will be called once the socket has left the room, or with an `err` argument if the socket was not found. + +```js +io.adapter.remoteLeave('', 'room1', function (err) { + if (err) { /* unknown id */ } + // success +}); +``` + +### RedisAdapter#customRequest(data:Object, fn:Function) + +Sends a request to every nodes, that will respond through the `customHook` method. + +```js +// on every node +io.adapter.customHook = function (data) { + return 'hello ' + data; +} + +// then +io.adapter.customRequest('john', function(err, replies){ + console.log(replies); // an array ['hello john', ...] with one element per node +}); +``` + ## Client error handling Access the `pubClient` and `subClient` properties of the