From c84ab1fa98cd0b10a9849685a7776a21309001d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dobrza=C5=84ski?= Date: Mon, 7 Aug 2023 14:14:56 +0200 Subject: [PATCH 1/3] wip: deserialization --- .../EndPoint/PubSub/SubscribeOperation2.cs | 2 +- .../Subscribe/Effects/EmitMessagesHandler.cs | 44 ++++++++++++++----- .../Subscribe/SubscribeEventEngine.cs | 20 ++++++++- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs index 2a4f7282f..90fc8af3e 100644 --- a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs +++ b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs @@ -703,7 +703,7 @@ private void Subscribe(string[] channels, string[] channelGroups, Dictionary(channels, channelGroups); } internal bool Retry(bool reconnect) diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs index e946b5c4a..ce51c2805 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs @@ -1,4 +1,6 @@ using System; +using System.Collections; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Newtonsoft.Json; @@ -11,9 +13,11 @@ public class EmitMessagesHandler : EffectHandler> messageEmitterFunction; private readonly Pubnub pubnubInstance; + private readonly Dictionary channelTypeMap; + private readonly Dictionary channelGroupTypeMap; public EmitMessagesHandler(Pubnub pubnubInstance, - System.Action> messageEmitterFunction) + System.Action> messageEmitterFunction, Dictionary channelTypeMap = null, Dictionary channelGroupTypeMap = null) { this.messageEmitterFunction = messageEmitterFunction; this.pubnubInstance = pubnubInstance; @@ -21,18 +25,38 @@ public EmitMessagesHandler(Pubnub pubnubInstance, public async override Task Run(EmitMessagesInvocation invocation) { - var processedMessages = invocation.Messages?.Messages.Select(m => new PNMessageResult() + var processedMessages = invocation.Messages?.Messages.Select(m => { - Channel = m.Channel, - Message = JsonConvert.DeserializeObject(m.Payload), - Subscription = m.SubscriptionMatch, - Timetoken = m.Timetoken.Timestamp, - UserMetadata = m.PublishMetadata, - Publisher = m.IssuingClientId - }); + var msgResult = new PNMessageResult() + { + Channel = m.Channel, + Subscription = m.SubscriptionMatch, + Timetoken = m.Timetoken.Timestamp, + UserMetadata = m.PublishMetadata, + Publisher = m.IssuingClientId + }; + + if (!(channelTypeMap is null) && channelTypeMap.TryGetValue(m.Channel, out var msgType)) + { + msgResult.Message = JsonConvert.DeserializeObject(m.Payload, msgType); + } else if (!(channelGroupTypeMap is null) && channelGroupTypeMap.TryGetValue(m.SubscriptionMatch, out var T)) + { + msgResult.Message = JsonConvert.DeserializeObject(m.Payload, T); + } + else + { + msgResult.Message = m.Payload; + } - processedMessages?.ToList().ForEach(message => messageEmitterFunction(pubnubInstance, message)); + return msgResult; + }); + if (processedMessages is null) return; + + foreach (var message in processedMessages) + { + messageEmitterFunction(pubnubInstance, message); + } } public override bool IsBackground(EmitMessagesInvocation invocation) => false; diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs index da88ecc14..19b20dcb2 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs @@ -5,11 +5,15 @@ using PubnubApi.EventEngine.Subscribe.Effects; using PubnubApi.EventEngine.Subscribe.Invocations; using System; +using System.Collections.Generic; namespace PubnubApi.EventEngine.Subscribe { public class SubscribeEventEngine : Engine { private SubscribeManager2 subscribeManager; + private readonly Dictionary channelTypeMap = new Dictionary(); + private readonly Dictionary channelGroupTypeMap = new Dictionary(); + internal SubscribeEventEngine(Pubnub pubnubInstance, PNConfiguration pubnubConfiguration, SubscribeManager2 subscribeManager, @@ -39,10 +43,24 @@ internal SubscribeEventEngine(Pubnub pubnubInstance, currentState = new UnsubscribedState() { ReconnectionConfiguration = new Context.ReconnectionConfiguration(pubnubConfiguration.ReconnectionPolicy, pubnubConfiguration.ConnectionMaxRetries) }; } - public void Subscribe(string[] channels, string[] channelGroups) + public void Subscribe(string[] channels, string[] channelGroups) { + foreach (var c in channels) + { + channelTypeMap[c] = typeof(T); + } + foreach (var c in channelGroups) + { + channelGroupTypeMap[c] = typeof(T); + } this.EventQueue.Enqueue(new SubscriptionChangedEvent() { Channels = channels, ChannelGroups = channelGroups }); } + + public void Subscribe(string[] channels, string[] channelGroups) + { + Subscribe(channels, channelGroups); + } + public void UnsubscribeAll() { this.EventQueue.Enqueue(new UnsubscribeAllEvent()); From 54f735c6f804004c9e9c4575a95ea08e274cdc16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dobrza=C5=84ski?= Date: Mon, 7 Aug 2023 14:16:24 +0200 Subject: [PATCH 2/3] wip: deserialization --- .../EventEngine/Subscribe/Effects/EmitMessagesHandler.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs index ce51c2805..cd0bed6e6 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs @@ -35,13 +35,14 @@ public async override Task Run(EmitMessagesInvocation invocation) UserMetadata = m.PublishMetadata, Publisher = m.IssuingClientId }; - - if (!(channelTypeMap is null) && channelTypeMap.TryGetValue(m.Channel, out var msgType)) + + Type msgType, groupType; + if (!(channelTypeMap is null) && channelTypeMap.TryGetValue(m.Channel, out msgType)) { msgResult.Message = JsonConvert.DeserializeObject(m.Payload, msgType); - } else if (!(channelGroupTypeMap is null) && channelGroupTypeMap.TryGetValue(m.SubscriptionMatch, out var T)) + } else if (!(channelGroupTypeMap is null) && channelGroupTypeMap.TryGetValue(m.SubscriptionMatch, out groupType)) { - msgResult.Message = JsonConvert.DeserializeObject(m.Payload, T); + msgResult.Message = JsonConvert.DeserializeObject(m.Payload, groupType); } else { From 4e388618f7f1965a497dd4f89dd63a9343e68e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dobrza=C5=84ski?= Date: Mon, 7 Aug 2023 14:25:18 +0200 Subject: [PATCH 3/3] fix: passed dependencies to emit messages handler --- src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs index 19b20dcb2..376fb51d1 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs @@ -35,7 +35,7 @@ internal SubscribeEventEngine(Pubnub pubnubInstance, dispatcher.Register(receiveHandler); dispatcher.Register(receiveHandler); - var emitMessageHandler = new Effects.EmitMessagesHandler(pubnubInstance, messageListener); + var emitMessageHandler = new Effects.EmitMessagesHandler(pubnubInstance, messageListener, channelTypeMap, channelGroupTypeMap); dispatcher.Register(emitMessageHandler); var emitStatusHandler = new Effects.EmitStatusEffectHandler(pubnubInstance, statusListener);