Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions samples/ChatSample/PresenceHubLifetimeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,5 +171,10 @@ public override Task RemoveGroupAsync(string connectionId, string groupName)
{
return _wrappedHubLifetimeManager.RemoveGroupAsync(connectionId, groupName);
}

public override Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds)
{
return _wrappedHubLifetimeManager.InvokeGroupExceptAsync(groupName, methodName, args, excludedIds);
}
}
}
19 changes: 19 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,25 @@ public override Task InvokeGroupAsync(string groupName, string methodName, objec
return Task.CompletedTask;
}

public override Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds)
{
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}

var group = _groups[groupName];
if (group != null)
{
var message = CreateInvocationMessage(methodName, args);
var tasks = group.Values.Where(connection => !excludedIds.Contains(connection.ConnectionId))
.Select(c => c.WriteAsync(message));
return Task.WhenAll(tasks);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a reasonable place to add logging or should we first finalize our logging strategy? @BrennanConroy @anurse


return Task.CompletedTask;
}

private InvocationMessage CreateInvocationMessage(string methodName, object[] args)
{
return new InvocationMessage(GetInvocationId(), nonBlocking: true, target: methodName, argumentBindingException: null, arguments: args);
Expand Down
3 changes: 2 additions & 1 deletion src/Microsoft.AspNetCore.SignalR.Core/DynamicHubClients.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public DynamicHubClients(IHubCallerClients clients)
public dynamic AllExcept(IReadOnlyList<string> excludedIds) => new DynamicClientProxy(_clients.AllExcept(excludedIds));
public dynamic Caller => new DynamicClientProxy(_clients.Caller);
public dynamic Client(string connectionId) => new DynamicClientProxy(_clients.Client(connectionId));
public dynamic Group(string group) => new DynamicClientProxy(_clients.Group(group));
public dynamic Group(string groupName) => new DynamicClientProxy(_clients.Group(groupName));
public dynamic GroupExcept(string groupName, IReadOnlyList<string> excludedIds) => new DynamicClientProxy(_clients.GroupExcept(groupName, excludedIds));
public dynamic Others => new DynamicClientProxy(_clients.Others);
public dynamic User(string userId) => new DynamicClientProxy(_clients.User(userId));
}
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Core/HubCallerClients.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public IClientProxy Group(string groupName)
return _hubClients.Group(groupName);
}

public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludeIds)
{
return _hubClients.GroupExcept(groupName, excludeIds);
}

public IClientProxy User(string userId)
{
return _hubClients.User(userId);
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Core/HubContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public virtual IClientProxy Group(string groupName)
return new GroupProxy<THub>(_lifetimeManager, groupName);
}

public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludeIds)
{
return new GroupExceptProxy<THub>(_lifetimeManager, groupName, excludeIds);
}

public virtual IClientProxy User(string userId)
{
return new UserProxy<THub>(_lifetimeManager, userId);
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Core/HubContext`T.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public virtual T Group(string groupName)
return TypedClientBuilder<T>.Build(new GroupProxy<THub>(_lifetimeManager, groupName));
}

public T GroupExcept(string groupName, IReadOnlyList<string> excludeIds)
{
return TypedClientBuilder<T>.Build(new GroupExceptProxy<THub>(_lifetimeManager, groupName, excludeIds));
}

public virtual T User(string userId)
{
return TypedClientBuilder<T>.Build(new UserProxy<THub>(_lifetimeManager, userId));
Expand Down
2 changes: 2 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Core/HubLifetimeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public abstract class HubLifetimeManager<THub>

public abstract Task InvokeGroupAsync(string groupName, string methodName, object[] args);

public abstract Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds);

public abstract Task InvokeUserAsync(string userId, string methodName, object[] args);

public abstract Task AddGroupAsync(string connectionId, string groupName);
Expand Down
2 changes: 2 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Core/IHubClients`T.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public interface IHubClients<T>

T Group(string groupName);

T GroupExcept(string groupName, IReadOnlyList<string> excludeIds);

T User(string userId);
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Core/Proxies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,25 @@ public Task InvokeAsync(string method, params object[] args)
}
}

