Skip to content

Commit 4bff839

Browse files
receive handler (#180)
* receive handler
1 parent 7748a47 commit 4bff839

File tree

8 files changed

+198
-21
lines changed

8 files changed

+198
-21
lines changed

src/Api/PubnubApi/EndPoint/PubSub/SubscribeOperation2.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class SubscribeOperation2<T>: ISubscribeOperation<T>
2626
private bool presenceSubscribeEnabled;
2727
private SubscribeManager2 manager;
2828
private Dictionary<string, object> queryParam;
29-
private EventEngine pnEventEngine;
29+
private PubnubEventEngine.EventEngine pnEventEngine;
3030
private Pubnub PubnubInstance;
3131
public List<SubscribeCallback> SubscribeListenerList
3232
{
@@ -104,18 +104,18 @@ public SubscribeOperation2(PNConfiguration pubnubConfig, IJsonPluggableLibrary j
104104
effectDispatcher.Register(EventType.ReceiveReconnectSuccess, receiveReconnectEffectHandler);
105105
effectDispatcher.Register(EventType.ReceiveReconnectGiveUp, receiveReconnectEffectHandler);
106106

107-
pnEventEngine = new EventEngine(effectDispatcher, eventEmitter);
108-
pnEventEngine.PubnubUnitTest = unit;
109-
pnEventEngine.Setup<T>(config);
110-
111-
if (pnEventEngine.PubnubUnitTest != null)
112-
{
113-
pnEventEngine.PubnubUnitTest.EventTypeList = new List<KeyValuePair<string, string>>();
114-
}
115-
else
116-
{
117-
pnEventEngine.InitialState(new State(StateType.Unsubscribed) { EventType = EventType.SubscriptionChanged });
118-
}
107+
// pnEventEngine = new EventEngine(effectDispatcher, eventEmitter);
108+
// pnEventEngine.PubnubUnitTest = unit;
109+
// pnEventEngine.Setup<T>(config);
110+
111+
// if (pnEventEngine.PubnubUnitTest != null)
112+
// {
113+
// pnEventEngine.PubnubUnitTest.EventTypeList = new List<KeyValuePair<string, string>>();
114+
// }
115+
// else
116+
// {
117+
//pnEventEngine.InitialState(new State(StateType.Unsubscribed) { EventType = EventType.SubscriptionChanged });
118+
// }
119119
}
120120

121121
private void ReceivingEffect_ReceiveRequested(object sender, ReceiveRequestEventArgs e)

src/Api/PubnubApi/EventEngine/Common/Delay.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,25 @@ namespace PubnubApi.EventEngine.Common
66
public class Delay
77
{
88
public bool Cancelled { get; private set; } = false;
9-
private readonly Thread awaiterThread;
109
private readonly TaskCompletionSource<object> taskCompletionSource = new TaskCompletionSource<object>();
1110
private readonly object monitor = new object();
1211
private readonly int milliseconds;
1312

1413
public Delay(int milliseconds)
1514
{
1615
this.milliseconds = milliseconds;
17-
awaiterThread = new Thread(AwaiterLoop);
1816
}
1917

2018
public Task Start()
2119
{
20+
#if NETFX_CORE || WINDOWS_UWP || UAP || NETSTANDARD10 || NETSTANDARD11 || NETSTANDARD12
21+
Task taskAwaiter = Task.Factory.StartNew(AwaiterLoop);
22+
taskAwaiter.Wait();
23+
#else
24+
Thread awaiterThread = new Thread(AwaiterLoop);
2225
awaiterThread.Start();
23-
return taskCompletionSource.Task;
24-
}
26+
#endif
27+
return taskCompletionSource.Task; }
2528

2629
public void Cancel()
2730
{

src/Api/PubnubApi/EventEngine/Subscribe/Common/CommonSubscribeTypes.cs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,49 @@ public class Timetoken
3434
public int? Region { get; set; }
3535

3636
}
37+
38+
public class ReceivingResponse<T>
39+
{
40+
[JsonProperty("t")]
41+
public Timetoken Timetoken { get; set; }
42+
43+
[JsonProperty("m")]
44+
public Message<T>[] Messages { get; set; }
45+
}
46+
47+
public class Message<T>
48+
{
49+
[JsonProperty ("a")]
50+
public string Shard { get; set;}
51+
52+
[JsonProperty ("b")]
53+
public string SubscriptionMatch { get; set;}
54+
55+
[JsonProperty("c")]
56+
public string Channel { get; set; }
57+
58+
[JsonProperty("d")]
59+
public T Payload { get; set; }
60+
61+
[JsonProperty("e")]
62+
public int MessageType { get; set; }
63+
64+
[JsonProperty("f")]
65+
public string Flags { get; set; }
66+
67+
//[JsonProperty("i")]
68+
//public string IssuingClientId { get; set; }
69+
70+
[JsonProperty("k")]
71+
public string SubscribeKey { get; set; }
72+
73+
[JsonProperty("o")]
74+
public object OriginatingTimetoken { get; set; }
75+
76+
[JsonProperty("p")]
77+
public object PublishMetadata { get; set; }
78+
79+
[JsonProperty("s")]
80+
public long SequenceNumber { get; set; }
81+
}
3782
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Newtonsoft.Json;
7+
using PubnubApi.EndPoint;
8+
using PubnubApi.EventEngine.Common;
9+
using PubnubApi.EventEngine.Core;
10+
using PubnubApi.EventEngine.Subscribe.Context;
11+
using PubnubApi.EventEngine.Subscribe.Events;
12+
using PubnubApi.EventEngine.Subscribe.Invocations;
13+
using PubnubApi.EventEngine.Subscribe.Common;
14+
15+
namespace PubnubApi.EventEngine.Subscribe.Effects
16+
{
17+
internal class ReceivingEffectHandler:
18+
Core.IEffectHandler<ReceiveMessagesInvocation>,
19+
Core.IEffectHandler<ReceiveReconnectInvocation>
20+
{
21+
private SubscribeManager2 manager;
22+
private EventQueue eventQueue;
23+
24+
private Delay retryDelay = new Delay(0);
25+
26+
public ReceivingEffectHandler(SubscribeManager2 manager, EventQueue eventQueue)
27+
{
28+
this.manager = manager;
29+
this.eventQueue = eventQueue;
30+
}
31+
32+
public Task Run(ReceiveReconnectInvocation invocation)
33+
{
34+
if (!ReconnectionDelayUtil.shouldRetry(invocation.Policy, invocation.AttemptedRetries, invocation.MaxConnectionRetry))
35+
{
36+
eventQueue.Enqueue(new ReceiveReconnectGiveUpEvent() { Status = new PNStatus(PNStatusCategory.PNCancelledCategory) });
37+
}
38+
else
39+
{
40+
retryDelay = new Delay(ReconnectionDelayUtil.CalculateDelay(invocation.Policy, invocation.AttemptedRetries));
41+
// Run in the background
42+
retryDelay.Start().ContinueWith((_) => this.Run((ReceiveMessagesInvocation)invocation));
43+
}
44+
45+
return Utils.EmptyTask;
46+
}
47+
48+
public async Task Run(ReceiveMessagesInvocation invocation)
49+
{
50+
var response = await MakeReceiveMessagesRequest(invocation);
51+
52+
switch (invocation)
53+
{
54+
case Invocations.ReceiveReconnectInvocation reconnectInvocation when response.Item2.Error:
55+
eventQueue.Enqueue(new Events.ReceiveReconnectFailureEvent() { AttemptedRetries = reconnectInvocation.AttemptedRetries + 1, Status = response.Item2});
56+
break;
57+
case Invocations.ReceiveReconnectInvocation reconnectInvocation:
58+
eventQueue.Enqueue(new Events.ReceiveReconnectSuccessEvent() { Cursor = response.Item1, Status = response.Item2 });
59+
break;
60+
case { } when response.Item2.Error:
61+
eventQueue.Enqueue(new Events.ReceiveFailureEvent() { Cursor = response.Item1, Status = response.Item2});
62+
break;
63+
case { }:
64+
//TODO: get messages
65+
List<PNMessageResult<object>> listOfMessages = null;
66+
eventQueue.Enqueue(new Events.ReceiveSuccessEvent() { Cursor = response.Item1, Messages= listOfMessages, Status = response.Item2 });
67+
break;
68+
69+
}
70+
}
71+
72+
private async Task<System.Tuple<SubscriptionCursor, PNStatus>> MakeReceiveMessagesRequest(ReceiveMessagesInvocation invocation)
73+
{
74+
var resp = await manager.ReceiveRequest<string>(
75+
PNOperationType.PNSubscribeOperation,
76+
invocation.Channels.ToArray(),
77+
invocation.ChannelGroups.ToArray(),
78+
invocation.Cursor.Timetoken.Value,
79+
invocation.Cursor.Region.Value,
80+
invocation.InitialSubscribeQueryParams,
81+
invocation.ExternalQueryParams
82+
);
83+
84+
try
85+
{
86+
//TODO: get ReceivingResponse from manager.ReceiveRequest
87+
var receiveResponse = JsonConvert.DeserializeObject<ReceivingResponse<string>>(resp.Item1);
88+
var c = new SubscriptionCursor()
89+
{
90+
Region = receiveResponse.Timetoken.Region,
91+
Timetoken = receiveResponse.Timetoken.Timestamp
92+
};
93+
return new System.Tuple<SubscriptionCursor, PNStatus>(c, resp.Item2);
94+
}
95+
catch (Exception e)
96+
{
97+
return new Tuple<SubscriptionCursor, PNStatus>(null, new PNStatus(e, PNOperationType.PNSubscribeOperation, PNStatusCategory.PNUnknownCategory, invocation.Channels, invocation.ChannelGroups));
98+
}
99+
}
100+
101+
public async Task Cancel()
102+
{
103+
if (!retryDelay.Cancelled)
104+
{
105+
retryDelay.Cancel();
106+
}
107+
else
108+
{
109+
manager.ReceiveRequestCancellation();
110+
}
111+
}
112+
}
113+
}

src/Api/PubnubApi/EventEngine/Subscribe/Events/SubscriptionEvents.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class ReceiveSuccessEvent : Core.IEvent {
4949

5050
public class ReceiveFailureEvent : Core.IEvent {
5151
public PNStatus Status;
52+
public int AttemptedRetries;
53+
public SubscriptionCursor Cursor;
5254
}
5355

5456
public class ReceiveReconnectRetry : Core.IEvent {

src/Api/PubnubApi/EventEngine/Subscribe/Invocations/SubscriptionInvocations.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ internal class ReceiveMessagesInvocation : Core.IEffectInvocation
4545
public IEnumerable<string> Channels;
4646
public IEnumerable<string> ChannelGroups;
4747
public SubscriptionCursor Cursor;
48+
public Dictionary<string, string> InitialSubscribeQueryParams = new Dictionary<string, string>();
49+
public Dictionary<string, object> ExternalQueryParams = new Dictionary<string, object>();
4850
}
4951

5052
internal class CancelReceiveMessagesInvocation : ReceiveMessagesInvocation, Core.IEffectCancelInvocation { }
@@ -60,11 +62,11 @@ internal class HandshakeReconnectInvocation: HandshakeInvocation
6062

6163
internal class CancelHandshakeReconnectInvocation: HandshakeReconnectInvocation, Core.IEffectCancelInvocation { }
6264

63-
internal class ReceiveReconnectInvocation: Core.IEffectInvocation
65+
internal class ReceiveReconnectInvocation: ReceiveMessagesInvocation
6466
{
65-
public IEnumerable<string> Channels;
66-
public IEnumerable<string> ChannelGroups;
67-
public SubscriptionCursor Cursor;
67+
public int AttemptedRetries;
68+
public int MaxConnectionRetry;
69+
public PNReconnectionPolicy Policy;
6870
}
6971

7072
internal class CancelReceiveReconnectInvocation: ReceiveReconnectInvocation, Core.IEffectCancelInvocation { }

src/Api/PubnubApiPCL/PubnubApiPCL.csproj

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ Addressed threading issue on reading ConcurrentDictionary keys.</PackageReleaseN
214214
<Compile Include="..\PubnubApi\Enum\ResponseType.cs">
215215
<Link>Enum\ResponseType.cs</Link>
216216
</Compile>
217+
<Compile Include="..\PubnubApi\EventEngine\Common\Delay.cs" Link="EventEngine\Common\Delay.cs" />
217218
<Compile Include="..\PubnubApi\EventEngine\Core\EffectDispatcher.cs" Link="EventEngine\Core\EffectDispatcher.cs" />
218219
<Compile Include="..\PubnubApi\EventEngine\Core\Engine.cs" Link="EventEngine\Core\Engine.cs" />
219220
<Compile Include="..\PubnubApi\EventEngine\Core\EventEngineInterfaces.cs" Link="EventEngine\Core\EventEngineInterfaces.cs" />
@@ -229,6 +230,8 @@ Addressed threading issue on reading ConcurrentDictionary keys.</PackageReleaseN
229230
<Compile Include="..\PubnubApi\EventEngine\ReceiveReconnectingEffectHandler.cs" Link="EventEngine\ReceiveReconnectingEffectHandler.cs" />
230231
<Compile Include="..\PubnubApi\EventEngine\ReceivingEffectHandler.cs" Link="EventEngine\ReceivingEffectHandler.cs" />
231232
<Compile Include="..\PubnubApi\EventEngine\State.cs" Link="EventEngine\State.cs" />
233+
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Common\CommonSubscribeTypes.cs" Link="EventEngine\Subscribe\Common\CommonSubscribeTypes.cs" />
234+
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Context\ReconnectionDelayUtil.cs" Link="EventEngine\Subscribe\Context\ReconnectionDelayUtil.cs" />
232235
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Effects\HandshakeEffectHandler.cs" Link="EventEngine\Subscribe\Effects\HandshakeEffectHandler.cs" />
233236
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Events\SubscriptionEvents.cs" Link="EventEngine\Subscribe\Events\SubscriptionEvents.cs" />
234237
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Invocations\SubscriptionInvocations.cs" Link="EventEngine\Subscribe\Invocations\SubscriptionInvocations.cs" />
@@ -918,9 +921,12 @@ Addressed threading issue on reading ConcurrentDictionary keys.</PackageReleaseN
918921
<Folder Include="EndPoint\Presence\" />
919922
<Folder Include="Enum\" />
920923
<Folder Include="EventEngine\Core\" />
924+
<Folder Include="EventEngine\Common\" />
921925
<Folder Include="EventEngine\Subscribe\Effects\" />
922926
<Folder Include="EventEngine\Subscribe\Events\" />
923927
<Folder Include="EventEngine\Subscribe\Invocations\" />
928+
<Folder Include="EventEngine\Subscribe\Common\" />
929+
<Folder Include="EventEngine\Subscribe\Context\" />
924930
<Folder Include="EventEngine\Subscribe\States\" />
925931
<Folder Include="HttpUtility\" />
926932
<Folder Include="Interface\" />

src/Api/PubnubApiUWP/PubnubApiUWP.csproj

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ Addressed threading issue on reading ConcurrentDictionary keys.</PackageReleaseN
331331
<Compile Include="..\PubnubApi\Enum\ResponseType.cs">
332332
<Link>Enum\ResponseType.cs</Link>
333333
</Compile>
334+
<Compile Include="..\PubnubApi\EventEngine\Common\Delay.cs" Link="EventEngine\Common\Delay.cs" />
334335
<Compile Include="..\PubnubApi\EventEngine\Core\EffectDispatcher.cs" Link="EventEngine\Core\EffectDispatcher.cs" />
335336
<Compile Include="..\PubnubApi\EventEngine\Core\Engine.cs" Link="EventEngine\Core\Engine.cs" />
336337
<Compile Include="..\PubnubApi\EventEngine\Core\EventEngineInterfaces.cs" Link="EventEngine\Core\EventEngineInterfaces.cs" />
@@ -346,6 +347,8 @@ Addressed threading issue on reading ConcurrentDictionary keys.</PackageReleaseN
346347
<Compile Include="..\PubnubApi\EventEngine\ReceiveReconnectingEffectHandler.cs" Link="EventEngine\ReceiveReconnectingEffectHandler.cs" />
347348
<Compile Include="..\PubnubApi\EventEngine\ReceivingEffectHandler.cs" Link="EventEngine\ReceivingEffectHandler.cs" />
348349
<Compile Include="..\PubnubApi\EventEngine\State.cs" Link="EventEngine\State.cs" />
350+
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Common\CommonSubscribeTypes.cs" Link="EventEngine\Subscribe\Common\CommonSubscribeTypes.cs" />
351+
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Context\ReconnectionDelayUtil.cs" Link="EventEngine\Subscribe\Context\ReconnectionDelayUtil.cs" />
349352
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Effects\HandshakeEffectHandler.cs" Link="EventEngine\Subscribe\Effects\HandshakeEffectHandler.cs" />
350353
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Events\SubscriptionEvents.cs" Link="EventEngine\Subscribe\Events\SubscriptionEvents.cs" />
351354
<Compile Include="..\PubnubApi\EventEngine\Subscribe\Invocations\SubscriptionInvocations.cs" Link="EventEngine\Subscribe\Invocations\SubscriptionInvocations.cs" />
@@ -727,9 +730,12 @@ Addressed threading issue on reading ConcurrentDictionary keys.</PackageReleaseN
727730
<Folder Include="EndPoint\StoragePlayback\" />
728731
<Folder Include="EndPoint\Objects\" />
729732
<Folder Include="EventEngine\Core\" />
733+
<Folder Include="EventEngine\Common\" />
730734
<Folder Include="EventEngine\Subscribe\Effects\" />
731735
<Folder Include="EventEngine\Subscribe\Events\" />
732736
<Folder Include="EventEngine\Subscribe\Invocations\" />
737+
<Folder Include="EventEngine\Subscribe\Common\" />
738+
<Folder Include="EventEngine\Subscribe\Context\" />
733739
<Folder Include="EventEngine\Subscribe\States\" />
734740
<Folder Include="Model\Consumer\DeleteMessage\" />
735741
<Folder Include="Model\Consumer\Files\" />

0 commit comments

Comments
 (0)