Skip to content

Commit 6fa332c

Browse files
MikeDobrzanmohitpubnubbudgetpreneur
authored
Eventengine/handshake handler (#181)
* feat: simplified abstraction * feat: handshake effect handler * fix: initial retry implementation * handshake reconnect * fix: handshake handler * revert *.csproj changes * added required info in handshake reconnect invocation. * receive handler (#180) * receive handler * fix: background handlers * *reconnection configurations *refactor naming * passing configuration for reconnection * fix: transition values from receive reconnect * wip: receive messages * Revert "Merge remote-tracking branch 'origin/eventengine/handshake-handler' into eventengine/handshake-handler" This reverts commit be568f7, reversing changes made to 0da6c9a. * fix * ReceivingEffectHandler - ReceivingResponse * null check * SubscribeEventEngine - receiveHandler * *csproj file for RReceivingEffectHandler * EmitStatus effect handler - take1 * empty task * wip emit messages * wip emit messages * cleanup and unify convention for *emitters * missing changes emitmessage * Added publisher * emitmessages --------- Co-authored-by: Mohit Tejani <[email protected]> Co-authored-by: Pandu Masabathula <[email protected]>
1 parent 839dda5 commit 6fa332c

30 files changed

+618
-123
lines changed

src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class SubscribeOperation2<T>: ISubscribeOperation<T>
2626
private bool presenceSubscribeEnabled;
2727
private SubscribeManager2 manager;
2828
private Dictionary<string, object> queryParam;
29-
private EventEngine pnEventEngine;
29+
private PubnubEventEngine.EventEngine pnEventEngine;
3030
private Pubnub PubnubInstance;
3131
public List<SubscribeCallback> SubscribeListenerList
3232
{
@@ -104,18 +104,18 @@ public SubscribeOperation2(PNConfiguration pubnubConfig, IJsonPluggableLibrary j
104104
effectDispatcher.Register(EventType.ReceiveReconnectSuccess, receiveReconnectEffectHandler);
105105
effectDispatcher.Register(EventType.ReceiveReconnectGiveUp, receiveReconnectEffectHandler);
106106

107-
pnEventEngine = new EventEngine(effectDispatcher, eventEmitter);
108-
pnEventEngine.PubnubUnitTest = unit;
109-
pnEventEngine.Setup<T>(config);
110-
111-
if (pnEventEngine.PubnubUnitTest != null)
112-
{
113-
pnEventEngine.PubnubUnitTest.EventTypeList = new List<KeyValuePair<string, string>>();
114-
}
115-
else
116-
{
117-
pnEventEngine.InitialState(new State(StateType.Unsubscribed) { EventType = EventType.SubscriptionChanged });
118-
}
107+
// pnEventEngine = new EventEngine(effectDispatcher, eventEmitter);
108+
// pnEventEngine.PubnubUnitTest = unit;
109+
// pnEventEngine.Setup<T>(config);
110+
111+
// if (pnEventEngine.PubnubUnitTest != null)
112+
// {
113+
// pnEventEngine.PubnubUnitTest.EventTypeList = new List<KeyValuePair<string, string>>();
114+
// }
115+
// else
116+
// {
117+
//pnEventEngine.InitialState(new State(StateType.Unsubscribed) { EventType = EventType.SubscriptionChanged });
118+
// }
119119
}
120120

121121
private void ReceivingEffect_ReceiveRequested(object sender, ReceiveRequestEventArgs e)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace PubnubApi.EventEngine.Common
5+
{
6+
public class Delay
7+
{
8+
public bool Cancelled { get; private set; } = false;
9+
private readonly TaskCompletionSource<object> taskCompletionSource = new TaskCompletionSource<object>();
10+
private readonly object monitor = new object();
11+
private readonly int milliseconds;
12+
13+
public Delay(int milliseconds)
14+
{
15+
this.milliseconds = milliseconds;
16+
}
17+
18+
public Task Start()
19+
{
20+
#if NETFX_CORE || WINDOWS_UWP || UAP || NETSTANDARD10 || NETSTANDARD11 || NETSTANDARD12
21+
Task taskAwaiter = Task.Factory.StartNew(AwaiterLoop);
22+
taskAwaiter.Wait();
23+
#else
24+
Thread awaiterThread = new Thread(AwaiterLoop);
25+
awaiterThread.Start();
26+
#endif
27+
return taskCompletionSource.Task; }
28+
29+
public void Cancel()
30+
{
31+
lock (monitor)
32+
{
33+
Cancelled = true;
34+
Monitor.Pulse(monitor);
35+
}
36+
}
37+
38+
private void AwaiterLoop()
39+
{
40+
while(true)
41+
{
42+
lock (monitor)
43+
{
44+
if (Cancelled)
45+
{
46+
taskCompletionSource.SetCanceled();
47+
break;
48+
}
49+
Monitor.Wait(monitor, milliseconds);
50+
if (Cancelled)
51+
{
52+
taskCompletionSource.SetCanceled();
53+
break;
54+
}
55+
taskCompletionSource.SetResult(null);
56+
}
57+
}
58+
}
59+
}
60+
}

src/Api/PubnubApi/EventEngine/Core/EffectDispatcher.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using System.Collections.Generic;
33
using System.Threading.Tasks;
44

5-
namespace PubnubApi.PubnubEventEngine.Core {
5+
namespace PubnubApi.EventEngine.Core {
66
internal class EffectDispatcher {
77
// assumes 1 instance of handler - capable of managing itself
88
private readonly Dictionary<System.Type, IEffectHandler> effectInvocationHandlerMap =
@@ -18,8 +18,13 @@ public async Task Dispatch<T>(T invocation) where T : IEffectInvocation {
1818

1919
if (invocation is IEffectCancelInvocation) {
2020
await effectInvocationHandlerMap[invocation.GetType()].Cancel();
21-
} else {
22-
await ((IEffectHandler<T>)effectInvocationHandlerMap[invocation.GetType()]).Run(invocation);
21+
} else
22+
{
23+
var handler = ((IEffectHandler<T>)effectInvocationHandlerMap[invocation.GetType()]);
24+
if (handler.IsBackground(invocation))
25+
handler.Run(invocation).Start();
26+
else
27+
await handler.Run(invocation);
2328
}
2429
}
2530

src/Api/PubnubApi/EventEngine/Core/Engine.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using System.Threading.Tasks;
22
using System.Collections.Generic;
33

4-
namespace PubnubApi.PubnubEventEngine.Core {
4+
namespace PubnubApi.EventEngine.Core {
55
internal abstract class Engine {
66
public EventQueue eventQueue = new EventQueue();
77

src/Api/PubnubApi/EventEngine/Core/EventEngineInterfaces.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using System.Threading.Tasks;
33
using System.Collections.Generic;
44

5-
namespace PubnubApi.PubnubEventEngine.Core {
5+
namespace PubnubApi.EventEngine.Core {
66

77
/// <summary>
88
/// Generic effect handler.
@@ -17,6 +17,7 @@ internal interface IEffectHandler {
1717
/// <typeparam name="T">Associated invocation</typeparam>
1818
internal interface IEffectHandler<in T> : IEffectHandler where T : IEffectInvocation {
1919
Task Run(T invocation);
20+
bool IsBackground(T invocation);
2021
}
2122

2223
/// <summary>

src/Api/PubnubApi/EventEngine/Core/EventQueue.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using System.Linq;
33
using System.Threading.Tasks;
44

5-
namespace PubnubApi.PubnubEventEngine.Core
5+
namespace PubnubApi.EventEngine.Core
66
{
77
internal class EventQueue
88
{

src/Api/PubnubApi/EventEngine/Core/Utils.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using System.Threading.Tasks;
33
using System.Collections.Generic;
44

5-
namespace PubnubApi.PubnubEventEngine.Core
5+
namespace PubnubApi.EventEngine.Core
66
{
77
internal static class Utils
88
{
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using Newtonsoft.Json;
2+
3+
namespace PubnubApi.EventEngine.Subscribe.Common
4+
{
5+
public class SubscriptionCursor
6+
{
7+
public long? Timetoken { get; set; }
8+
public int? Region { get; set; }
9+
}
10+
11+
public class HandshakeResponse
12+
{
13+
[JsonProperty("t")]
14+
public Timetoken Timetoken { get; set; }
15+
16+
[JsonProperty("m")]
17+
public object[] Messages { get; set; }
18+
}
19+
public class HandshakeError
20+
{
21+
[JsonProperty("status")]
22+
public int Status { get; set; }
23+
24+
[JsonProperty("error")]
25+
public string ErrorMessage { get; set; }
26+
}
27+
28+
public class Timetoken
29+
{
30+
[JsonProperty("t")]
31+
public long Timestamp { get; set; }
32+
33+
[JsonProperty("r")]
34+
public int Region { get; set; }
35+
36+
}
37+
38+
public class ReceivingResponse<T>
39+
{
40+
[JsonProperty("t")]
41+
public Timetoken Timetoken { get; set; }
42+
43+
[JsonProperty("m")]
44+
public Message<T>[] Messages { get; set; }
45+
}
46+
47+
public class Message<T>
48+
{
49+
[JsonProperty ("a")]
50+
public string Shard { get; set;}
51+
52+
[JsonProperty ("b")]
53+
public string SubscriptionMatch { get; set;}
54+
55+
[JsonProperty("c")]
56+
public string Channel { get; set; }
57+
58+
[JsonProperty("d")]
59+
public T Payload { get; set; }
60+
61+
[JsonProperty("e")]
62+
public int MessageType { get; set; }
63+
64+
[JsonProperty("f")]
65+
public string Flags { get; set; }
66+
67+
[JsonProperty("i")]
68+
public string IssuingClientId { get; set; }
69+
70+
[JsonProperty("k")]
71+
public string SubscribeKey { get; set; }
72+
73+
[JsonProperty("o")]
74+
public object OriginatingTimetoken { get; set; }
75+
76+
[JsonProperty("p")]
77+
public object PublishMetadata { get; set; }
78+
79+
[JsonProperty("s")]
80+
public long SequenceNumber { get; set; }
81+
82+
[JsonProperty("p")]
83+
public Timetoken Timetoken { get; set; }
84+
}
85+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System;
2+
namespace PubnubApi.EventEngine.Subscribe.Context
3+
{
4+
public static class ReconnectionDelayUtil
5+
{
6+
public static int CalculateDelay(PNReconnectionPolicy policy, int attempts)
7+
{
8+
Random numGenerator = new Random();
9+
int delayValue = 0;
10+
int backoff = 5;
11+
switch (policy) {
12+
case PNReconnectionPolicy.LINEAR:
13+
delayValue = attempts * backoff + numGenerator.Next(1000);
14+
break;
15+
case PNReconnectionPolicy.EXPONENTIAL:
16+
delayValue = (int)(Math.Pow(2, attempts - 1) * 1000 + numGenerator.Next(1000));
17+
break;
18+
}
19+
return delayValue;
20+
21+
}
22+
23+
public static bool shouldRetry(PNReconnectionPolicy policy, int attempts, int maxAttempts)
24+
{
25+
if (policy == PNReconnectionPolicy.NONE) return false;
26+
return maxAttempts < attempts;
27+
}
28+
}
29+
}
30+
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Newtonsoft.Json;
5+
using PubnubApi.EventEngine.Core;
6+
using PubnubApi.EventEngine.Subscribe.Invocations;
7+
8+
namespace PubnubApi.EventEngine.Subscribe.Effects
9+
{
10+
internal class EmitMessagesHandler : IEffectHandler<Invocations.EmitMessagesInvocation>
11+
{
12+
private readonly System.Action<Pubnub, PNMessageResult<object>> messageEmitterFunction;
13+
private readonly Pubnub pubnubInstance;
14+
15+
public EmitMessagesHandler(Pubnub pubnubInstance,
16+
System.Action<Pubnub, PNMessageResult<object>> messageEmitterFunction)
17+
{
18+
this.messageEmitterFunction = messageEmitterFunction;
19+
this.pubnubInstance = pubnubInstance;
20+
}
21+
22+
public async Task Run(EmitMessagesInvocation invocation)
23+
{
24+
var processedMessages = invocation.Messages.Messages.Select(m => new PNMessageResult<object>()
25+
{
26+
Channel = m.Channel,
27+
Message = JsonConvert.DeserializeObject(m.Payload),
28+
Subscription = m.SubscriptionMatch,
29+
Timetoken = m.Timetoken.Timestamp,
30+
UserMetadata = m.PublishMetadata,
31+
Publisher = m.IssuingClientId
32+
});
33+
34+
foreach (var message in processedMessages)
35+
{
36+
messageEmitterFunction(pubnubInstance, message);
37+
}
38+
}
39+
40+
public bool IsBackground(EmitMessagesInvocation invocation) => false;
41+
42+
public Task Cancel()
43+
{
44+
throw new NotImplementedException();
45+
}
46+
}
47+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using PubnubApi.EventEngine.Core;
4+
using PubnubApi.EventEngine.Subscribe.Invocations;
5+
6+
namespace PubnubApi.EventEngine.Subscribe.Effects
7+
{
8+
public class EmitStatusEffectHandler: Core.IEffectHandler<EmitStatusInvocation>
9+
{
10+
private readonly Action<Pubnub, PNStatus> statusEmitterFunction;
11+
private readonly Pubnub pubnubInstance;
12+
13+
public EmitStatusEffectHandler(Pubnub pn, Action<Pubnub, PNStatus> statusEmitter)
14+
{
15+
this.statusEmitterFunction = statusEmitter;
16+
this.pubnubInstance = pn;
17+
}
18+
19+
public Task Cancel() => Utils.EmptyTask;
20+
21+
bool IEffectHandler<EmitStatusInvocation>.IsBackground(EmitStatusInvocation invocation) => false;
22+
23+
Task IEffectHandler<EmitStatusInvocation>.Run(EmitStatusInvocation invocation)
24+
{
25+
this.statusEmitterFunction(this.pubnubInstance, invocation.Status);
26+
return Utils.EmptyTask;
27+
}
28+
}
29+
}
30+

0 commit comments

Comments
 (0)