diff --git a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs index 7e9511bf7..61b9d5c26 100644 --- a/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs +++ b/src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs @@ -26,7 +26,7 @@ public class SubscribeOperation2: ISubscribeOperation private bool presenceSubscribeEnabled; private SubscribeManager2 manager; private Dictionary queryParam; - private EventEngine pnEventEngine; + private PubnubEventEngine.EventEngine pnEventEngine; private Pubnub PubnubInstance; public List SubscribeListenerList { @@ -104,18 +104,18 @@ public SubscribeOperation2(PNConfiguration pubnubConfig, IJsonPluggableLibrary j effectDispatcher.Register(EventType.ReceiveReconnectSuccess, receiveReconnectEffectHandler); effectDispatcher.Register(EventType.ReceiveReconnectGiveUp, receiveReconnectEffectHandler); - pnEventEngine = new EventEngine(effectDispatcher, eventEmitter); - pnEventEngine.PubnubUnitTest = unit; - pnEventEngine.Setup(config); - - if (pnEventEngine.PubnubUnitTest != null) - { - pnEventEngine.PubnubUnitTest.EventTypeList = new List>(); - } - else - { - pnEventEngine.InitialState(new State(StateType.Unsubscribed) { EventType = EventType.SubscriptionChanged }); - } + // pnEventEngine = new EventEngine(effectDispatcher, eventEmitter); + // pnEventEngine.PubnubUnitTest = unit; + // pnEventEngine.Setup(config); + + // if (pnEventEngine.PubnubUnitTest != null) + // { + // pnEventEngine.PubnubUnitTest.EventTypeList = new List>(); + // } + // else + // { + //pnEventEngine.InitialState(new State(StateType.Unsubscribed) { EventType = EventType.SubscriptionChanged }); + // } } private void ReceivingEffect_ReceiveRequested(object sender, ReceiveRequestEventArgs e) diff --git a/src/Api/PubnubApi/EventEngine/Common/Delay.cs b/src/Api/PubnubApi/EventEngine/Common/Delay.cs new file mode 100644 index 000000000..3afb9a4a6 --- /dev/null +++ b/src/Api/PubnubApi/EventEngine/Common/Delay.cs @@ -0,0 +1,60 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace PubnubApi.EventEngine.Common +{ + public class Delay + { + public bool Cancelled { get; private set; } = false; + private readonly TaskCompletionSource taskCompletionSource = new TaskCompletionSource(); + private readonly object monitor = new object(); + private readonly int milliseconds; + + public Delay(int milliseconds) + { + this.milliseconds = milliseconds; + } + + public Task Start() + { + #if NETFX_CORE || WINDOWS_UWP || UAP || NETSTANDARD10 || NETSTANDARD11 || NETSTANDARD12 + Task taskAwaiter = Task.Factory.StartNew(AwaiterLoop); + taskAwaiter.Wait(); + #else + Thread awaiterThread = new Thread(AwaiterLoop); + awaiterThread.Start(); + #endif + return taskCompletionSource.Task; } + + public void Cancel() + { + lock (monitor) + { + Cancelled = true; + Monitor.Pulse(monitor); + } + } + + private void AwaiterLoop() + { + while(true) + { + lock (monitor) + { + if (Cancelled) + { + taskCompletionSource.SetCanceled(); + break; + } + Monitor.Wait(monitor, milliseconds); + if (Cancelled) + { + taskCompletionSource.SetCanceled(); + break; + } + taskCompletionSource.SetResult(null); + } + } + } + } +} \ No newline at end of file diff --git a/src/Api/PubnubApi/EventEngine/Core/EffectDispatcher.cs b/src/Api/PubnubApi/EventEngine/Core/EffectDispatcher.cs index babde75de..334366490 100644 --- a/src/Api/PubnubApi/EventEngine/Core/EffectDispatcher.cs +++ b/src/Api/PubnubApi/EventEngine/Core/EffectDispatcher.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Threading.Tasks; -namespace PubnubApi.PubnubEventEngine.Core { +namespace PubnubApi.EventEngine.Core { internal class EffectDispatcher { // assumes 1 instance of handler - capable of managing itself private readonly Dictionary effectInvocationHandlerMap = @@ -18,8 +18,13 @@ public async Task Dispatch(T invocation) where T : IEffectInvocation { if (invocation is IEffectCancelInvocation) { await effectInvocationHandlerMap[invocation.GetType()].Cancel(); - } else { - await ((IEffectHandler)effectInvocationHandlerMap[invocation.GetType()]).Run(invocation); + } else + { + var handler = ((IEffectHandler)effectInvocationHandlerMap[invocation.GetType()]); + if (handler.IsBackground(invocation)) + handler.Run(invocation).Start(); + else + await handler.Run(invocation); } } diff --git a/src/Api/PubnubApi/EventEngine/Core/Engine.cs b/src/Api/PubnubApi/EventEngine/Core/Engine.cs index f0315ac13..11b176c16 100644 --- a/src/Api/PubnubApi/EventEngine/Core/Engine.cs +++ b/src/Api/PubnubApi/EventEngine/Core/Engine.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using System.Collections.Generic; -namespace PubnubApi.PubnubEventEngine.Core { +namespace PubnubApi.EventEngine.Core { internal abstract class Engine { public EventQueue eventQueue = new EventQueue(); diff --git a/src/Api/PubnubApi/EventEngine/Core/EventEngineInterfaces.cs b/src/Api/PubnubApi/EventEngine/Core/EventEngineInterfaces.cs index 4dbc203b9..16717dc36 100644 --- a/src/Api/PubnubApi/EventEngine/Core/EventEngineInterfaces.cs +++ b/src/Api/PubnubApi/EventEngine/Core/EventEngineInterfaces.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using System.Collections.Generic; -namespace PubnubApi.PubnubEventEngine.Core { +namespace PubnubApi.EventEngine.Core { /// /// Generic effect handler. @@ -17,6 +17,7 @@ internal interface IEffectHandler { /// Associated invocation internal interface IEffectHandler : IEffectHandler where T : IEffectInvocation { Task Run(T invocation); + bool IsBackground(T invocation); } /// diff --git a/src/Api/PubnubApi/EventEngine/Core/EventQueue.cs b/src/Api/PubnubApi/EventEngine/Core/EventQueue.cs index a0eb2fe64..a23e30d39 100644 --- a/src/Api/PubnubApi/EventEngine/Core/EventQueue.cs +++ b/src/Api/PubnubApi/EventEngine/Core/EventQueue.cs @@ -2,7 +2,7 @@ using System.Linq; using System.Threading.Tasks; -namespace PubnubApi.PubnubEventEngine.Core +namespace PubnubApi.EventEngine.Core { internal class EventQueue { diff --git a/src/Api/PubnubApi/EventEngine/Core/Utils.cs b/src/Api/PubnubApi/EventEngine/Core/Utils.cs index f3edacb10..c7c4cbe04 100644 --- a/src/Api/PubnubApi/EventEngine/Core/Utils.cs +++ b/src/Api/PubnubApi/EventEngine/Core/Utils.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using System.Collections.Generic; -namespace PubnubApi.PubnubEventEngine.Core +namespace PubnubApi.EventEngine.Core { internal static class Utils { diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Common/CommonSubscribeTypes.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Common/CommonSubscribeTypes.cs new file mode 100644 index 000000000..19157d716 --- /dev/null +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Common/CommonSubscribeTypes.cs @@ -0,0 +1,85 @@ +using Newtonsoft.Json; + +namespace PubnubApi.EventEngine.Subscribe.Common +{ + public class SubscriptionCursor + { + public long? Timetoken { get; set; } + public int? Region { get; set; } + } + + public class HandshakeResponse + { + [JsonProperty("t")] + public Timetoken Timetoken { get; set; } + + [JsonProperty("m")] + public object[] Messages { get; set; } + } + public class HandshakeError + { + [JsonProperty("status")] + public int Status { get; set; } + + [JsonProperty("error")] + public string ErrorMessage { get; set; } + } + + public class Timetoken + { + [JsonProperty("t")] + public long Timestamp { get; set; } + + [JsonProperty("r")] + public int Region { get; set; } + + } + + public class ReceivingResponse + { + [JsonProperty("t")] + public Timetoken Timetoken { get; set; } + + [JsonProperty("m")] + public Message[] Messages { get; set; } + } + + public class Message + { + [JsonProperty ("a")] + public string Shard { get; set;} + + [JsonProperty ("b")] + public string SubscriptionMatch { get; set;} + + [JsonProperty("c")] + public string Channel { get; set; } + + [JsonProperty("d")] + public T Payload { get; set; } + + [JsonProperty("e")] + public int MessageType { get; set; } + + [JsonProperty("f")] + public string Flags { get; set; } + + [JsonProperty("i")] + public string IssuingClientId { get; set; } + + [JsonProperty("k")] + public string SubscribeKey { get; set; } + + [JsonProperty("o")] + public object OriginatingTimetoken { get; set; } + + [JsonProperty("p")] + public object PublishMetadata { get; set; } + + [JsonProperty("s")] + public long SequenceNumber { get; set; } + + [JsonProperty("p")] + public Timetoken Timetoken { get; set; } + } +} \ No newline at end of file diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Context/ReconnectionDelayUtil.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Context/ReconnectionDelayUtil.cs new file mode 100644 index 000000000..07749610c --- /dev/null +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Context/ReconnectionDelayUtil.cs @@ -0,0 +1,30 @@ +using System; +namespace PubnubApi.EventEngine.Subscribe.Context +{ + public static class ReconnectionDelayUtil + { + public static int CalculateDelay(PNReconnectionPolicy policy, int attempts) + { + Random numGenerator = new Random(); + int delayValue = 0; + int backoff = 5; + switch (policy) { + case PNReconnectionPolicy.LINEAR: + delayValue = attempts * backoff + numGenerator.Next(1000); + break; + case PNReconnectionPolicy.EXPONENTIAL: + delayValue = (int)(Math.Pow(2, attempts - 1) * 1000 + numGenerator.Next(1000)); + break; + } + return delayValue; + + } + + public static bool shouldRetry(PNReconnectionPolicy policy, int attempts, int maxAttempts) + { + if (policy == PNReconnectionPolicy.NONE) return false; + return maxAttempts < attempts; + } + } +} + diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs new file mode 100644 index 000000000..77c7e1eb8 --- /dev/null +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs @@ -0,0 +1,47 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Newtonsoft.Json; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; + +namespace PubnubApi.EventEngine.Subscribe.Effects +{ + internal class EmitMessagesHandler : IEffectHandler + { + private readonly System.Action> messageEmitterFunction; + private readonly Pubnub pubnubInstance; + + public EmitMessagesHandler(Pubnub pubnubInstance, + System.Action> messageEmitterFunction) + { + this.messageEmitterFunction = messageEmitterFunction; + this.pubnubInstance = pubnubInstance; + } + + public async Task Run(EmitMessagesInvocation invocation) + { + var processedMessages = invocation.Messages.Messages.Select(m => new PNMessageResult() + { + Channel = m.Channel, + Message = JsonConvert.DeserializeObject(m.Payload), + Subscription = m.SubscriptionMatch, + Timetoken = m.Timetoken.Timestamp, + UserMetadata = m.PublishMetadata, + Publisher = m.IssuingClientId + }); + + foreach (var message in processedMessages) + { + messageEmitterFunction(pubnubInstance, message); + } + } + + public bool IsBackground(EmitMessagesInvocation invocation) => false; + + public Task Cancel() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitStatusEffectHandler.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitStatusEffectHandler.cs new file mode 100644 index 000000000..e52ecae06 --- /dev/null +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitStatusEffectHandler.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading.Tasks; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; + +namespace PubnubApi.EventEngine.Subscribe.Effects +{ + public class EmitStatusEffectHandler: Core.IEffectHandler + { + private readonly Action statusEmitterFunction; + private readonly Pubnub pubnubInstance; + + public EmitStatusEffectHandler(Pubnub pn, Action statusEmitter) + { + this.statusEmitterFunction = statusEmitter; + this.pubnubInstance = pn; + } + + public Task Cancel() => Utils.EmptyTask; + + bool IEffectHandler.IsBackground(EmitStatusInvocation invocation) => false; + + Task IEffectHandler.Run(EmitStatusInvocation invocation) + { + this.statusEmitterFunction(this.pubnubInstance, invocation.Status); + return Utils.EmptyTask; + } + } +} + diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/HandshakeEffectHandler.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/HandshakeEffectHandler.cs index 72a15f8ce..f85f995d0 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/HandshakeEffectHandler.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/HandshakeEffectHandler.cs @@ -1,48 +1,121 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Newtonsoft.Json; using PubnubApi.EndPoint; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; - -namespace PubnubApi.PubnubEventEngine.Subscribe.Effects { - internal class HandshakeEffectHandler : Core.IEffectHandler { - private SubscribeManager2 manager; - private EventQueue eventQueue; - - public HandshakeEffectHandler(SubscribeManager2 manager, EventQueue eventQueue) { - this.manager = manager; - this.eventQueue = eventQueue; - } - - public async Task Run(HandshakeInvocation invocation) { - // TODO fix this, probably wrong :) - var resp = await manager.HandshakeRequest( - PNOperationType.PNSubscribeOperation, - invocation.Channels.ToArray(), - invocation.ChannelGroups.ToArray(), - null, - null, - invocation.InitialSubscribeQueryParams, - invocation.ExternalQueryParams - ); - - if (!resp.Item2.Error) { - // TODO move deserialization outside - // TODO does this need more error checking? - var handshakeResponse = JsonConvert.DeserializeObject(resp.Item1); - var c = new SubscriptionCursor() { - Region = handshakeResponse.Timetoken.Region, - Timetoken = handshakeResponse.Timetoken.Timestamp - }; - - eventQueue.Enqueue(new Events.HandshakeSuccessEvent() {Cursor = c}); - } - } - - public async Task Cancel() { - manager.HandshakeRequestCancellation(); - } - } +using PubnubApi.EventEngine.Common; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Context; +using PubnubApi.EventEngine.Subscribe.Events; +using PubnubApi.EventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Subscribe.Common; + +namespace PubnubApi.EventEngine.Subscribe.Effects +{ + internal class HandshakeEffectHandler : + Core.IEffectHandler, + Core.IEffectHandler + { + private SubscribeManager2 manager; + private EventQueue eventQueue; + + private Delay retryDelay = new Delay(0); + + public HandshakeEffectHandler(SubscribeManager2 manager, EventQueue eventQueue) + { + this.manager = manager; + this.eventQueue = eventQueue; + } + + public async Task Run(HandshakeReconnectInvocation invocation) + { + if (!ReconnectionDelayUtil.shouldRetry(invocation.Policy, invocation.AttemptedRetries, invocation.MaxConnectionRetry)) + { + eventQueue.Enqueue(new HandshakeReconnectGiveUpEvent() { Status = new PNStatus(PNStatusCategory.PNCancelledCategory) }); + } + else + { + retryDelay = new Delay(ReconnectionDelayUtil.CalculateDelay(invocation.Policy, invocation.AttemptedRetries)); + await retryDelay.Start(); + if (!retryDelay.Cancelled) + await Run((HandshakeInvocation)invocation); + } + } + + public bool IsBackground(HandshakeReconnectInvocation invocation) + { + return true; + } + + public async Task Run(HandshakeInvocation invocation) + { + var response = await MakeHandshakeRequest(invocation); + + switch (invocation) + { + case Invocations.HandshakeReconnectInvocation reconnectInvocation when response.Item2.Error: + eventQueue.Enqueue(new Events.HandshakeReconnectFailureEvent() { AttemptedRetries = reconnectInvocation.AttemptedRetries + 1, Status = response.Item2}); + break; + case Invocations.HandshakeReconnectInvocation reconnectInvocation: + eventQueue.Enqueue(new Events.HandshakeReconnectSuccessEvent() { Cursor = response.Item1, Status = response.Item2 }); + break; + case { } when response.Item2.Error: + eventQueue.Enqueue(new Events.HandshakeFailureEvent() { Status = response.Item2}); + break; + case { }: + eventQueue.Enqueue(new Events.HandshakeSuccessEvent() { Cursor = response.Item1, Status = response.Item2 }); + break; + + } + } + + public bool IsBackground(HandshakeInvocation invocation) + { + return false; + } + + + private async Task> MakeHandshakeRequest(HandshakeInvocation invocation) + { + var resp = await manager.HandshakeRequest( + PNOperationType.PNSubscribeOperation, + invocation.Channels.ToArray(), + invocation.ChannelGroups.ToArray(), + null, + null, + invocation.InitialSubscribeQueryParams, + invocation.ExternalQueryParams + ); + + try + { + var handshakeResponse = JsonConvert.DeserializeObject(resp.Item1); + var c = new SubscriptionCursor() + { + Region = handshakeResponse.Timetoken.Region, + Timetoken = handshakeResponse.Timetoken.Timestamp + }; + return new System.Tuple(c, resp.Item2); + } + catch (Exception e) + { + return new Tuple(null, new PNStatus(e, PNOperationType.PNSubscribeOperation, PNStatusCategory.PNUnknownCategory, invocation.Channels, invocation.ChannelGroups)); + } + } + + public async Task Cancel() + { + if (!retryDelay.Cancelled) + { + retryDelay.Cancel(); + } + else + { + manager.HandshakeRequestCancellation(); + } + } + + } } \ No newline at end of file diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Effects/ReceivingEffectHandler.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/ReceivingEffectHandler.cs new file mode 100644 index 000000000..d80cf0b3a --- /dev/null +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Effects/ReceivingEffectHandler.cs @@ -0,0 +1,120 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Newtonsoft.Json; +using PubnubApi.EndPoint; +using PubnubApi.EventEngine.Common; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Context; +using PubnubApi.EventEngine.Subscribe.Events; +using PubnubApi.EventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Subscribe.Common; + +namespace PubnubApi.EventEngine.Subscribe.Effects +{ + internal class ReceivingEffectHandler: + Core.IEffectHandler, + Core.IEffectHandler + { + private SubscribeManager2 manager; + private EventQueue eventQueue; + + private Delay retryDelay = new Delay(0); + + public ReceivingEffectHandler(SubscribeManager2 manager, EventQueue eventQueue) + { + this.manager = manager; + this.eventQueue = eventQueue; + } + + public Task Run(ReceiveReconnectInvocation invocation) + { + if (!ReconnectionDelayUtil.shouldRetry(invocation.Policy, invocation.AttemptedRetries, invocation.MaxConnectionRetry)) + { + eventQueue.Enqueue(new ReceiveReconnectGiveUpEvent() { Status = new PNStatus(PNStatusCategory.PNCancelledCategory) }); + } + else + { + retryDelay = new Delay(ReconnectionDelayUtil.CalculateDelay(invocation.Policy, invocation.AttemptedRetries)); + // Run in the background + retryDelay.Start().ContinueWith((_) => this.Run((ReceiveMessagesInvocation)invocation)); + } + + return Utils.EmptyTask; + } + + public bool IsBackground(ReceiveReconnectInvocation invocation) + { + return true; + } + + public async Task Run(ReceiveMessagesInvocation invocation) + { + var response = await MakeReceiveMessagesRequest(invocation); + var cursor = new SubscriptionCursor() + { + Region = response.Item1?.Timetoken.Region, + Timetoken = response.Item1?.Timetoken.Timestamp + }; + + switch (invocation) + { + case Invocations.ReceiveReconnectInvocation reconnectInvocation when response.Item2.Error: + eventQueue.Enqueue(new Events.ReceiveReconnectFailureEvent() { AttemptedRetries = reconnectInvocation.AttemptedRetries + 1, Status = response.Item2}); + break; + case Invocations.ReceiveReconnectInvocation reconnectInvocation: + eventQueue.Enqueue(new Events.ReceiveReconnectSuccessEvent() { Cursor = cursor, Status = response.Item2 }); + break; + case { } when response.Item2.Error: + eventQueue.Enqueue(new Events.ReceiveFailureEvent() { Cursor = cursor, Status = response.Item2}); + break; + case { }: + eventQueue.Enqueue(new Events.ReceiveSuccessEvent() { Cursor = cursor, Messages= response.Item1, Status = response.Item2 }); + break; + } + } + + public bool IsBackground(ReceiveMessagesInvocation invocation) + { + return true; + } + + private async Task, PNStatus>> MakeReceiveMessagesRequest(ReceiveMessagesInvocation invocation) + { + var resp = await manager.ReceiveRequest( + PNOperationType.PNSubscribeOperation, + invocation.Channels.ToArray(), + invocation.ChannelGroups.ToArray(), + invocation.Cursor.Timetoken.Value, + invocation.Cursor.Region.Value, + invocation.InitialSubscribeQueryParams, + invocation.ExternalQueryParams + ); + + try + { + //TODO: get ReceivingResponse from manager.ReceiveRequest + var receiveResponse = JsonConvert.DeserializeObject>(resp.Item1); + return new System.Tuple, PNStatus>(receiveResponse, resp.Item2); + } + catch (Exception e) + { + return new Tuple, PNStatus>(null, new PNStatus(e, PNOperationType.PNSubscribeOperation, PNStatusCategory.PNUnknownCategory, invocation.Channels, invocation.ChannelGroups)); + } + } + + public async Task Cancel() + { + if (!retryDelay.Cancelled) + { + retryDelay.Cancel(); + } + else + { + manager.ReceiveRequestCancellation(); + } + } + } +} diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Events/SubscriptionEvents.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Events/SubscriptionEvents.cs index 90efa6b9a..605826a14 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/Events/SubscriptionEvents.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Events/SubscriptionEvents.cs @@ -1,9 +1,11 @@ using System.Collections.Generic; +using PubnubApi.EventEngine.Subscribe.Common; -namespace PubnubApi.PubnubEventEngine.Subscribe.Events { - public class UnsubscribeAllEvent : Core.IEvent { - } - public class SubscriptionChangedEvent : Core.IEvent { +namespace PubnubApi.EventEngine.Subscribe.Events { + public class UnsubscribeAllEvent : Core.IEvent { + } + + public class SubscriptionChangedEvent : Core.IEvent { public IEnumerable Channels; public IEnumerable ChannelGroups; } @@ -21,6 +23,7 @@ public class HandshakeSuccessEvent : Core.IEvent { public class HandshakeFailureEvent : Core.IEvent { public PNStatus Status; + public int AttemptedRetries; } public class HandshakeReconnectSuccessEvent : HandshakeSuccessEvent { @@ -29,9 +32,9 @@ public class HandshakeReconnectSuccessEvent : HandshakeSuccessEvent { public class HandshakeReconnectFailureEvent : HandshakeFailureEvent { - public PNStatus Status; } + // Do we have this in system description ? public class HandshakeReconnectRetryEvent : Core.IEvent { } @@ -42,13 +45,15 @@ public class HandshakeReconnectGiveUpEvent : Core.IEvent { public class ReceiveSuccessEvent : Core.IEvent { public IEnumerable Channels; public IEnumerable ChannelGroups; - public List> Messages; + public ReceivingResponse Messages; public SubscriptionCursor Cursor; public PNStatus Status; } public class ReceiveFailureEvent : Core.IEvent { public PNStatus Status; + public int AttemptedRetries; + public SubscriptionCursor Cursor; } public class ReceiveReconnectRetry : Core.IEvent { diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/Invocations/SubscriptionInvocations.cs b/src/Api/PubnubApi/EventEngine/Subscribe/Invocations/SubscriptionInvocations.cs index 7fc22eceb..d585580f9 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/Invocations/SubscriptionInvocations.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/Invocations/SubscriptionInvocations.cs @@ -1,11 +1,12 @@ using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Common; -namespace PubnubApi.PubnubEventEngine.Subscribe.Invocations { +namespace PubnubApi.EventEngine.Subscribe.Invocations { internal class EmitMessagesInvocation : Core.IEffectInvocation { - public IEnumerable> Messages; + public ReceivingResponse Messages; - public EmitMessagesInvocation(IEnumerable> messages) + public EmitMessagesInvocation(ReceivingResponse messages) { this.Messages = messages; } @@ -44,25 +45,28 @@ internal class ReceiveMessagesInvocation : Core.IEffectInvocation public IEnumerable Channels; public IEnumerable ChannelGroups; public SubscriptionCursor Cursor; + public Dictionary InitialSubscribeQueryParams = new Dictionary(); + public Dictionary ExternalQueryParams = new Dictionary(); } internal class CancelReceiveMessagesInvocation : ReceiveMessagesInvocation, Core.IEffectCancelInvocation { } internal class CancelHandshakeInvocation : HandshakeInvocation, Core.IEffectCancelInvocation { } - internal class HandshakeReconnectInvocation: Core.IEffectInvocation - { - public IEnumerable Channels; - public IEnumerable ChannelGroups; + internal class HandshakeReconnectInvocation: HandshakeInvocation + { + public int AttemptedRetries; + public int MaxConnectionRetry; + public PNReconnectionPolicy Policy; } internal class CancelHandshakeReconnectInvocation: HandshakeReconnectInvocation, Core.IEffectCancelInvocation { } - internal class ReceiveReconnectInvocation: Core.IEffectInvocation + internal class ReceiveReconnectInvocation: ReceiveMessagesInvocation { - public IEnumerable Channels; - public IEnumerable ChannelGroups; - public SubscriptionCursor Cursor; + public int AttemptedRetries; + public int MaxConnectionRetry; + public PNReconnectionPolicy Policy; } internal class CancelReceiveReconnectInvocation: ReceiveReconnectInvocation, Core.IEffectCancelInvocation { } diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeFailedState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeFailedState.cs index 0ce533158..a09f05a6e 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeFailedState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeFailedState.cs @@ -1,9 +1,9 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class HandshakeFailedState : Core.State { diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeReconnectingState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeReconnectingState.cs index a9dff98e3..74f4848e4 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeReconnectingState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeReconnectingState.cs @@ -1,17 +1,20 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class HandshakeReconnectingState : Core.State { public IEnumerable Channels; public IEnumerable ChannelGroups; + public PNReconnectionPolicy RetryPolicy; + public int MaxConnectionRetry; + public int AttemptedRetries; public override IEnumerable OnEntry => new HandshakeReconnectInvocation() - { Channels = this.Channels, ChannelGroups = this.ChannelGroups }.AsArray(); + { Channels = this.Channels, ChannelGroups = this.ChannelGroups, Policy = this.RetryPolicy, MaxConnectionRetry = this.MaxConnectionRetry, AttemptedRetries = this.AttemptedRetries }.AsArray(); public override IEnumerable OnExit { get; } = new CancelHandshakeReconnectInvocation().AsArray(); public override TransitionResult Transition(IEvent e) diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeStoppedState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeStoppedState.cs index fc69db008..95c0cd190 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeStoppedState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakeStoppedState.cs @@ -1,9 +1,9 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class HandshakeStoppedState : Core.State { diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakingState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakingState.cs index 9d3b50e18..14b610f8b 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakingState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/HandshakingState.cs @@ -1,9 +1,9 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class HandshakingState : Core.State { @@ -35,7 +35,7 @@ public override TransitionResult Transition(IEvent e) Cursor = subscriptionRestored.Cursor }, - Events.HandshakeFailureEvent handshakeFailure => new States.HandshakeFailedState() + Events.HandshakeFailureEvent handshakeFailure => new States.HandshakeReconnectingState() { Channels = this.Channels, ChannelGroups = this.ChannelGroups }.With(new EmitStatusInvocation(handshakeFailure.Status)), diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveFailedState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveFailedState.cs index f6fc39ca4..69139e66f 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveFailedState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveFailedState.cs @@ -1,9 +1,10 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Subscribe.Common; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class ReceiveFailedState : Core.State { diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveReconnectingState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveReconnectingState.cs index d0eac310f..60c7f1f84 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveReconnectingState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveReconnectingState.cs @@ -1,9 +1,10 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Subscribe.Common; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class ReceiveReconnectingState : Core.State { diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveStoppedState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveStoppedState.cs index bec729a2b..9407f0ca2 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveStoppedState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceiveStoppedState.cs @@ -1,9 +1,10 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Subscribe.Common; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class ReceiveStoppedState : Core.State { diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceivingState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceivingState.cs index d94a67d9f..7b9991ef8 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceivingState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/ReceivingState.cs @@ -1,9 +1,10 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Subscribe.Common; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class ReceivingState : Core.State { diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/States/UnsubscribedState.cs b/src/Api/PubnubApi/EventEngine/Subscribe/States/UnsubscribedState.cs index 12dd75edd..f1e9fc683 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/States/UnsubscribedState.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/States/UnsubscribedState.cs @@ -1,15 +1,12 @@ using System; using System.Collections.Generic; -using PubnubApi.PubnubEventEngine.Core; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.Invocations; -namespace PubnubApi.PubnubEventEngine.Subscribe.States +namespace PubnubApi.EventEngine.Subscribe.States { internal class UnsubscribedState : Core.State { - public override IEnumerable OnEntry { get; } = null; - public override IEnumerable OnExit { get; } = null; - public override TransitionResult Transition(Core.IEvent e) { return e switch diff --git a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs index f71bd366f..862fb5819 100644 --- a/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs +++ b/src/Api/PubnubApi/EventEngine/Subscribe/SubscribeEventEngine.cs @@ -1,10 +1,11 @@ using PubnubApi.EndPoint; -using PubnubApi.PubnubEventEngine.Subscribe.Effects; -using PubnubApi.PubnubEventEngine.Subscribe.Invocations; -using PubnubApi.PubnubEventEngine.Subscribe.States; +using PubnubApi.EventEngine.Core; +using PubnubApi.EventEngine.Subscribe.States; +using PubnubApi.EventEngine.Subscribe.Effects; +using PubnubApi.EventEngine.Subscribe.Invocations; -namespace PubnubApi.PubnubEventEngine.Subscribe { - internal class SubscribeEventEngine : PubnubEventEngine.Core.Engine { +namespace PubnubApi.EventEngine.Subscribe { + internal class SubscribeEventEngine : Engine { private SubscribeManager2 subscribeManager; public SubscribeEventEngine(SubscribeManager2 subscribeManager) { @@ -13,8 +14,14 @@ public SubscribeEventEngine(SubscribeManager2 subscribeManager) { // initialize the handler, pass dependencies var handshakeHandler = new Effects.HandshakeEffectHandler(subscribeManager, eventQueue); dispatcher.Register(handshakeHandler); + dispatcher.Register(handshakeHandler); dispatcher.Register(handshakeHandler); + var receiveHandler = new Effects.ReceivingEffectHandler(subscribeManager, eventQueue); + dispatcher.Register(receiveHandler); + dispatcher.Register(receiveHandler); + dispatcher.Register(receiveHandler); + currentState = new UnsubscribedState(); } } diff --git a/src/Api/PubnubApi/Model/Consumer/PNStatus.cs b/src/Api/PubnubApi/Model/Consumer/PNStatus.cs index 5abb9de27..4e3e3a5ff 100644 --- a/src/Api/PubnubApi/Model/Consumer/PNStatus.cs +++ b/src/Api/PubnubApi/Model/Consumer/PNStatus.cs @@ -14,6 +14,16 @@ public class PNStatus public PNStatus() { } + public PNStatus(Exception e, PNOperationType operationType, PNStatusCategory category, IEnumerable affectedChannels = null, IEnumerable affectedChannelGroups = null) + { + this.Error = true; + this.Operation = operationType; + this.ErrorData = new PNErrorData(e.Message, e); + this.AffectedChannels = affectedChannels?.ToList(); + this.AffectedChannelGroups = affectedChannelGroups?.ToList(); + this.Category = category; + } + internal PNStatus(object endpointOperation) { this.savedEndpointOperation = endpointOperation; diff --git a/src/Api/PubnubApiPCL/PubnubApiPCL.csproj b/src/Api/PubnubApiPCL/PubnubApiPCL.csproj index 4527aad73..356a8f73b 100644 --- a/src/Api/PubnubApiPCL/PubnubApiPCL.csproj +++ b/src/Api/PubnubApiPCL/PubnubApiPCL.csproj @@ -213,6 +213,7 @@ Enum\ResponseType.cs + @@ -228,7 +229,10 @@ + + + @@ -917,9 +921,12 @@ + + + diff --git a/src/Api/PubnubApiUWP/PubnubApiUWP.csproj b/src/Api/PubnubApiUWP/PubnubApiUWP.csproj index 0b0b9c5d0..00800be1c 100644 --- a/src/Api/PubnubApiUWP/PubnubApiUWP.csproj +++ b/src/Api/PubnubApiUWP/PubnubApiUWP.csproj @@ -330,6 +330,7 @@ Enum\ResponseType.cs + @@ -345,7 +346,10 @@ + + + @@ -726,9 +730,12 @@ + + + diff --git a/src/UnitTests/AcceptanceTests/Features/grant-token.feature.cs b/src/UnitTests/AcceptanceTests/Features/grant-token.feature.cs index a1aaaf549..c8bb56301 100644 --- a/src/UnitTests/AcceptanceTests/Features/grant-token.feature.cs +++ b/src/UnitTests/AcceptanceTests/Features/grant-token.feature.cs @@ -37,9 +37,9 @@ public partial class GrantAnAccessTokenFeature public virtual void FeatureSetup() { testRunner = TechTalk.SpecFlow.TestRunnerManager.GetTestRunner(); - TechTalk.SpecFlow.FeatureInfo featureInfo = new TechTalk.SpecFlow.FeatureInfo(new System.Globalization.CultureInfo("en-US"), "Features", "Grant an access token", " As a PubNub customer I want to restrict and allow access to\r\n specific PubNub " + - "resources (channels, channel groups, uuids)\r\n by my user base (both people and " + - "devices) which are each\r\n identified by a unique UUID.", ProgrammingLanguage.CSharp, featureTags); + TechTalk.SpecFlow.FeatureInfo featureInfo = new TechTalk.SpecFlow.FeatureInfo(new System.Globalization.CultureInfo("en-US"), "Features", "Grant an access token", " As a PubNub customer I want to restrict and allow access to\n specific PubNub r" + + "esources (channels, channel groups, uuids)\n by my user base (both people and de" + + "vices) which are each\n identified by a unique UUID.", ProgrammingLanguage.CSharp, featureTags); testRunner.OnFeatureStart(featureInfo); } diff --git a/src/UnitTests/AcceptanceTests/Features/revoke-token.feature.cs b/src/UnitTests/AcceptanceTests/Features/revoke-token.feature.cs index d5a85c045..f98f833b4 100644 --- a/src/UnitTests/AcceptanceTests/Features/revoke-token.feature.cs +++ b/src/UnitTests/AcceptanceTests/Features/revoke-token.feature.cs @@ -39,8 +39,8 @@ public partial class RevokeAnAccessTokenFeature public virtual void FeatureSetup() { testRunner = TechTalk.SpecFlow.TestRunnerManager.GetTestRunner(); - TechTalk.SpecFlow.FeatureInfo featureInfo = new TechTalk.SpecFlow.FeatureInfo(new System.Globalization.CultureInfo("en-US"), "Features", "Revoke an access token", " As a PubNub customer I want to withdraw existing permission for\r\n specific Pub" + - "Nub resources by revoking corresponding tokens.", ProgrammingLanguage.CSharp, featureTags); + TechTalk.SpecFlow.FeatureInfo featureInfo = new TechTalk.SpecFlow.FeatureInfo(new System.Globalization.CultureInfo("en-US"), "Features", "Revoke an access token", " As a PubNub customer I want to withdraw existing permission for\n specific PubN" + + "ub resources by revoking corresponding tokens.", ProgrammingLanguage.CSharp, featureTags); testRunner.OnFeatureStart(featureInfo); }