From 81d6a81d9a02aec0768ea3d7a5729b3a5241b495 Mon Sep 17 00:00:00 2001 From: Rob Rudin Date: Sun, 1 Jan 2023 09:41:52 -0500 Subject: [PATCH] #1327 QueryBatcher now stops when query fails Addresses DEVEXP-147 (internal bug). Also enabled a test that had been disabled due to this bug. --- .../QueryBatcherJobReportTest.java | 7 --- .../datamovement/impl/QueryBatcherImpl.java | 28 ++++++----- .../QueryBatcherInitialQueryFailsTest.java | 48 +++++++++++++++++++ 3 files changed, 63 insertions(+), 20 deletions(-) create mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherInitialQueryFailsTest.java diff --git a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/datamovement/functionaltests/QueryBatcherJobReportTest.java b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/datamovement/functionaltests/QueryBatcherJobReportTest.java index 0effc8399..cd9c9cd99 100644 --- a/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/datamovement/functionaltests/QueryBatcherJobReportTest.java +++ b/marklogic-client-api-functionaltests/src/test/java/com/marklogic/client/datamovement/functionaltests/QueryBatcherJobReportTest.java @@ -32,7 +32,6 @@ import com.marklogic.client.query.StructuredQueryBuilder; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -224,12 +223,6 @@ public void testNullQdef() throws IOException, InterruptedException { } @Test - @Disabled("Ignoring this test for now, as it's failing intermittently due to the bug captured at " + - "https://github.com/marklogic/java-client-api/issues/1327; did some cleanup on this before ignoring it, as " + - "when it passed, the onQueryFailure handler was never being invoked. So that stuff was removed. It appears " + - "that the intent of the test is to verify that the retry mechanism for QueryBatcher kicks in when a failure " + - "occurs due to the database being temporarily disabled. But due to the 1327 bug, the test can hang " + - "indefinitely when the queryMgr.uris call fails.") public void queryFailures() throws Exception { // Insert documents to query String jsonDoc = "{" + "\"employees\": [" + "{ \"firstName\":\"John\" , \"lastName\":\"Doe\" }," diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java index d54e76333..e2c0ed12e 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/QueryBatcherImpl.java @@ -228,9 +228,9 @@ private void retry(QueryEvent queryEvent, boolean callFailListeners) { runnable.run(); } /* - * Accepts a QueryBatch which was successfully retrieved from the server and a - * QueryBatchListener which was failed to apply and retry that listener on the batch. - * + * Accepts a QueryBatch which was successfully retrieved from the server and a + * QueryBatchListener which was failed to apply and retry that listener on the batch. + * */ @Override public void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener) { @@ -675,23 +675,22 @@ private class QueryTask implements Runnable { private boolean callFailListeners; private String afterUri; private String nextAfterUri; - boolean isQueryBatch; private QueryBatchImpl batch; private int totalProcessedCount = 0; - private boolean isLastBatch; - private int lastBatchNum; QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch ) { this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, null, -1, true); } + QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch, String afterUri ) { this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, afterUri, -1, true); } + QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest, String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch, String afterUri, long retryBatchNumber, boolean callFailListeners @@ -704,7 +703,6 @@ private class QueryTask implements Runnable { this.filtered = filtered; this.forestBatchNum = forestBatchNum; this.start = start; - this.isQueryBatch = isQueryBatch; this.retryBatchNumber = retryBatchNumber; this.callFailListeners = callFailListeners; this.batch = batch; @@ -745,12 +743,8 @@ public void run() { if (consistentSnapshot == true && serverTimestamp.get() > -1) { handle.setPointInTimeQueryTimestamp(serverTimestamp.get()); } - // this try-with-resources block will call results.close() once the block is done - // here we call the /v1/internal/uris endpoint to get the text/uri-list of documents - // matching this structured or string query + try (UrisHandle results = queryMgr.uris(queryMethod, query, filtered, handle, start, afterUri, forest.getForestName())) { - // if we're doing consistentSnapshot and this is the first result set, let's capture the - // serverTimestamp so we can use it for all future queries if (consistentSnapshot == true && serverTimestamp.get() == -1) { if (serverTimestamp.compareAndSet(-1, results.getServerTimestamp())) { logger.info("Consistent snapshot timestamp=[{}]", serverTimestamp); @@ -785,7 +779,15 @@ public void run() { isDone.set(true); shutdownIfAllForestsAreDone(); return; - } + } catch (Throwable t) { + // The above catch on a ResourceNotFoundException seems to be an expected error that doesn't need to be + // logged. But if the query fails for any other reason, such as an invalid index, the error should be + // logged and the job stopped. + logger.error("Query for URIs failed, stopping job; cause: " + t.getMessage(), t); + isDone.set(true); + shutdownIfAllForestsAreDone(); + return; + } batch = batch .withItems(uris.get(0).toArray(new String[uris.get(0).size()])) diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherInitialQueryFailsTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherInitialQueryFailsTest.java new file mode 100644 index 000000000..a0307c3b9 --- /dev/null +++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/QueryBatcherInitialQueryFailsTest.java @@ -0,0 +1,48 @@ +package com.marklogic.client.test.datamovement; + +import com.marklogic.client.datamovement.DataMovementManager; +import com.marklogic.client.datamovement.QueryBatcher; +import com.marklogic.client.query.StructuredQueryBuilder; +import com.marklogic.client.query.StructuredQueryDefinition; +import com.marklogic.client.test.Common; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class QueryBatcherInitialQueryFailsTest { + + @Test + void jobStopsWhenQueryIsInvalid() { + Common.connect(); + + StructuredQueryBuilder queryBuilder = Common.client.newQueryManager().newStructuredQueryBuilder(); + StructuredQueryDefinition invalidQuery = queryBuilder.range( + queryBuilder.pathIndex("doesnt-work"), + "xs:date", StructuredQueryBuilder.Operator.GT, "2007-01-01" + ); + + AtomicBoolean successListenerInvoked = new AtomicBoolean(false); + AtomicBoolean failureListenerInvoked = new AtomicBoolean(false); + + DataMovementManager dataMovementManager = Common.client.newDataMovementManager(); + QueryBatcher queryBatcher = dataMovementManager.newQueryBatcher(invalidQuery) + .onUrisReady(batch -> successListenerInvoked.set(true)) + .onQueryFailure(failure -> failureListenerInvoked.set(true)); + + dataMovementManager.startJob(queryBatcher); + queryBatcher.awaitCompletion(); + dataMovementManager.stopJob(queryBatcher); + + assertFalse(successListenerInvoked.get(), + "The success listener should not have been invoked since the initial query was failed; additionally, " + + "getting to this point in the test verifies that the job stopped successfully, which prior to this " + + "test being written would not occur due to a bug"); + + assertFalse(failureListenerInvoked.get(), + "The failure listener should not have been invoked either; see QueryBatcherFailureTest for an explanation " + + "as to what a failure listener actually captures (it does not capture failures from an invalid query)"); + } + +}