Skip to content

Commit 7544725

Browse files
authored
feat: data change notify error handling (#11)
1 parent ded3c91 commit 7544725

File tree

3 files changed

+95
-55
lines changed

3 files changed

+95
-55
lines changed

src/Api/DataSynchronizer/DataSyncMessageHandler.cs

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
using System.Text;
21
using System.Text.Json;
3-
using System.Text.Json.Nodes;
42
using Api.Messaging;
53
using Api.Store;
6-
using Domain.Messages;
74
using Streaming.Protocol;
85

96
namespace Api.DataSynchronizer;
107

11-
public class DataSyncMessageHandler(IAgentStore agentStore, IDataChangeNotifier dataChangeNotifier)
8+
public class DataSyncMessageHandler(
9+
IAgentStore agentStore,
10+
IDataChangeNotifier dataChangeNotifier,
11+
ILogger<DataSyncMessageHandler> logger)
1212
: IDataSyncMessageHandler
1313
{
1414
public ValueTask<byte[]> GetRequestPayloadAsync() => agentStore.GetDataSyncPayloadAsync();
@@ -31,66 +31,36 @@ public async Task HandleAsync(JsonElement message)
3131
.ToArray();
3232
await agentStore.UpdateAsync(items);
3333

34-
await OnItemsUpdated(items);
34+
await SafeNotifyAsync(items);
3535
break;
3636
}
3737
case DataSyncEventTypes.Patch:
3838
{
3939
var patchDataSet = PatchDataSet.FromJson(data);
4040
await agentStore.UpdateAsync(patchDataSet.Items);
4141

42-
await OnItemsUpdated(patchDataSet.Items);
42+
await SafeNotifyAsync(patchDataSet.Items);
4343
break;
4444
}
4545
}
4646
}
4747

48-
private async Task OnItemsUpdated(StoreItem[] items)
48+
private async Task SafeNotifyAsync(StoreItem[] items)
4949
{
50-
List<DataChangeMessage> dataChanges = [];
51-
5250
foreach (var item in items)
5351
{
54-
await AddDataChange(item);
55-
}
56-
57-
await dataChangeNotifier.NotifyAsync(dataChanges.ToArray());
58-
59-
return;
60-
61-
async Task AddDataChange(StoreItem item)
62-
{
63-
if (item.Type == StoreItemType.Flag)
52+
try
6453
{
65-
var flagChange = new DataChangeMessage(
66-
Topics.FeatureFlagChange,
67-
item.Id,
68-
Encoding.UTF8.GetString(item.JsonBytes)
69-
);
70-
71-
dataChanges.Add(flagChange);
54+
await dataChangeNotifier.NotifyAsync(item);
7255
}
73-
74-
if (item.Type == StoreItemType.Segment)
56+
catch (Exception ex)
7557
{
76-
using var segment = JsonDocument.Parse(item.JsonBytes);
77-
78-
var envId = segment.RootElement.GetProperty("envId").GetGuid();
79-
var affectedIds = await agentStore.GetFlagReferencesAsync(envId, item.Id);
80-
81-
JsonObject payload = new()
82-
{
83-
["segment"] = JsonSerializer.SerializeToNode(segment),
84-
["affectedFlagIds"] = JsonSerializer.SerializeToNode(affectedIds)
85-
};
86-
87-
var segmentChange = new DataChangeMessage(
88-
Topics.SegmentChange,
58+
logger.LogError(
59+
ex,
60+
"Exception occurred while processing StoreItem Id {Id} of type {Type} for data change notification.",
8961
item.Id,
90-
JsonSerializer.Serialize(payload)
62+
item.Type
9163
);
92-
93-
dataChanges.Add(segmentChange);
9464
}
9565
}
9666
}

src/Api/Messaging/DataChangeNotifier.cs

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
using System.Text;
2+
using System.Text.Json;
3+
using System.Text.Json.Nodes;
4+
using Api.Store;
15
using Domain.Messages;
26
using Streaming.Connections;
37
using Streaming.Protocol;
@@ -9,17 +13,20 @@ public class DataChangeNotifier : IDataChangeNotifier
913
{
1014
private readonly IConnectionManager _connectionManager;
1115
private readonly IDataSyncService _dataSyncService;
16+
private readonly IAgentStore _agentStore;
1217
private readonly Dictionary<string, IMessageConsumer> _dataChangeHandlers;
1318
private readonly ILogger<DataChangeNotifier> _logger;
1419

1520
public DataChangeNotifier(
1621
IConnectionManager connectionManager,
1722
IDataSyncService dataSyncService,
23+
IAgentStore agentStore,
1824
IEnumerable<IMessageConsumer> messageHandlers,
1925
ILogger<DataChangeNotifier> logger)
2026
{
2127
_connectionManager = connectionManager;
2228
_dataSyncService = dataSyncService;
29+
_agentStore = agentStore;
2330
_dataChangeHandlers = messageHandlers.ToDictionary(x => x.Topic, x => x);
2431
_logger = logger;
2532
}
@@ -60,27 +67,86 @@ async Task NotifyConnectionAsync(Connection connection)
6067
}
6168
}
6269