public class GroupExceptProxy<THub> : IClientProxy
{
private readonly string _groupName;
private readonly HubLifetimeManager<THub> _lifetimeManager;
private IReadOnlyList<string> _excludedIds;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readonly


public GroupExceptProxy(HubLifetimeManager<THub> lifetimeManager, string groupName, IReadOnlyList<string> excludedIds)
{
_lifetimeManager = lifetimeManager;
_groupName = groupName;
_excludedIds = excludedIds;
}

public Task InvokeAsync(string method, params object[] args)
{
return _lifetimeManager.InvokeGroupExceptAsync(_groupName, method, args, _excludedIds);
}
}

public class AllClientProxy<THub> : IClientProxy
{
private readonly HubLifetimeManager<THub> _lifetimeManager;
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Core/TypedHubClients.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public T Group(string groupName)
return TypedClientBuilder<T>.Build(_hubClients.Group(groupName));
}

public T GroupExcept(string groupName, IReadOnlyList<string> excludeIds)
{
return TypedClientBuilder<T>.Build(_hubClients.GroupExcept(groupName, excludeIds));
}

public T User(string userId)
{
return TypedClientBuilder<T>.Build(_hubClients.User(userId));
Expand Down
23 changes: 20 additions & 3 deletions src/Microsoft.AspNetCore.SignalR.Redis/RedisHubLifetimeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,19 @@ public override Task InvokeGroupAsync(string groupName, string methodName, objec
throw new ArgumentNullException(nameof(groupName));
}

var message = new InvocationMessage(GetInvocationId(), nonBlocking: true, target: methodName, argumentBindingException: null, arguments: args);
var message = new RedisExcludeClientsMessage(GetInvocationId(), nonBlocking: true, target: methodName, excludedIds: null, arguments: args);

return PublishAsync(_channelNamePrefix + ".group." + groupName, message);
}

