Skip to content

[Flaky Tests] Fixing flaky tests related to derived source tests #18493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -453,7 +453,7 @@ public void testDerivedSourceRollingRestart() throws Exception {
rollingRestartWithVerification(docCount);
}

public void testDerivedSourceWithMixedVersionRollingRestart() throws Exception {
public void testDerivedSourceWithMultiFieldsRollingRestart() throws Exception {
String mapping = """
{
"properties": {
Expand Down Expand Up @@ -503,12 +503,12 @@ public void testDerivedSourceWithMixedVersionRollingRestart() throws Exception {

// Add replicas before starting new nodes
assertAcked(
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 2))
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 1))
);

// Add nodes and additional documents
int totalDocs = docCount;
for (int i = 0; i < 2; i++) {
for (int i = 0; i < 1; i++) {
internalCluster().startNode();

// Add more documents
Expand Down Expand Up @@ -564,7 +564,7 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
prepareCreate(
"test",
Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 0)
.put("index.derived_source.enabled", true)
.put("index.refresh_interval", "1s")
Expand Down Expand Up @@ -601,74 +601,86 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except

// Start concurrent updates during rolling restart
logger.info("--> starting rolling restart with concurrent updates");
concurrentUpdatesRollingRestartWithVerification(docCount);
}

private void concurrentUpdatesRollingRestartWithVerification(int initialDocCount) throws Exception {
AtomicBoolean stop = new AtomicBoolean(false);
AtomicInteger totalUpdates = new AtomicInteger(0);
CountDownLatch updateLatch = new CountDownLatch(1);

// Start concurrent update thread
Thread updateThread = new Thread(() -> {
final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicInteger successfulUpdates = new AtomicInteger(0);
final CountDownLatch updateLatch = new CountDownLatch(1);
final Thread updateThread = new Thread(() -> {
try {
updateLatch.await(); // Wait for cluster to be ready
updateLatch.await();
while (stop.get() == false) {
try {
int docId = randomIntBetween(0, initialDocCount - 1);
client().prepareUpdate("test", String.valueOf(docId))
.setRetryOnConflict(3)
.setDoc("counter", randomIntBetween(0, 1000), "last_updated", System.currentTimeMillis(), "version", 1)
.execute()
.actionGet();
totalUpdates.incrementAndGet();
Thread.sleep(10);
// Update documents sequentially to avoid conflicts
for (int i = 0; i < docCount && !stop.get(); i++) {
client().prepareUpdate("test", String.valueOf(i))
.setRetryOnConflict(3)
.setDoc("counter", successfulUpdates.get() + 1, "last_updated", System.currentTimeMillis(), "version", 1)
.execute()
.actionGet(TimeValue.timeValueSeconds(5));
successfulUpdates.incrementAndGet();
Thread.sleep(50); // Larger delay between updates
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
break;
if (stop.get() == false) {
logger.warn("Error in update thread", e);
}
logger.warn("Error in update thread", e);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
updateThread.start();

try {
// Add replicas
assertAcked(
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 2))
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 1))
);

// Start additional nodes
for (int i = 0; i < 2; i++) {
for (int i = 0; i < 1; i++) {
internalCluster().startNode();
}
ensureGreen("test");

// Start updates
// Start updates after cluster is stable
updateThread.start();
updateLatch.countDown();

// Wait for some updates to occur
Thread.sleep(2000);
assertBusy(() -> { assertTrue("No successful updates occurred", successfulUpdates.get() > 0); }, 30, TimeUnit.SECONDS);

// Rolling restart of all nodes
for (String node : internalCluster().getNodeNames()) {
// Stop updates temporarily during node restart
stop.set(true);
Thread.sleep(1000); // Wait for in-flight operations to complete

internalCluster().restartNode(node);
ensureGreen(TimeValue.timeValueMinutes(2));
verifyDerivedSourceWithUpdates(initialDocCount);
ensureGreen(TimeValue.timeValueSeconds(60));

// Verify data consistency
refresh("test");
verifyDerivedSourceWithUpdates(docCount);

// Resume updates
stop.set(false);
}

} finally {
// Clean shutdown
stop.set(true);
updateThread.join();
updateThread.join(TimeValue.timeValueSeconds(30).millis());
if (updateThread.isAlive()) {
updateThread.interrupt();
updateThread.join(TimeValue.timeValueSeconds(5).millis());
}
}

logger.info("--> performed {} concurrent updates during rolling restart", totalUpdates.get());
logger.info("--> performed {} successful updates during rolling restart", successfulUpdates.get());
refresh("test");
flush("test");
verifyDerivedSourceWithUpdates(initialDocCount);
verifyDerivedSourceWithUpdates(docCount);
}

private void verifyDerivedSourceWithUpdates(int expectedDocs) throws Exception {
Expand All @@ -687,7 +699,8 @@ private void verifyDerivedSourceWithUpdates(int expectedDocs) throws Exception {
assertEquals("text value " + id, source.get("text_field"));
assertNotNull("counter missing for doc " + id, source.get("counter"));
assertFalse(((String) source.get("last_updated")).isEmpty());
assertEquals(1, source.get("version"));
Integer counter = (Integer) source.get("counter");
assertEquals(counter == 0 ? 0 : 1, source.get("version"));

// Verify text_field maintains original value
assertEquals("text value " + id, source.get("text_field"));
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -749,6 +749,7 @@ protected XContentBuilder generateSource(long id, Random random) throws IOExcept
.endObject();
}
}) {
indexer.setUseAutoGeneratedIDs(true);
logger.info("--> waiting for {} docs to be indexed ...", waitFor);
waitForDocs(waitFor, indexer);
indexer.assertNoFailures();
Expand All @@ -769,7 +770,10 @@ protected XContentBuilder generateSource(long id, Random random) throws IOExcept
allowNodes("test", 2);

logger.info("--> waiting for GREEN health status ...");
ensureGreen(TimeValue.timeValueMinutes(2));
// make sure the cluster state is green, and all has been recovered
assertNoTimeout(
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("5m").setWaitForGreenStatus()
);

logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
waitForDocs(totalNumDocs, indexer);
Expand Down Expand Up @@ -858,6 +862,7 @@ protected XContentBuilder generateSource(long id, Random random) throws IOExcept
}
}) {

indexer.setUseAutoGeneratedIDs(true);
for (int i = 0; i < numDocs; i += scaledRandomIntBetween(500, Math.min(1000, numDocs))) {
indexer.assertNoFailures();
logger.info("--> waiting for {} docs to be indexed ...", i);
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.store.Directory;
Expand Down Expand Up @@ -145,7 +146,9 @@ public void testGetCoreAndReaderCacheHelper() {

public void testWithRandomDocuments() throws IOException {
Directory randomDir = newDirectory();
IndexWriterConfig config = newIndexWriterConfig(random(), null).setCodec(new RandomCodec(random()));
IndexWriterConfig config = newIndexWriterConfig(random(), null).setCodec(new RandomCodec(random()))
.setMergePolicy(NoMergePolicy.INSTANCE); // Prevent automatic merges

IndexWriter randomWriter = new IndexWriter(randomDir, config);

int numDocs = randomIntBetween(1, 10);
Expand All @@ -158,9 +161,14 @@ public void testWithRandomDocuments() throws IOException {
doc.add(new StoredField("_source", source));
randomWriter.addDocument(doc);
}

// Force merge into a single segment
randomWriter.forceMerge(1);
randomWriter.commit();

DirectoryReader randomDirectoryReader = DirectoryReader.open(randomWriter);
assertEquals("Should have exactly one segment", 1, randomDirectoryReader.leaves().size());

LeafReader randomLeafReader = randomDirectoryReader.leaves().get(0).reader();
DerivedSourceLeafReader randomDerivedReader = new DerivedSourceLeafReader(
randomLeafReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8233,14 +8233,16 @@ public void testGetMaxSeqNoFromSegmentInfosConcurrentWrites() throws IOException
}

public void testNewChangesSnapshotWithDerivedSource() throws IOException {

IOUtils.close(engine, store);
// Create test documents
List<Engine.Operation> operations = new ArrayList<>();
final int numDocs = randomIntBetween(1, 100);

try (Store store = createStore()) {
EngineConfig engineConfig = createEngineConfigWithMapperSupplierForDerivedSource(store);
try (InternalEngine engine = createEngine(engineConfig)) {
InternalEngine engine = null;
try {
engine = createEngine(engineConfig);
// Index documents
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(
Expand Down Expand Up @@ -8294,19 +8296,24 @@ public void testNewChangesSnapshotWithDerivedSource() throws IOException {
// Verify we got all documents
assertThat(count, equalTo(numDocs));
}
} finally {
IOUtils.close(engine, store);
}
}
}

public void testNewChangesSnapshotWithDeleteAndUpdate() throws IOException {
public void testNewChangesSnapshotWithDeleteAndUpdateWithDerivedSource() throws IOException {
IOUtils.close(engine, store);
final List<Engine.Operation> operations = new ArrayList<>();
int numDocs = randomIntBetween(10, 100);
int numDocsToDelete = randomIntBetween(1, numDocs / 2);
Set<String> deletedDocs = new HashSet<>();

try (Store store = createStore()) {
EngineConfig engineConfig = createEngineConfigWithMapperSupplierForDerivedSource(store);
try (InternalEngine engine = createEngine(engineConfig)) {
InternalEngine engine = null;
try {
engine = createEngine(engineConfig);
// First index documents
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(
Expand Down Expand Up @@ -8451,6 +8458,8 @@ public void testNewChangesSnapshotWithDeleteAndUpdate() throws IOException {
}
assertEquals("Expected number of operations in range", to - from + 1, count);
}
} finally {
IOUtils.close(engine, store);
}
}
}
Expand Down
Loading