diff --git a/README.md b/README.md index 42f10a2..52c5119 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ The following options are allowed: - `subEvent`: optional, the redis client event name to subscribe to (`message`) - `pubClient`: optional, the redis client to publish events on - `subClient`: optional, the redis client to subscribe to events on -- `clientsTimeout`: optional, after this timeout the adapter will stop waiting from responses to `clients` request (`1000ms`) +- `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`) If you decide to supply `pubClient` and `subClient`, make sure you use [node_redis](https://github.com/mranney/node_redis) as a client or one @@ -56,12 +56,16 @@ that a regular `Adapter` does not - `prefix` - `pubClient` - `subClient` -- `clientsTimeout` +- `requestsTimeout` ### RedisAdapter#clients(rooms:Array, fn:Function) Returns the list of client IDs connected to `rooms` across all nodes. See [Namespace#clients(fn:Function)](https://github.com/socketio/socket.io#namespaceclientsfnfunction) +### RedisAdapter#clientRooms(id:String, fn:Function) + +Returns the list of rooms the client with the given ID has joined (even on another node). + ## Client error handling Access the `pubClient` and `subClient` properties of the diff --git a/index.js b/index.js index 04df130..ac9bc74 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,15 @@ var async = require('async'); module.exports = adapter; +/** + * Request types, for messages between nodes + */ + +var requestTypes = { + clients: 0, + clientRooms: 1, +}; + /** * Returns a redis Adapter class. * @@ -39,7 +48,7 @@ function adapter(uri, opts){ var prefix = opts.key || 'socket.io'; var subEvent = opts.subEvent || 'message'; - var clientsTimeout = opts.clientsTimeout || 1000; + var requestsTimeout = opts.requestsTimeout || 1000; // init clients if needed function createClient(redis_opts) { @@ -50,11 +59,9 @@ function adapter(uri, opts){ return redis(opts.port, opts.host, redis_opts); } } - + if (!pub) pub = createClient(); if (!sub) sub = createClient({ return_buffers: true }); - - var subJson = sub.duplicate({ return_buffers: false }); // this server's key var uid = uid2(6); @@ -71,10 +78,12 @@ function adapter(uri, opts){ this.uid = uid; this.prefix = prefix; - this.clientsTimeout = clientsTimeout; + this.requestsTimeout = requestsTimeout; this.channel = prefix + '#' + nsp.name + '#'; - this.syncChannel = prefix + '-sync#request#' + this.nsp.name + '#'; + this.requestChannel = prefix + '-request#' + this.nsp.name + '#'; + this.responseChannel = prefix + '-response#' + this.nsp.name + '#'; + this.requests = {}; if (String.prototype.startsWith) { this.channelMatches = function (messageChannel, subscribedChannel) { @@ -90,16 +99,11 @@ function adapter(uri, opts){ var self = this; - sub.subscribe(this.channel, function(err){ - if (err) self.emit('error', err); - }); - - subJson.subscribe(this.syncChannel, function(err){ + sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){ if (err) self.emit('error', err); }); sub.on(subEvent, this.onmessage.bind(this)); - subJson.on(subEvent, this.onclients.bind(this)); } /** @@ -115,9 +119,16 @@ function adapter(uri, opts){ */ Redis.prototype.onmessage = function(channel, msg){ - if (!this.channelMatches(channel.toString(), this.channel)) { + channel = channel.toString(); + + if (this.channelMatches(channel, this.requestChannel)) { + return this.onrequest(channel, msg); + } else if (this.channelMatches(channel, this.responseChannel)) { + return this.onresponse(channel, msg); + } else if (!this.channelMatches(channel, this.channel)) { return debug('ignore different channel'); } + var args = msgpack.decode(msg); var packet; @@ -139,40 +150,119 @@ function adapter(uri, opts){ }; /** - * Called with a subscription message on sync + * Called on request from another node * * @api private */ - Redis.prototype.onclients = function(channel, msg){ - + Redis.prototype.onrequest = function(channel, msg){ var self = this; + var request; - if (!self.channelMatches(channel.toString(), self.syncChannel)) { - return debug('ignore different channel'); + try { + request = JSON.parse(msg); + } catch(err){ + self.emit('error', err); + return; } + debug('received request %j', request); + + switch (request.type) { + + case requestTypes.clients: + Adapter.prototype.clients.call(self, request.rooms, function(err, clients){ + if(err){ + self.emit('error', err); + return; + } + + var response = JSON.stringify({ + requestid: request.requestid, + clients: clients + }); + + pub.publish(self.responseChannel, response); + }); + break; + + case requestTypes.clientRooms: + Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){ + if(err){ + self.emit('error', err); + return; + } + + if (!rooms) { return; } + + var response = JSON.stringify({ + requestid: request.requestid, + rooms: rooms + }); + + pub.publish(self.responseChannel, response); + }); + break; + + default: + debug('ignoring unknown request type: %s', request.type); + } + }; + + /** + * Called on response from another node + * + * @api private + */ + + Redis.prototype.onresponse = function(channel, msg){ + var self = this; + var response; + try { - var decoded = JSON.parse(msg); + response = JSON.parse(msg); } catch(err){ self.emit('error', err); return; } - Adapter.prototype.clients.call(self, decoded.rooms, function(err, clients){ - if(err){ - self.emit('error', err); - return; - } + if (!response.requestid || !self.requests[response.requestid]) { + debug('ignoring unknown request'); + return; + } - var responseChn = prefix + '-sync#response#' + decoded.transaction; - var response = JSON.stringify({ - clients : clients - }); + debug('received response %j', response); - pub.publish(responseChn, response); - }); - + var request = self.requests[response.requestid]; + + switch (request.type) { + + case requestTypes.clients: + request.msgCount++; + + // ignore if response does not contain 'clients' key + if(!response.clients || !Array.isArray(response.clients)) return; + + for(var i = 0; i < response.clients.length; i++){ + request.clients[response.clients[i]] = true; + } + + if (request.msgCount === request.numsub) { + clearTimeout(request.timeout); + if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.clients))); + delete self.requests[request.requestid]; + } + break; + + case requestTypes.clientRooms: + clearTimeout(request.timeout); + if (request.callback) process.nextTick(request.callback.bind(null, null, response.rooms)); + delete self.requests[request.requestid]; + break; + + default: + debug('ignoring unknown request type: %s', request.type); + } }; /** @@ -292,6 +382,7 @@ function adapter(uri, opts){ * Gets a list of clients by sid. * * @param {Array} explicit set of rooms to check. + * @param {Function} callback * @api public */ @@ -304,11 +395,9 @@ function adapter(uri, opts){ rooms = rooms || []; var self = this; + var requestid = uid2(6); - var transaction = uid2(6); - var responseChn = prefix + '-sync#response#' + transaction; - - pub.send_command('pubsub', ['numsub', self.syncChannel], function(err, numsub){ + pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){ if (err) { self.emit('error', err); if (fn) fn(err); @@ -317,64 +406,78 @@ function adapter(uri, opts){ numsub = numsub[1]; - var msg_count = 0; - var clients = {}; - - subJson.subscribe(responseChn, function(err) { - if (err) { - self.emit('error', err); - if (fn) fn(err); - return; - } - - var request = JSON.stringify({ - transaction : transaction, - rooms : rooms - }); - - /*If there is no response for 1 second, return result;*/ - var timeout = setTimeout(function() { - if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients))); - }, self.clientsTimeout); + var request = JSON.stringify({ + requestid : requestid, + type: requestTypes.clients, + rooms : rooms + }); - subJson.on(subEvent, function onEvent(channel, msg) { + // if there is no response for x second, return result + var timeout = setTimeout(function() { + var request = self.requests[requestid]; + if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for clients response'), Object.keys(request.clients))); + delete self.requests[requestid]; + }, self.requestsTimeout); + + self.requests[requestid] = { + type: requestTypes.clients, + numsub: numsub, + msgCount: 0, + clients: {}, + callback: fn, + timeout: timeout + }; + + pub.publish(self.requestChannel, request); + }); + }; - if (!self.channelMatches(channel.toString(), responseChn)) { - return debug('ignore different channel'); - } + /** + * Gets the list of rooms a given client has joined. + * + * @param {String} client id + * @param {Function} callback + * @api public + */ - var response = JSON.parse(msg); + Redis.prototype.clientRooms = function(id, fn){ - //Ignore if response does not contain 'clients' key - if(!response.clients || !Array.isArray(response.clients)) return; - - for(var i = 0; i < response.clients.length; i++){ - clients[response.clients[i]] = true; - } + var self = this; + var requestid = uid2(6); - msg_count++; - if(msg_count == numsub){ - clearTimeout(timeout); - subJson.unsubscribe(responseChn); - subJson.removeListener(subEvent, onEvent); + var rooms = this.sids[id]; - if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients))); - } - }); + if (rooms) { + if (fn) process.nextTick(fn.bind(null, null, Object.keys(rooms))); + return; + } - pub.publish(self.syncChannel, request); + var request = JSON.stringify({ + requestid : requestid, + type: requestTypes.clientRooms, + sid : id + }); - }); + // if there is no response for x second, return result + var timeout = setTimeout(function() { + if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for rooms response'))); + delete self.requests[requestid]; + }, self.requestsTimeout); - }); + self.requests[requestid] = { + type: requestTypes.clientRooms, + callback: fn, + timeout: timeout + }; + pub.publish(self.requestChannel, request); }; Redis.uid = uid; Redis.pubClient = pub; Redis.subClient = sub; Redis.prefix = prefix; - Redis.clientsTimeout = clientsTimeout; + Redis.requestsTimeout = requestsTimeout; return Redis; diff --git a/test/index.js b/test/index.js index 4e97f19..3a15c60 100644 --- a/test/index.js +++ b/test/index.js @@ -161,6 +161,46 @@ describe('socket.io-redis', function(){ }); }); + describe('rooms', function () { + it('returns rooms of a given client', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + + server1.on('connection', function(c1){ + c1.join('woot1', function () { + server1.adapter.clientRooms(c1.id, function(err, rooms){ + expect(rooms).to.eql([c1.id, 'woot1']); + client1.disconnect(); + client2.disconnect(); + done(); + }); + }); + }); + + }); + }); + }); + + it('returns rooms of a given client from another node', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + + server1.on('connection', function(c1){ + c1.join('woot2', function () { + server2.adapter.clientRooms(c1.id, function(err, rooms){ + expect(rooms).to.eql([c1.id, 'woot2']); + client1.disconnect(); + client2.disconnect(); + done(); + }); + }); + }); + + }); + }); + }); + }); + // create a pair of socket.io server+client function create(nsp, fn){ var srv = http(); diff --git a/test/ioredis.js b/test/ioredis.js index dd70701..9a01520 100644 --- a/test/ioredis.js +++ b/test/ioredis.js @@ -163,6 +163,46 @@ describe('socket.io-redis with ioredis', function(){ }); }); + describe('rooms', function () { + it('returns rooms of a given client', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + + server1.on('connection', function(c1){ + c1.join('woot1', function () { + server1.adapter.clientRooms(c1.id, function(err, rooms){ + expect(rooms).to.eql([c1.id, 'woot1']); + client1.disconnect(); + client2.disconnect(); + done(); + }); + }); + }); + + }); + }); + }); + + it('returns rooms of a given client from another node', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + + server1.on('connection', function(c1){ + c1.join('woot2', function () { + server2.adapter.clientRooms(c1.id, function(err, rooms){ + expect(rooms).to.eql([c1.id, 'woot2']); + client1.disconnect(); + client2.disconnect(); + done(); + }); + }); + }); + + }); + }); + }); + }); + // create a pair of socket.io server+client function create(nsp, fn){ var srv = http();