public override Task InvokeGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedIds)
{
if (groupName == null)
{
throw new ArgumentNullException(nameof(groupName));
}

var message = new RedisExcludeClientsMessage(GetInvocationId(), nonBlocking: true, target: methodName, excludedIds: excludedIds, arguments: args);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this. It uses an existing message and we just publish it to a different Redis key. 👍


return PublishAsync(_channelNamePrefix + ".group." + groupName, message);
}
Expand Down Expand Up @@ -547,11 +559,16 @@ private Task SubscribeToGroup(string groupChannel, GroupData group)
{
try
{
var message = DeserializeMessage<HubInvocationMessage>(data);
var message = DeserializeMessage<RedisExcludeClientsMessage>(data);

var tasks = new List<Task>(group.Connections.Count);
var tasks = new List<Task>();
foreach (var groupConnection in group.Connections)
{
if (message.ExcludedIds?.Contains(groupConnection.ConnectionId) == true)
{
continue;
}

tasks.Add(groupConnection.WriteAsync(message));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -102,6 +103,37 @@ public async Task InvokeGroupAsyncWritesToAllConnectionsInGroupOutput()
}
}

[Fact]
public async Task InvokeGroupExceptAsyncWritesToAllValidConnectionsInGroupOutput()
{
using (var client1 = new TestClient())
using (var client2 = new TestClient())
{
var manager = new RedisHubLifetimeManager<MyHub>(new LoggerFactory().CreateLogger<RedisHubLifetimeManager<MyHub>>(),
Options.Create(new RedisOptions()
{
Factory = t => new TestConnectionMultiplexer()
}));
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
var connection2 = HubConnectionContextUtils.Create(client2.Connection);

await manager.OnConnectedAsync(connection1).OrTimeout();
await manager.OnConnectedAsync(connection2).OrTimeout();

await manager.AddGroupAsync(connection1.ConnectionId, "gunit").OrTimeout();
await manager.AddGroupAsync(connection2.ConnectionId, "gunit").OrTimeout();

var excludedIds = new List<string>{client2.Connection.ConnectionId };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after {

await manager.InvokeGroupExceptAsync("gunit", "Hello", new object[] { "World" }, excludedIds).OrTimeout();

await AssertMessageAsync(client1);

await connection1.DisposeAsync().OrTimeout();
await connection2.DisposeAsync().OrTimeout();
Assert.Null(client2.TryRead());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do this before disposing

}
}

[Fact]
public async Task InvokeConnectionAsyncWritesToConnectionOutput()
{
Expand Down
67 changes: 67 additions & 0 deletions test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,58 @@ public async Task HubsCanAddAndSendToGroup(Type hubType)
}
}

[Theory]
[MemberData(nameof(HubTypes))]
public async Task SendToGroupExcept(Type hubType)
{
var serviceProvider = CreateServiceProvider();

dynamic endPoint = serviceProvider.GetService(GetEndPointType(hubType));

using (var firstClient = new TestClient())
using (var secondClient = new TestClient())
{
Task firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection);
Task secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection);

await Task.WhenAll(firstClient.Connected, secondClient.Connected).OrTimeout();

var result = (await firstClient.InvokeAsync("GroupSendMethod", "testGroup", "test").OrTimeout()).Result;

// check that 'firstConnection' hasn't received the group send
Assert.Null(firstClient.TryRead());

// check that 'secondConnection' hasn't received the group send
Assert.Null(secondClient.TryRead());

await firstClient.InvokeAsync(nameof(MethodHub.GroupAddMethod), "testGroup").OrTimeout();
await secondClient.InvokeAsync(nameof(MethodHub.GroupAddMethod), "testGroup").OrTimeout();

var excludedIds = new List<string> { firstClient.Connection.ConnectionId };

await firstClient.SendInvocationAsync("GroupExceptSendMethod", "testGroup", "test", excludedIds).OrTimeout();

// check that 'secondConnection' has received the group send
var hubMessage = await secondClient.ReadAsync().OrTimeout();
var invocation = Assert.IsType<InvocationMessage>(hubMessage);
Assert.Equal("Send", invocation.Target);
Assert.Single(invocation.Arguments);
Assert.Equal("test", invocation.Arguments[0]);

// Check that first client only got the completion message
hubMessage = await firstClient.ReadAsync().OrTimeout();
Assert.IsType<CompletionMessage>(hubMessage);

Assert.Null(firstClient.TryRead());

// kill the connections
firstClient.Dispose();
secondClient.Dispose();

await Task.WhenAll(firstEndPointTask, secondEndPointTask).OrTimeout();
}
}

[Fact]
public async Task RemoveFromGroupWhenNotInGroupDoesNotFail()
{
Expand Down Expand Up @@ -1626,6 +1678,11 @@ public Task GroupSendMethod(string groupName, string message)
return Clients.Group(groupName).Send(message);
}

public Task GroupExceptSendMethod(string groupName, string message, IReadOnlyList<string> excludedIds)
{
return Clients.GroupExcept(groupName, excludedIds).Send(message);
}

public Task BroadcastMethod(string message)
{
return Clients.All.Broadcast(message);
Expand Down Expand Up @@ -1814,6 +1871,11 @@ public Task GroupSendMethod(string groupName, string message)
return Clients.Group(groupName).Send(message);
}

public Task GroupExceptSendMethod(string groupName, string message, IReadOnlyList<string> excludedIds)
{
return Clients.GroupExcept(groupName, excludedIds).Send(message);
}

public Task BroadcastMethod(string message)
{
return Clients.All.Broadcast(message);
Expand Down Expand Up @@ -1945,6 +2007,11 @@ public Task GroupSendMethod(string groupName, string message)
return Clients.Group(groupName).InvokeAsync("Send", message);
}

public Task GroupExceptSendMethod(string groupName, string message, IReadOnlyList<string> excludedIds)
{
return Clients.GroupExcept(groupName, excludedIds).InvokeAsync("Send", message);
}

public Task BroadcastMethod(string message)
{
return Clients.All.InvokeAsync("Broadcast", message);
Expand Down