Skip to content

Commit 3f96e33

Browse files
committed
recent_senders: Add the new MessageIdTracker and RecentSenders data structures
MessageIdTracker data structure is used to keep track of message ids in an ascending sorted list. It is used in RecentSenders data structure. RecentSenders data structure is used to keep track of user messages in topics and streams. Much of this code is transcribed from Zulip web; in particular, from: https://github.com/zulip/zulip/blob/bd04a30bbc6dc5bd7c20940a3d1d34cf8c8c6f28/web/src/recent_senders.ts
1 parent 4ea252c commit 3f96e33

File tree

5 files changed

+537
-0
lines changed

5 files changed

+537
-0
lines changed

lib/model/message_list.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
403403
numAfter: 0,
404404
);
405405
store.reconcileMessages(result.messages);
406+
store.recentSenders.handleMessages(result.messages); // TODO(#824)
406407
for (final message in result.messages) {
407408
if (_messageVisible(message)) {
408409
_addMessage(message);
@@ -439,6 +440,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
439440
}
440441

441442
store.reconcileMessages(result.messages);
443+
store.recentSenders.handleMessages(result.messages); // TODO(#824)
442444

443445
final fetchedMessages = _allMessagesVisible
444446
? result.messages // Avoid unnecessarily copying the list.

lib/model/recent_senders.dart

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import 'package:collection/collection.dart';
2+
import 'package:flutter/foundation.dart';
3+
4+
import '../api/model/events.dart';
5+
import '../api/model/model.dart';
6+
import 'algorithms.dart';
7+
8+
/// A data structure to keep track of stream and topic messages of users (senders).
9+
///
10+
/// Use [latestMessageIdOfSenderInStream] and [latestMessageIdOfSenderInTopic]
11+
/// to get the relevant data.
12+
class RecentSenders {
13+
// streamSenders[streamId][senderId] = MessageIdTracker
14+
@visibleForTesting
15+
final Map<int, Map<int, _MessageIdTracker>> streamSenders = {};
16+
17+
// topicSenders[streamId][topic][senderId] = MessageIdTracker
18+
@visibleForTesting
19+
final Map<int, Map<String, Map<int, _MessageIdTracker>>> topicSenders = {};
20+
21+
int? latestMessageIdOfSenderInStream({
22+
required int streamId,
23+
required int senderId,
24+
}) => streamSenders[streamId]?[senderId]?.maxId;
25+
26+
int? latestMessageIdOfSenderInTopic({
27+
required int streamId,
28+
required String topic,
29+
required int senderId,
30+
}) => topicSenders[streamId]?[topic]?[senderId]?.maxId;
31+
32+
/// Records the necessary data from each message if it is a [StreamMessage].
33+
///
34+
/// [messages] should be sorted by [id] ascendingly, which are, the way app
35+
/// receives and handles messages.
36+
void handleMessages(List<Message> messages) {
37+
final messagesByUserInStream = <(int, int), List<int>>{};
38+
final messagesByUserInTopic = <(int, String, int), List<int>>{};
39+
for (final message in messages) {
40+
if (message is! StreamMessage) continue;
41+
final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message;
42+
(messagesByUserInStream[(streamId, senderId)] ??= []).add(messageId);
43+
(messagesByUserInTopic[(streamId, topic, senderId)] ??= []).add(messageId);
44+
}
45+
46+
for (final entry in messagesByUserInStream.entries) {
47+
final (streamId, senderId) = entry.key;
48+
((streamSenders[streamId] ??= {})
49+
[senderId] ??= _MessageIdTracker()).addAll(entry.value);
50+
}
51+
for (final entry in messagesByUserInTopic.entries) {
52+
final (streamId, topic, senderId) = entry.key;
53+
(((topicSenders[streamId] ??= {})[topic] ??= {})
54+
[senderId] ??= _MessageIdTracker()).addAll(entry.value);
55+
}
56+
}
57+
58+
/// Records the necessary data from [message] if it is a [StreamMessage].
59+
///
60+
/// If [message] is not a [StreamMessage], this is a no-op.
61+
void handleMessage(Message message) {
62+
if (message is! StreamMessage) return;
63+
final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message;
64+
((streamSenders[streamId] ??= {})
65+
[senderId] ??= _MessageIdTracker()).add(messageId);
66+
(((topicSenders[streamId] ??= {})[topic] ??= {})
67+
[senderId] ??= _MessageIdTracker()).add(messageId);
68+
}
69+
70+
void handleDeleteMessageEvent(DeleteMessageEvent event, Map<int, Message> cachedMessages) {
71+
if (event.messageType != MessageType.stream) return;
72+
final messagesByUserInStream = <(int, int), List<int>>{};
73+
final messagesByUserInTopic = <(int, String, int), List<int>>{};
74+
75+
final DeleteMessageEvent(:streamId!, :topic!) = event;
76+
for (final id in event.messageIds) {
77+
final message = cachedMessages[id] as StreamMessage?;
78+
if (message == null) continue;
79+
final senderId = message.senderId;
80+
(messagesByUserInStream[(streamId, senderId)] ??= []).add(id);
81+
(messagesByUserInTopic[(streamId, topic, senderId)] ??= []).add(id);
82+
}
83+
84+
for (final entry in messagesByUserInStream.entries) {
85+
final MapEntry(key: (streamId, senderId), value: messages) = entry;
86+
final sendersMap = streamSenders[streamId];
87+
final idTracker = sendersMap?[senderId];
88+
89+
idTracker?.removeAll(messages);
90+
if (idTracker?.maxId == null) sendersMap?.remove(senderId);
91+
if (sendersMap?.isEmpty ?? false) streamSenders.remove(streamId);
92+
}
93+
94+
for (final entry in messagesByUserInTopic.entries) {
95+
final MapEntry(key: (streamId, topic, senderId), value: messages) = entry;
96+
final topicsMap = topicSenders[streamId];
97+
final sendersMap = topicsMap?[topic];
98+
final idTracker = sendersMap?[senderId];
99+
100+
idTracker?.removeAll(messages);
101+
if (idTracker?.maxId == null) sendersMap?.remove(senderId);
102+
if (sendersMap?.isEmpty ?? false) topicsMap?.remove(topic);
103+
if (topicsMap?.isEmpty ?? false) topicSenders.remove(streamId);
104+
}
105+
}
106+
}
107+
108+
class _MessageIdTracker {
109+
/// A list of distinct message IDs, sorted ascendingly.
110+
@visibleForTesting
111+
QueueList<int> ids = QueueList.from([]);
112+
113+
/// The maximum id in the tracker list, or `null` if the list is empty.
114+
int? get maxId => ids.lastOrNull;
115+
116+
/// Add the message ID to the tracker list at the proper place, if not present.
117+
///
118+
/// Optimized, taking O(log n) time for the case where that place is the end,
119+
/// because that's the common case for a message that is received through
120+
/// [PerAccountStore.handleEvent]. May take O(n) time in some rare cases.
121+
void add(int id) {
122+
final i = lowerBound(ids, id);
123+
if (i < ids.length && ids[i] == id) {
124+
// The ID is already present. Nothing to do.
125+
return;
126+
}
127+
if (i == ids.length) {
128+
ids.addLast(id);
129+
} else {
130+
ids.insert(i, id);
131+
}
132+
}
133+
134+
/// Add the messages IDs to the tracker list at the proper place, if not present.
135+
///
136+
/// [newIds] should be sorted ascendingly.
137+
void addAll(List<int> newIds) {
138+
if (ids.isEmpty) {
139+
ids = QueueList.from(newIds);
140+
return;
141+
}
142+
ids = setUnion(ids, newIds);
143+
}
144+
145+
void removeAll(List<int> idsToRemove) {
146+
ids.removeWhere((id) {
147+
final i = lowerBound(idsToRemove, id);
148+
return i < idsToRemove.length && idsToRemove[i] == id;
149+
});
150+
}
151+
152+
@override
153+
String toString() => ids.toString();
154+
}

lib/model/store.dart

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import 'database.dart';
2323
import 'message.dart';
2424
import 'message_list.dart';
2525
import 'recent_dm_conversations.dart';
26+
import 'recent_senders.dart';
2627
import 'stream.dart';
2728
import 'typing_status.dart';
2829
import 'unreads.dart';
@@ -256,6 +257,7 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
256257
),
257258
recentDmConversationsView: RecentDmConversationsView(
258259
initial: initialSnapshot.recentPrivateConversations, selfUserId: account.userId),
260+
recentSenders: RecentSenders(),
259261
);
260262
}
261263

