-
Notifications
You must be signed in to change notification settings - Fork 60
Draft: Test that /messages
works on remote homeserver and can be backfilled properly after many batches (MSC2716)
#214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
8099f47
094c5f7
e30bcd4
1e333d6
2022b31
d325349
2fe5180
0604564
83adbe2
4aba836
ffbca43
37109fa
cc7236b
4c8284a
9b90429
677836b
85eb7bd
3532821
1667e15
0589546
606197a
d679384
230c46e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -876,6 +876,77 @@ func TestImportHistoricalMessages(t *testing.T) { | |||
}, | ||||
}) | ||||
}) | ||||
|
||||
// Verifies that a remote homeserver can still backfill a room's history
// correctly after many MSC2716 batches have been imported, i.e. that the
// batch-insertion events chain together and pagination walks all of them.
t.Run("Backfill still works after many batches are imported", func(t *testing.T) {
	t.Parallel()

	roomID := as.CreateRoom(t, createPublicRoomOpts)
	alice.JoinRoom(t, roomID, nil)

	// Create some normal messages in the timeline
	eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore")
	// All historical batches are anchored on the last "before" event.
	eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1]
	timeAfterEventBefore := time.Now()

	// We chose the magic number 11 because Synapse currently limits the
	// backfill extremities to 5. 10 also seemed like a round number someone
	// could pick for other homeserver implementations so I just did 10+1 to
	// make sure it also worked in that case.
	// NOTE(review): this is still a draft — numBatches is temporarily
	// lowered to 2 because the full 11-batch run is too slow on Synapse
	// (see the PR discussion). Restore 11 before merging.
	//numBatches := 11
	numBatches := 2
	numHistoricalMessagesPerBatch := 100
	// wait X number of ms to ensure that the timestamp changes enough for
	// each of the historical messages we try to import later
	time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages)

	// eventIDsAfter
	createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter")

	// Import a long chain of batches connected to each other.
	// We want to make sure Synapse doesn't blow up after we import
	// many messages.
	var expectedEventIDs []string
	var denyListEventIDs []string
	var baseInsertionEventID string
	// Empty on the first iteration; each subsequent batch chains onto the
	// previous one via the next_batch_id returned by the server.
	nextBatchID := ""
	for i := 0; i < numBatches; i++ {
		// NOTE(review): this offset looks suspicious — the sleep above covers
		// numBatches*numHistoricalMessagesPerBatch units, but this formula
		// yields numBatches - numHistoricalMessagesPerBatch*i units (negative
		// for i >= 1 with the current constants). Was
		// (numBatches-i)*numHistoricalMessagesPerBatch intended? TODO confirm.
		insertTime := timeAfterEventBefore.Add(timeBetweenMessages * time.Duration(numBatches-numHistoricalMessagesPerBatch*i))
		batchSendRes := batchSendHistoricalMessages(
			t,
			as,
			roomID,
			eventIdBefore,
			nextBatchID,
			createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, insertTime),
			createMessageEventsForBatchSendRequest([]string{virtualUserID}, insertTime, numHistoricalMessagesPerBatch),
			// Status
			200,
		)
		batchSendResBody := client.ParseJSON(t, batchSendRes)
		// Make sure we see all of the historical messages
		expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...)
		// We should not find any historical state between the batches of messages
		denyListEventIDs = append(denyListEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")...)
		nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id")

		// Grab the base insertion event ID to reference later in the marker event
		if i == 0 {
			baseInsertionEventID = client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id")
		}
	}

	// Make sure we see the events at the very start of the message history
	expectedEventIDs = append(expectedEventIDs, eventIDsBefore...)

	// Join the room from a remote homeserver after the historical messages were sent
	remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})

	// Send the marker event
	sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID)

	// Make sure events can be backfilled from the remote homeserver
	paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs, denyListEventIDs)
})
}) | ||||
|
||||
t.Run("Existing room versions", func(t *testing.T) { | ||||
|
@@ -1017,10 +1088,10 @@ func includes(needle string, haystack []string) bool { | |||
func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) { | ||||
t.Helper() | ||||
start := time.Now() | ||||
checkCounter := 0 | ||||
callCounter := 0 | ||||
for { | ||||
if time.Since(start) > c.SyncUntilTimeout { | ||||
t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", checkCounter) | ||||
t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", callCounter) | ||||
} | ||||
|
||||
messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ | ||||
|
@@ -1044,9 +1115,104 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, | |||
} | ||||
} | ||||
|
||||
checkCounter++ | ||||
// Add a slight delay so we don't hammmer the messages endpoint | ||||
time.Sleep(500 * time.Millisecond) | ||||
callCounter++ | ||||
} | ||||
} | ||||
|
||||
// Paginate the /messages endpoint until we find all of the expectedEventIds | ||||
// (order does not matter). If any event in denyListEventIDs is found, an error | ||||
// will be thrown. | ||||
func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, expectedEventIDs []string, denyListEventIDs []string) { | ||||
t.Helper() | ||||
start := time.Now() | ||||
|
||||
workingExpectedEventIDMap := make(map[string]string) | ||||
for _, expectedEventID := range expectedEventIDs { | ||||
workingExpectedEventIDMap[expectedEventID] = expectedEventID | ||||
} | ||||
|
||||
denyEventIDMap := make(map[string]string) | ||||
for _, denyEventID := range denyListEventIDs { | ||||
denyEventIDMap[denyEventID] = denyEventID | ||||
} | ||||
|
||||
var actualEventIDList []string | ||||
callCounter := 0 | ||||
messageResEnd := "" | ||||
generateErrorMesssageInfo := func() string { | ||||
i := 0 | ||||
leftoverEventIDs := make([]string, len(workingExpectedEventIDMap)) | ||||
for eventID := range workingExpectedEventIDMap { | ||||
leftoverEventIDs[i] = eventID | ||||
i++ | ||||
} | ||||
|
||||
return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s", | ||||
callCounter, | ||||
len(expectedEventIDs)-len(leftoverEventIDs), | ||||
len(expectedEventIDs), | ||||
len(leftoverEventIDs), | ||||
leftoverEventIDs, | ||||
len(actualEventIDList), | ||||
actualEventIDList, | ||||
) | ||||
} | ||||
|
||||
for { | ||||
if time.Since(start) > 200*c.SyncUntilTimeout { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This timeout is currently very large to accommodate the long ~20s Synapse really has to chug for those requests 👹 and ideally wouldn't have to modify this at all. I would need to look into optimizing Synapse to make this fast which we should probably do anyway as this is painfully slow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is currently set to a whopping 1000s by default complement/internal/docker/deployment.go Line 57 in 136fd60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kegsay Sorry this wasn't clear as undrafting indicates "marked this pull request as ready for review" but I didn't assign you this one yet specifically because of this problem. The test itself is good to go (timeout can be switched to normal and I used Thanks for the review pass though and I'll fix up these other spots ⏩ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Status is still the same as the last update in this thread. It's too slow on Synapse for me to be comfortable merging it yet. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In terms of optimizing Synapse to make this test viable to run time-wise, I'm a bit blocked on a race condition in some recent code, matrix-org/synapse#12394 (comment) -> matrix-org/synapse#12646 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Now matrix-org/synapse#12988 I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made progress on optimizing Synapse:
|
||||
t.Fatalf( | ||||
"paginateUntilMessageCheckOff timed out. %s", | ||||
generateErrorMesssageInfo(), | ||||
) | ||||
} | ||||
|
||||
messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ | ||||
"dir": []string{"b"}, | ||||
"limit": []string{"100"}, | ||||
"from": []string{messageResEnd}, | ||||
})) | ||||
callCounter++ | ||||
messsageResBody := client.ParseJSON(t, messagesRes) | ||||
messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end") | ||||
// Since the original body can only be read once, create a new one from the body bytes we just read | ||||
messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody)) | ||||
|
||||
foundEventInMessageResponse := false | ||||
must.MatchResponse(t, messagesRes, match.HTTPResponse{ | ||||
JSON: []match.JSON{ | ||||
match.JSONArrayEach("chunk", func(ev gjson.Result) error { | ||||
foundEventInMessageResponse = true | ||||
eventID := ev.Get("event_id").Str | ||||
actualEventIDList = append(actualEventIDList, eventID) | ||||
|
||||
if _, keyExists := denyEventIDMap[eventID]; keyExists { | ||||
return fmt.Errorf( | ||||
"paginateUntilMessageCheckOff found unexpected message=%s in deny list while paginating. %s", | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👆 Calling out a change since the last review pass. I added a |
||||
eventID, | ||||
generateErrorMesssageInfo(), | ||||
) | ||||
} | ||||
|
||||
if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists { | ||||
delete(workingExpectedEventIDMap, eventID) | ||||
} | ||||
|
||||
return nil | ||||
}), | ||||
}, | ||||
}) | ||||
|
||||
if !foundEventInMessageResponse { | ||||
t.Fatalf( | ||||
"paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. %s", | ||||
generateErrorMesssageInfo(), | ||||
) | ||||
} | ||||
|
||||
// We were able to find all of the expected events! | ||||
if len(workingExpectedEventIDMap) == 0 { | ||||
return | ||||
} | ||||
} | ||||
} | ||||
|
||||
|
Uh oh!
There was an error while loading. Please reload this page.