diff --git a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs index 2a4f7282f..90fc8af3e 100644 --- a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs +++ b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs @@ -703,7 +703,7 @@ private void Subscribe(string[] channels, string[] channelGroups, Dictionary(channels, channelGroups); } internal bool Retry(bool reconnect) diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs index e946b5c4a..cd0bed6e6 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs @@ -1,4 +1,6 @@ using System; +using System.Collections; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Newtonsoft.Json; @@ -11,9 +13,11 @@ public class EmitMessagesHandler : EffectHandler> messageEmitterFunction; private readonly Pubnub pubnubInstance; + private readonly Dictionary channelTypeMap; + private readonly Dictionary channelGroupTypeMap; public EmitMessagesHandler(Pubnub pubnubInstance, - System.Action> messageEmitterFunction) + System.Action> messageEmitterFunction, Dictionary channelTypeMap = null, Dictionary channelGroupTypeMap = null) { this.messageEmitterFunction = messageEmitterFunction; this.pubnubInstance = pubnubInstance; @@ -21,18 +25,39 @@ public EmitMessagesHandler(Pubnub pubnubInstance, public async override Task Run(EmitMessagesInvocation invocation) { - var processedMessages = invocation.Messages?.Messages.Select(m => new PNMessageResult() + var processedMessages = invocation.Messages?.Messages.Select(m => { - Channel = m.Channel, - Message = JsonConvert.DeserializeObject(m.Payload), - Subscription = m.SubscriptionMatch, - Timetoken = m.Timetoken.Timestamp, - UserMetadata = m.PublishMetadata, - Publisher = m.IssuingClientId - }); + var msgResult = new PNMessageResult() + { + Channel = m.Channel, + Subscription = m.SubscriptionMatch, + Timetoken = m.Timetoken.Timestamp, + UserMetadata = m.PublishMetadata, + Publisher = m.IssuingClientId + }; + + Type msgType, groupType; + if (!(channelTypeMap is null) && channelTypeMap.TryGetValue(m.Channel, out msgType)) + { + msgResult.Message = JsonConvert.DeserializeObject(m.Payload, msgType); + } else if (!(channelGroupTypeMap is null) && channelGroupTypeMap.TryGetValue(m.SubscriptionMatch, out groupType)) + { + msgResult.Message = JsonConvert.DeserializeObject(m.Payload, groupType); + } + else + { + msgResult.Message = m.Payload; + } - processedMessages?.ToList().ForEach(message => messageEmitterFunction(pubnubInstance, message)); + return msgResult; + }); + if (processedMessages is null) return; + + foreach (var message in processedMessages) + { + messageEmitterFunction(pubnubInstance, message); + } } public override bool IsBackground(EmitMessagesInvocation invocation) => false; diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs index da88ecc14..376fb51d1 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs @@ -5,11 +5,15 @@ using PubnubApi.EventEngine.Subscribe.Effects; using PubnubApi.EventEngine.Subscribe.Invocations; using System; +using System.Collections.Generic; namespace PubnubApi.EventEngine.Subscribe { public class SubscribeEventEngine : Engine { private SubscribeManager2 subscribeManager; + private readonly Dictionary channelTypeMap = new Dictionary(); + private readonly Dictionary channelGroupTypeMap = new Dictionary(); + internal SubscribeEventEngine(Pubnub pubnubInstance, PNConfiguration pubnubConfiguration, SubscribeManager2 subscribeManager, @@ -31,7 +35,7 @@ internal SubscribeEventEngine(Pubnub pubnubInstance, dispatcher.Register(receiveHandler); dispatcher.Register(receiveHandler); - var emitMessageHandler = new Effects.EmitMessagesHandler(pubnubInstance, messageListener); + var emitMessageHandler = new Effects.EmitMessagesHandler(pubnubInstance, messageListener, channelTypeMap, channelGroupTypeMap); dispatcher.Register(emitMessageHandler); var emitStatusHandler = new Effects.EmitStatusEffectHandler(pubnubInstance, statusListener); @@ -39,10 +43,24 @@ internal SubscribeEventEngine(Pubnub pubnubInstance, currentState = new UnsubscribedState() { ReconnectionConfiguration = new Context.ReconnectionConfiguration(pubnubConfiguration.ReconnectionPolicy, pubnubConfiguration.ConnectionMaxRetries) }; } - public void Subscribe(string[] channels, string[] channelGroups) + public void Subscribe(string[] channels, string[] channelGroups) { + foreach (var c in channels) + { + channelTypeMap[c] = typeof(T); + } + foreach (var c in channelGroups) + { + channelGroupTypeMap[c] = typeof(T); + } this.EventQueue.Enqueue(new SubscriptionChangedEvent() { Channels = channels, ChannelGroups = channelGroups }); } + + public void Subscribe(string[] channels, string[] channelGroups) + { + Subscribe(channels, channelGroups); + } + public void UnsubscribeAll() { this.EventQueue.Enqueue(new UnsubscribeAllEvent());