@@ -276,6 +278,7 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
276278
required MessageStoreImpl messages,
277279
required this.unreads,
278280
required this.recentDmConversationsView,
281+
required this.recentSenders,
279282
}) : assert(selfUserId == globalStore.getAccount(accountId)!.userId),
280283
assert(realmUrl == globalStore.getAccount(accountId)!.realmUrl),
281284
assert(realmUrl == connection.realmUrl),
@@ -369,6 +372,8 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
369372

370373
final RecentDmConversationsView recentDmConversationsView;
371374

375+
final RecentSenders recentSenders;
376+
372377
////////////////////////////////
373378
// Other digests of data.
374379

@@ -480,6 +485,7 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
480485
_messages.handleMessageEvent(event);
481486
unreads.handleMessageEvent(event);
482487
recentDmConversationsView.handleMessageEvent(event);
488+
recentSenders.handleMessage(event.message); // TODO(#824)
483489
// When adding anything here (to handle [MessageEvent]),
484490
// it probably belongs in [reconcileMessages] too.
485491
} else if (event is UpdateMessageEvent) {
@@ -488,6 +494,11 @@ class PerAccountStore extends ChangeNotifier with StreamStore, MessageStore {
488494
unreads.handleUpdateMessageEvent(event);
489495
} else if (event is DeleteMessageEvent) {
490496
assert(debugLog("server event: delete_message ${event.messageIds}"));
497+
// This should be called before [_messages.handleDeleteMessageEvent(event)],
498+
// as we need to know about each message for [event.messageIds],
499+
// specifically, their `senderId`s. By calling this after the
500+
// aforementioned line, we'll lose reference to those messages.
501+
recentSenders.handleDeleteMessageEvent(event, messages);
491502
_messages.handleDeleteMessageEvent(event);
492503
unreads.handleDeleteMessageEvent(event);
493504
} else if (event is UpdateMessageFlagsEvent) {

test/model/message_list_test.dart

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import '../api/model/model_checks.dart';
1818
import '../example_data.dart' as eg;
1919
import '../stdlib_checks.dart';
2020
import 'content_checks.dart';
21+
import 'recent_senders_test.dart' as recent_senders_test;
2122
import 'test_store.dart';
2223

2324
void main() {
@@ -141,6 +142,25 @@ void main() {
141142
..haveOldest.isTrue();
142143
});
143144

145+
// TODO(#824): move this test
146+
test('fetchInitial, recent senders track all the messages', () async {
147+
const narrow = CombinedFeedNarrow();
148+
await prepare(narrow: narrow);
149+
final messages = [
150+
eg.streamMessage(),
151+
// Not subscribed to the stream with id 10.
152+
eg.streamMessage(stream: eg.stream(streamId: 10)),
153+
];
154+
connection.prepare(json: newestResult(
155+
foundOldest: false,
156+
messages: messages,
157+
).toJson());
158+
await model.fetchInitial();
159+
160+
check(model).messages.length.equals(1);
161+
recent_senders_test.checkMatchesMessages(store.recentSenders, messages);
162+
});
163+
144164
test('fetchOlder', () async {
145165
const narrow = CombinedFeedNarrow();
146166
await prepare(narrow: narrow);
@@ -233,6 +253,27 @@ void main() {
233253
..messages.length.equals(200);
234254
});
235255

256+
// TODO(#824): move this test
257+
test('fetchOlder, recent senders track all the messages', () async {
258+
const narrow = CombinedFeedNarrow();
259+
await prepare(narrow: narrow);
260+
final initialMessages = List.generate(10, (i) => eg.streamMessage(id: 100 + i));
261+
await prepareMessages(foundOldest: false, messages: initialMessages);
262+
263+
final oldMessages = List.generate(10, (i) => eg.streamMessage(id: 89 + i))
264+
// Not subscribed to the stream with id 10.
265+
..add(eg.streamMessage(id: 99, stream: eg.stream(streamId: 10)));
266+
connection.prepare(json: olderResult(
267+
anchor: 100, foundOldest: false,
268+
messages: oldMessages,
269+
).toJson());
270+
await model.fetchOlder();
271+
272+
check(model).messages.length.equals(20);
273+
recent_senders_test.checkMatchesMessages(store.recentSenders,
274+
[...initialMessages, ...oldMessages]);
275+
});
276+
236277
test('MessageEvent', () async {
237278
final stream = eg.stream();
238279
await prepare(narrow: StreamNarrow(stream.streamId));

0 commit comments

Comments
 (0)