Skip to content
This repository was archived by the owner on Oct 28, 2021. It is now read-only.

Commit 87c8327

Browse files
committed
Break out common functionality of DatafeedApi and FirehoseApi into
a superclass.
1 parent 871690b commit 87c8327

File tree

4 files changed

+158
-193
lines changed

4 files changed

+158
-193
lines changed
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Licensed to the Symphony Software Foundation (SSF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SSF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
namespace SymphonyOSS.RestApiClient.Api.AgentApi
19+
{
20+
using System;
21+
using System.Collections.Generic;
22+
using System.Diagnostics;
23+
using System.Threading.Tasks;
24+
using Authentication;
25+
using Generated.Json;
26+
using Generated.OpenApi.AgentApi.Model;
27+
28+
/// <summary>
29+
/// Abstract superclass for datafeed-type Apis, eg <see cref="Generated.OpenApi.AgentApi.Api.DatafeedApi"/>
30+
/// and <see cref="Generated.OpenApi.AgentApi.Api.FirehoseApi"/>.
31+
/// </summary>
32+
public abstract class AbstractDatafeedApi
33+
{
34+
private static readonly TraceSource TraceSource = new TraceSource("SymphonyOSS.RestApiClient");
35+
36+
protected readonly IAuthTokens AuthTokens;
37+
38+
protected readonly IApiExecutor ApiExecutor;
39+
40+
protected volatile bool ShouldStop;
41+
42+
private readonly Dictionary<EventHandler<MessageEventArgs>, Task> _tasks = new Dictionary<EventHandler<MessageEventArgs>, Task>();
43+
44+
static AbstractDatafeedApi()
45+
{
46+
JsonSubtypeConverter.Register(typeof(V2Message));
47+
}
48+
49+
/// <summary>
50+
/// Initializes a new instance of the <see cref="DatafeedApi" /> class.
51+
/// See <see cref="Factories.AgentApiFactory"/> for conveniently constructing
52+
/// an instance.
53+
/// </summary>
54+
/// <param name="authTokens">Authentication tokens.</param>
55+
/// <param name="apiExecutor">Execution strategy.</param>
56+
protected AbstractDatafeedApi(IAuthTokens authTokens, IApiExecutor apiExecutor)
57+
{
58+
AuthTokens = authTokens;
59+
ApiExecutor = apiExecutor;
60+
}
61+
62+
private event EventHandler<MessageEventArgs> _onMessage;
63+
public event EventHandler<MessageEventArgs> OnMessage
64+
{
65+
add
66+
{
67+
_onMessage += value;
68+
}
69+
remove
70+
{
71+
_onMessage -= value;
72+
lock (_tasks)
73+
{
74+
_tasks.Remove(value);
75+
}
76+
}
77+
}
78+
79+
/// <summary>
80+
/// Requests that <see cref="Listen"/> should stop blocking and return control
81+
/// to the calling thread. Calling <see cref="Stop"/> will not immediately return
82+
/// control, but wait for the current outstanding request to complete.
83+
/// </summary>
84+
public void Stop()
85+
{
86+
ShouldStop = true;
87+
}
88+
89+
protected void ProcessMessageList(V2MessageList messageList)
90+
{
91+
if (messageList == null || _onMessage == null)
92+
{
93+
return;
94+
}
95+
96+
foreach (var eventHandler in _onMessage.GetInvocationList())
97+
{
98+
NotifyAsync((EventHandler<MessageEventArgs>)eventHandler, messageList);
99+
}
100+
}
101+
102+
protected async void NotifyAsync(EventHandler<MessageEventArgs> messageEventHandler, V2MessageList messageList)
103+
{
104+
// Notify each handler in a separate task, maintaining the order of messages in the list, and
105+
// get back to reading the data feed again without waiting for listeners to process messages.
106+
Task task;
107+
if (_tasks.TryGetValue(messageEventHandler, out task))
108+
{
109+
await task;
110+
}
111+
_tasks[messageEventHandler] = Task.Run(() => Notify(messageEventHandler, messageList));
112+
}
113+
114+
private void Notify(EventHandler<MessageEventArgs> messageEventHandler, V2MessageList messageList)
115+
{
116+
foreach (var message in messageList)
117+
{
118+
TraceSource.TraceEvent(
119+
TraceEventType.Verbose, 0,
120+
"Notifying listener about message with ID \"{0}\" in stream \"{1}\"",
121+
(message as V2Message)?.Id, (message as V2Message)?.StreamId);
122+
try
123+
{
124+
messageEventHandler.Invoke(this, new MessageEventArgs(message));
125+
}
126+
catch (Exception e)
127+
{
128+
TraceSource.TraceEvent(
129+
TraceEventType.Error, 0,
130+
"Unhandled exception caught when notifying listener about message with ID \"{0}\": {1}",
131+
message.Id, e);
132+
}
133+
}
134+
}
135+
}
136+
}

src/SymphonyOSS.RestApiClient/Api/AgentApi/DatafeedApi.cs

Lines changed: 9 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,8 @@
1717

1818
namespace SymphonyOSS.RestApiClient.Api.AgentApi
1919
{
20-
using System;
21-
using System.Collections.Generic;
2220
using System.Diagnostics;
23-
using System.Threading.Tasks;
2421
using Authentication;
25-
using Generated.Json;
2622
using Generated.OpenApi.AgentApi.Client;
2723
using Generated.OpenApi.AgentApi.Model;
2824

@@ -31,25 +27,12 @@ namespace SymphonyOSS.RestApiClient.Api.AgentApi
3127
/// Encapsulates <see cref="Generated.OpenApi.AgentApi.Api.DatafeedApi"/>,
3228
/// adding authentication token management and a custom execution strategy.
3329
/// </summary>
34-
public class DatafeedApi
30+
public class DatafeedApi : AbstractDatafeedApi
3531
{
3632
private static readonly TraceSource TraceSource = new TraceSource("SymphonyOSS.RestApiClient");
3733

3834
private readonly Generated.OpenApi.AgentApi.Api.IDatafeedApi _datafeedApi;
3935

40-
private readonly IAuthTokens _authTokens;
41-
42-
private readonly IApiExecutor _apiExecutor;
43-
44-
private readonly Dictionary<EventHandler<MessageEventArgs>, Task> _tasks = new Dictionary<EventHandler<MessageEventArgs>, Task>();
45-
46-
private volatile bool _shouldStop;
47-
48-
static DatafeedApi()
49-
{
50-
JsonSubtypeConverter.Register(typeof(V2Message));
51-
}
52-
5336
/// <summary>
5437
/// Initializes a new instance of the <see cref="DatafeedApi" /> class.
5538
/// See <see cref="Factories.AgentApiFactory"/> for conveniently constructing
@@ -58,111 +41,40 @@ static DatafeedApi()
5841
/// <param name="authTokens">Authentication tokens.</param>
5942
/// <param name="configuration">Api configuration.</param>
6043
/// <param name="apiExecutor">Execution strategy.</param>
61-
public DatafeedApi(IAuthTokens authTokens, Configuration configuration, IApiExecutor apiExecutor)
44+
public DatafeedApi(IAuthTokens authTokens, Configuration configuration, IApiExecutor apiExecutor) : base(authTokens, apiExecutor)
6245
{
6346
_datafeedApi = new Generated.OpenApi.AgentApi.Api.DatafeedApi(configuration);
64-
_authTokens = authTokens;
65-
_apiExecutor = apiExecutor;
66-
}
67-
68-
private event EventHandler<MessageEventArgs> _onMessage;
69-
public event EventHandler<MessageEventArgs> OnMessage
70-
{
71-
add
72-
{
73-
_onMessage += value;
74-
}
75-
remove
76-
{
77-
_onMessage -= value;
78-
lock (_tasks)
79-
{
80-
_tasks.Remove(value);
81-
}
82-
}
8347
}
8448

8549
/// <summary>
8650
/// Starts listening, notifying event handlers about incoming messages. Blocks
87-
/// until <see cref="Stop"/> is invoked.
51+
/// until <see cref="AbstractDatafeedApi.Stop"/> is invoked.
8852
/// </summary>
8953
public void Listen()
9054
{
91-
_shouldStop = false;
55+
ShouldStop = false;
9256
var datafeed = CreateDatafeed();
93-
while (!_shouldStop)
57+
while (!ShouldStop)
9458
{
9559
var messageList = ReadDatafeed(ref datafeed);
96-
if (_shouldStop)
60+
if (ShouldStop)
9761
{
9862
// Don't process messages if the user has stopped listening.
9963
break;
10064
}
10165

102-
if (messageList == null || _onMessage == null)
103-
{
104-
continue;
105-
}
106-
107-
foreach (var eventHandler in _onMessage.GetInvocationList())
108-
{
109-
NotifyAsync((EventHandler<MessageEventArgs>)eventHandler, messageList);
110-
}
111-
}
112-
}
113-
114-
/// <summary>
115-
/// Requests that <see cref="Listen"/> should stop blocking and return control
116-
/// to the calling thread. Calling <see cref="Stop"/> will not immediately return
117-
/// control, but wait for the current outstanding request to complete.
118-
/// </summary>
119-
public void Stop()
120-
{
121-
_shouldStop = true;
122-
}
123-
124-
private async void NotifyAsync(EventHandler<MessageEventArgs> messageEventHandler, V2MessageList messageList)
125-
{
126-
// Notify each handler in a separate task, maintaining the order of messages in the list, and
127-
// get back to reading the data feed again without waiting for listeners to process messages.
128-
Task task;
129-
if (_tasks.TryGetValue(messageEventHandler, out task))
130-
{
131-
await task;
132-
}
133-
_tasks[messageEventHandler] = Task.Run(() => Notify(messageEventHandler, messageList));
134-
}
135-
136-
private void Notify(EventHandler<MessageEventArgs> messageEventHandler, V2MessageList messageList)
137-
{
138-
foreach (var message in messageList)
139-
{
140-
TraceSource.TraceEvent(
141-
TraceEventType.Verbose, 0,
142-
"Notifying listener about message with ID \"{0}\" in stream \"{1}\"",
143-
(message as V2Message)?.Id, (message as V2Message)?.StreamId);
144-
try
145-
{
146-
messageEventHandler.Invoke(this, new MessageEventArgs(message));
147-
}
148-
catch (Exception e)
149-
{
150-
TraceSource.TraceEvent(
151-
TraceEventType.Error, 0,
152-
"Unhandled exception caught when notifying listener about message with ID \"{0}\": {1}",
153-
message.Id, e);
154-
}
66+
ProcessMessageList(messageList);
15567
}
15668
}
15769

15870
private Datafeed CreateDatafeed()
15971
{
160-
return _apiExecutor.Execute(_datafeedApi.V1DatafeedCreatePost, _authTokens.SessionToken, _authTokens.KeyManagerToken);
72+
return ApiExecutor.Execute(_datafeedApi.V1DatafeedCreatePost, AuthTokens.SessionToken, AuthTokens.KeyManagerToken);
16173
}
16274

16375
private V2MessageList ReadDatafeed(string id, int? maxMessages = null)
16476
{
165-
return _apiExecutor.Execute(_datafeedApi.V2DatafeedIdReadGet, id, _authTokens.SessionToken, _authTokens.KeyManagerToken, maxMessages);
77+
return ApiExecutor.Execute(_datafeedApi.V2DatafeedIdReadGet, id, AuthTokens.SessionToken, AuthTokens.KeyManagerToken, maxMessages);
16678
}
16779

16880
private V2MessageList ReadDatafeed(ref Datafeed datafeed, int? maxMessages = null)

0 commit comments

Comments
 (0)