63-
public async Task NotifyAsync(DataChangeMessage[] dataChanges)
70+
public async Task NotifyAsync(DataChangeMessage dataChange)
6471
{
65-
if (dataChanges.Length == 0)
72+
if (!_dataChangeHandlers.TryGetValue(dataChange.Topic, out var handler))
6673
{
74+
_logger.LogWarning("No data change handler found for topic {Topic}.", dataChange.Topic);
6775
return;
6876
}
6977

70-
foreach (var dataChange in dataChanges)
78+
try
7179
{
72-
if (!_dataChangeHandlers.TryGetValue(dataChange.Topic, out var handler))
73-
{
74-
_logger.LogWarning("No data change handler found for topic {Topic}.", dataChange.Topic);
75-
continue;
76-
}
77-
7880
await handler.HandleAsync(dataChange.Message, CancellationToken.None);
7981

8082
_logger.LogInformation(
81-
"Handled data change message for topic {Topic} (Item Id: {Id})",
83+
"Notified data change for topic {Topic} (Item Id: {Id})",
8284
dataChange.Topic, dataChange.Id
8385
);
8486
}
87+
catch (Exception ex)
88+
{
89+
_logger.LogError(
90+
ex,
91+
"Exception occurred while notifying data change for topic {Topic} (Item Id: {Id})",
92+
dataChange.Topic,
93+
dataChange.Id
94+
);
95+
}
96+
}
97+
98+
public async Task NotifyAsync(StoreItem item)
99+
{
100+
var dataChange = await ItemToDataChangeAsync();
101+
if (dataChange != null)
102+
{
103+
await NotifyAsync(dataChange);
104+
}
105+
106+
return;
107+
108+
async ValueTask<DataChangeMessage?> ItemToDataChangeAsync()
109+
{
110+
if (item.Type == StoreItemType.Flag)
111+
{
112+
var flagChange = new DataChangeMessage(
113+
Topics.FeatureFlagChange,
114+
item.Id,
115+
Encoding.UTF8.GetString(item.JsonBytes)
116+
);
117+
118+
return flagChange;
119+
}
120+
121+
if (item.Type == StoreItemType.Segment)
122+
{
123+
using var segment = JsonDocument.Parse(item.JsonBytes);
124+
125+
var envId = segment.RootElement.GetProperty("envId").GetGuid();
126+
var affectedIds = await _agentStore.GetFlagReferencesAsync(envId, item.Id);
127+
128+
JsonObject payload = new()
129+
{
130+
["segment"] = JsonSerializer.SerializeToNode(segment),
131+
["affectedFlagIds"] = JsonSerializer.SerializeToNode(affectedIds)
132+
};
133+
134+
var segmentChange = new DataChangeMessage(
135+
Topics.SegmentChange,
136+
item.Id,
137+
JsonSerializer.Serialize(payload)
138+
);
139+
140+
return segmentChange;
141+
}
142+
143+
_logger.LogWarning(
144+
"Unsupported StoreItem type {Type} for item Id {Id}, skipping data change notification for it.",
145+
item.Type,
146+
item.Id
147+
);
148+
149+
return null;
150+
}
85151
}
86152
}
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
using Api.Store;
2+
13
namespace Api.Messaging;
24

35
public interface IDataChangeNotifier
46
{
57
Task NotifyAsync(Guid envId);
68

7-
Task NotifyAsync(DataChangeMessage[] dataChanges);
9+
Task NotifyAsync(DataChangeMessage dataChange);
10+
11+
Task NotifyAsync(StoreItem item);
812
}

0 commit comments

Comments
 (0)