Skip to content

#1327 QueryBatcher now stops when query fails #1505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.marklogic.client.query.StructuredQueryBuilder;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -224,12 +223,6 @@ public void testNullQdef() throws IOException, InterruptedException {
}

@Test
@Disabled("Ignoring this test for now, as it's failing intermittently due to the bug captured at " +
"https://github.com/marklogic/java-client-api/issues/1327; did some cleanup on this before ignoring it, as " +
"when it passed, the onQueryFailure handler was never being invoked. So that stuff was removed. It appears " +
"that the intent of the test is to verify that the retry mechanism for QueryBatcher kicks in when a failure " +
"occurs due to the database being temporarily disabled. But due to the 1327 bug, the test can hang " +
"indefinitely when the queryMgr.uris call fails.")
public void queryFailures() throws Exception {
// Insert documents to query
String jsonDoc = "{" + "\"employees\": [" + "{ \"firstName\":\"John\" , \"lastName\":\"Doe\" },"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ private void retry(QueryEvent queryEvent, boolean callFailListeners) {
runnable.run();
}
/*
* Accepts a QueryBatch which was successfully retrieved from the server and a
* QueryBatchListener which was failed to apply and retry that listener on the batch.
*
* Accepts a QueryBatch which was successfully retrieved from the server and a
* QueryBatchListener which was failed to apply and retry that listener on the batch.
*
*/
@Override
public void retryListener(QueryBatch batch, QueryBatchListener queryBatchListener) {
Expand Down Expand Up @@ -675,23 +675,22 @@ private class QueryTask implements Runnable {
private boolean callFailListeners;
private String afterUri;
private String nextAfterUri;
boolean isQueryBatch;
private QueryBatchImpl batch;
private int totalProcessedCount = 0;
private boolean isLastBatch;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed isLastBatch and lastBatchNum as neither were used anywhere.

private int lastBatchNum;

QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest,
String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch
) {
this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, null, -1, true);
}

QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest,
String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start, QueryBatchImpl batch, String afterUri
) {
this(moveMgr, batcher, forest, queryMethod, query, filtered, forestBatchNum, start, batch, afterUri,
-1, true);
}

QueryTask(DataMovementManager moveMgr, QueryBatcherImpl batcher, Forest forest,
String queryMethod, SearchQueryDefinition query, Boolean filtered, long forestBatchNum, long start,
QueryBatchImpl batch, String afterUri, long retryBatchNumber, boolean callFailListeners
Expand All @@ -704,7 +703,6 @@ private class QueryTask implements Runnable {
this.filtered = filtered;
this.forestBatchNum = forestBatchNum;
this.start = start;
this.isQueryBatch = isQueryBatch;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, isQueryBatch is not used anywhere and this was assigning the variable to itself.

this.retryBatchNumber = retryBatchNumber;
this.callFailListeners = callFailListeners;
this.batch = batch;
Expand Down Expand Up @@ -745,12 +743,8 @@ public void run() {
if (consistentSnapshot == true && serverTimestamp.get() > -1) {
handle.setPointInTimeQueryTimestamp(serverTimestamp.get());
}
// this try-with-resources block will call results.close() once the block is done
// here we call the /v1/internal/uris endpoint to get the text/uri-list of documents
// matching this structured or string query

try (UrisHandle results = queryMgr.uris(queryMethod, query, filtered, handle, start, afterUri, forest.getForestName())) {
// if we're doing consistentSnapshot and this is the first result set, let's capture the
// serverTimestamp so we can use it for all future queries
if (consistentSnapshot == true && serverTimestamp.get() == -1) {
if (serverTimestamp.compareAndSet(-1, results.getServerTimestamp())) {
logger.info("Consistent snapshot timestamp=[{}]", serverTimestamp);
Expand Down Expand Up @@ -785,7 +779,15 @@ public void run() {
isDone.set(true);
shutdownIfAllForestsAreDone();
return;
}
} catch (Throwable t) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@llinggit I believe this catch used to exist prior to this change for #1269, which resulted in this catch being removed. The test I added captures the issue, which is that without this catch, a failed query results in QueryBatcherImpl hanging indefinitely. I am not 100% sure of the contents of this block though, but it does result in the desired fix, which is that the job is stopped and control returns back to the user.

// The above catch on a ResourceNotFoundException seems to be an expected error that doesn't need to be
// logged. But if the query fails for any other reason, such as an invalid index, the error should be
// logged and the job stopped.
logger.error("Query for URIs failed, stopping job; cause: " + t.getMessage(), t);
isDone.set(true);
shutdownIfAllForestsAreDone();
return;
}

batch = batch
.withItems(uris.get(0).toArray(new String[uris.get(0).size()]))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.marklogic.client.test.datamovement;

import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.query.StructuredQueryBuilder;
import com.marklogic.client.query.StructuredQueryDefinition;
import com.marklogic.client.test.Common;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.assertFalse;

public class QueryBatcherInitialQueryFailsTest {

@Test
void jobStopsWhenQueryIsInvalid() {
Common.connect();

StructuredQueryBuilder queryBuilder = Common.client.newQueryManager().newStructuredQueryBuilder();
StructuredQueryDefinition invalidQuery = queryBuilder.range(
queryBuilder.pathIndex("doesnt-work"),
"xs:date", StructuredQueryBuilder.Operator.GT, "2007-01-01"
);

AtomicBoolean successListenerInvoked = new AtomicBoolean(false);
AtomicBoolean failureListenerInvoked = new AtomicBoolean(false);

DataMovementManager dataMovementManager = Common.client.newDataMovementManager();
QueryBatcher queryBatcher = dataMovementManager.newQueryBatcher(invalidQuery)
.onUrisReady(batch -> successListenerInvoked.set(true))
.onQueryFailure(failure -> failureListenerInvoked.set(true));

dataMovementManager.startJob(queryBatcher);
queryBatcher.awaitCompletion();
dataMovementManager.stopJob(queryBatcher);

assertFalse(successListenerInvoked.get(),
"The success listener should not have been invoked since the initial query was failed; additionally, " +
"getting to this point in the test verifies that the job stopped successfully, which prior to this " +
"test being written would not occur due to a bug");

assertFalse(failureListenerInvoked.get(),
"The failure listener should not have been invoked either; see QueryBatcherFailureTest for an explanation " +
"as to what a failure listener actually captures (it does not capture failures from an invalid query)");
}

}