Skip to content

Refactor requests between nodes and add clientRooms method #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ The following options are allowed:
- `subEvent`: optional, the redis client event name to subscribe to (`message`)
- `pubClient`: optional, the redis client to publish events on
- `subClient`: optional, the redis client to subscribe to events on
- `clientsTimeout`: optional, after this timeout the adapter will stop waiting from responses to `clients` request (`1000ms`)
- `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`)

If you decide to supply `pubClient` and `subClient`, make sure you use
[node_redis](https://github.com/mranney/node_redis) as a client or one
Expand All @@ -56,12 +56,16 @@ that a regular `Adapter` does not
- `prefix`
- `pubClient`
- `subClient`
- `clientsTimeout`
- `requestsTimeout`

### RedisAdapter#clients(rooms:Array, fn:Function)

Returns the list of client IDs connected to `rooms` across all nodes. See [Namespace#clients(fn:Function)](https://github.com/socketio/socket.io#namespaceclientsfnfunction)

### RedisAdapter#clientRooms(id:String, fn:Function)

Returns the list of rooms the client with the given ID has joined (even on another node).

## Client error handling

Access the `pubClient` and `subClient` properties of the
Expand Down
257 changes: 180 additions & 77 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ var async = require('async');

module.exports = adapter;

/**
* Request types, for messages between nodes
*/

var requestTypes = {
clients: 0,
clientRooms: 1,
};

/**
* Returns a redis Adapter class.
*
Expand All @@ -39,7 +48,7 @@ function adapter(uri, opts){

var prefix = opts.key || 'socket.io';
var subEvent = opts.subEvent || 'message';
var clientsTimeout = opts.clientsTimeout || 1000;
var requestsTimeout = opts.requestsTimeout || 1000;

// init clients if needed
function createClient(redis_opts) {
Expand All @@ -50,11 +59,9 @@ function adapter(uri, opts){
return redis(opts.port, opts.host, redis_opts);
}
}

if (!pub) pub = createClient();
if (!sub) sub = createClient({ return_buffers: true });

var subJson = sub.duplicate({ return_buffers: false });

// this server's key
var uid = uid2(6);
Expand All @@ -71,10 +78,12 @@ function adapter(uri, opts){

this.uid = uid;
this.prefix = prefix;
this.clientsTimeout = clientsTimeout;
this.requestsTimeout = requestsTimeout;

this.channel = prefix + '#' + nsp.name + '#';
this.syncChannel = prefix + '-sync#request#' + this.nsp.name + '#';
this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
this.responseChannel = prefix + '-response#' + this.nsp.name + '#';
this.requests = {};

if (String.prototype.startsWith) {
this.channelMatches = function (messageChannel, subscribedChannel) {
Expand All @@ -90,16 +99,11 @@ function adapter(uri, opts){

var self = this;

sub.subscribe(this.channel, function(err){
if (err) self.emit('error', err);
});

subJson.subscribe(this.syncChannel, function(err){
sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){
if (err) self.emit('error', err);
});

sub.on(subEvent, this.onmessage.bind(this));
subJson.on(subEvent, this.onclients.bind(this));
}

/**
Expand All @@ -115,9 +119,16 @@ function adapter(uri, opts){
*/

Redis.prototype.onmessage = function(channel, msg){
if (!this.channelMatches(channel.toString(), this.channel)) {
channel = channel.toString();

if (this.channelMatches(channel, this.requestChannel)) {
return this.onrequest(channel, msg);
} else if (this.channelMatches(channel, this.responseChannel)) {
return this.onresponse(channel, msg);
} else if (!this.channelMatches(channel, this.channel)) {
return debug('ignore different channel');
}

var args = msgpack.decode(msg);
var packet;

Expand All @@ -139,40 +150,119 @@ function adapter(uri, opts){
};

/**
* Called with a subscription message on sync
* Called on request from another node
*
* @api private
*/

Redis.prototype.onclients = function(channel, msg){

Redis.prototype.onrequest = function(channel, msg){
var self = this;
var request;

if (!self.channelMatches(channel.toString(), self.syncChannel)) {
return debug('ignore different channel');
try {
request = JSON.parse(msg);
} catch(err){
self.emit('error', err);
return;
}

debug('received request %j', request);

switch (request.type) {

case requestTypes.clients:
Adapter.prototype.clients.call(self, request.rooms, function(err, clients){
if(err){
self.emit('error', err);
return;
}

var response = JSON.stringify({
requestid: request.requestid,
clients: clients
});

pub.publish(self.responseChannel, response);
});
break;

case requestTypes.clientRooms:
Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){
if(err){
self.emit('error', err);
return;
}

if (!rooms) { return; }

var response = JSON.stringify({
requestid: request.requestid,
rooms: rooms
});

pub.publish(self.responseChannel, response);
});
break;

default:
debug('ignoring unknown request type: %s', request.type);
}
};

/**
* Called on response from another node
*
* @api private
*/

Redis.prototype.onresponse = function(channel, msg){
var self = this;
var response;

try {
var decoded = JSON.parse(msg);
response = JSON.parse(msg);
} catch(err){
self.emit('error', err);
return;
}

Adapter.prototype.clients.call(self, decoded.rooms, function(err, clients){
if(err){
self.emit('error', err);
return;
}
if (!response.requestid || !self.requests[response.requestid]) {
debug('ignoring unknown request');
return;
}

var responseChn = prefix + '-sync#response#' + decoded.transaction;
var response = JSON.stringify({
clients : clients
});
debug('received response %j', response);

pub.publish(responseChn, response);
});

var request = self.requests[response.requestid];

switch (request.type) {

case requestTypes.clients:
request.msgCount++;

// ignore if response does not contain 'clients' key
if(!response.clients || !Array.isArray(response.clients)) return;

for(var i = 0; i < response.clients.length; i++){
request.clients[response.clients[i]] = true;
}

if (request.msgCount === request.numsub) {
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.clients)));
delete self.requests[request.requestid];
}
break;

case requestTypes.clientRooms:
clearTimeout(request.timeout);
if (request.callback) process.nextTick(request.callback.bind(null, null, response.rooms));
delete self.requests[request.requestid];
break;

default:
debug('ignoring unknown request type: %s', request.type);
}
};

/**
Expand Down Expand Up @@ -292,6 +382,7 @@ function adapter(uri, opts){
* Gets a list of clients by sid.
*
* @param {Array} explicit set of rooms to check.
* @param {Function} callback
* @api public
*/

Expand All @@ -304,11 +395,9 @@ function adapter(uri, opts){
rooms = rooms || [];

var self = this;
var requestid = uid2(6);

var transaction = uid2(6);
var responseChn = prefix + '-sync#response#' + transaction;

pub.send_command('pubsub', ['numsub', self.syncChannel], function(err, numsub){
pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
if (err) {
self.emit('error', err);
if (fn) fn(err);
Expand All @@ -317,64 +406,78 @@ function adapter(uri, opts){

numsub = numsub[1];

var msg_count = 0;
var clients = {};

subJson.subscribe(responseChn, function(err) {
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}

var request = JSON.stringify({
transaction : transaction,
rooms : rooms
});

/*If there is no response for 1 second, return result;*/
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
}, self.clientsTimeout);
var request = JSON.stringify({
requestid : requestid,
type: requestTypes.clients,
rooms : rooms
});

subJson.on(subEvent, function onEvent(channel, msg) {
// if there is no response for x second, return result
var timeout = setTimeout(function() {
var request = self.requests[requestid];
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for clients response'), Object.keys(request.clients)));
delete self.requests[requestid];
}, self.requestsTimeout);

self.requests[requestid] = {
type: requestTypes.clients,
numsub: numsub,
msgCount: 0,
clients: {},
callback: fn,
timeout: timeout
};

pub.publish(self.requestChannel, request);
});
};

if (!self.channelMatches(channel.toString(), responseChn)) {
return debug('ignore different channel');
}
/**
* Gets the list of rooms a given client has joined.
*
* @param {String} client id
* @param {Function} callback
* @api public
*/

var response = JSON.parse(msg);
Redis.prototype.clientRooms = function(id, fn){

//Ignore if response does not contain 'clients' key
if(!response.clients || !Array.isArray(response.clients)) return;

for(var i = 0; i < response.clients.length; i++){
clients[response.clients[i]] = true;
}
var self = this;
var requestid = uid2(6);

msg_count++;
if(msg_count == numsub){
clearTimeout(timeout);
subJson.unsubscribe(responseChn);
subJson.removeListener(subEvent, onEvent);
var rooms = this.sids[id];

if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
}
});
if (rooms) {
if (fn) process.nextTick(fn.bind(null, null, Object.keys(rooms)));
return;
}

pub.publish(self.syncChannel, request);
var request = JSON.stringify({
requestid : requestid,
type: requestTypes.clientRooms,
sid : id
});

});
// if there is no response for x second, return result
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for rooms response')));
delete self.requests[requestid];
}, self.requestsTimeout);

});
self.requests[requestid] = {
type: requestTypes.clientRooms,
callback: fn,
timeout: timeout
};

pub.publish(self.requestChannel, request);
};

Redis.uid = uid;
Redis.pubClient = pub;
Redis.subClient = sub;
Redis.prefix = prefix;
Redis.clientsTimeout = clientsTimeout;
Redis.requestsTimeout = requestsTimeout;

return Redis;

Expand Down
Loading