|
78 | 78 | import org.opensearch.test.OpenSearchTestCase;
|
79 | 79 | import org.opensearch.threadpool.ThreadPool;
|
80 | 80 | import org.opensearch.threadpool.ThreadPool.Names;
|
| 81 | +import org.hamcrest.MatcherAssert; |
81 | 82 | import org.junit.Before;
|
82 | 83 |
|
83 | 84 | import java.nio.charset.StandardCharsets;
|
|
104 | 105 |
|
105 | 106 | import static java.util.Collections.emptyMap;
|
106 | 107 | import static java.util.Collections.emptySet;
|
| 108 | +import static org.hamcrest.Matchers.contains; |
107 | 109 | import static org.hamcrest.Matchers.equalTo;
|
108 | 110 | import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
| 111 | +import static org.hamcrest.Matchers.hasSize; |
109 | 112 | import static org.hamcrest.Matchers.instanceOf;
|
110 | 113 | import static org.hamcrest.Matchers.is;
|
111 | 114 | import static org.hamcrest.Matchers.notNullValue;
|
112 | 115 | import static org.hamcrest.Matchers.nullValue;
|
113 | 116 | import static org.hamcrest.Matchers.sameInstance;
|
114 | 117 | import static org.mockito.Mockito.any;
|
115 |
| -import static org.mockito.Mockito.anyInt; |
116 | 118 | import static org.mockito.Mockito.anyString;
|
117 | 119 | import static org.mockito.Mockito.argThat;
|
118 | 120 | import static org.mockito.Mockito.doAnswer;
|
@@ -1106,27 +1108,23 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
|
1106 | 1108 | verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
1107 | 1109 | }
|
1108 | 1110 |
|
1109 |
| - public void testBulkRequestExecutionWithFailures() throws Exception { |
| 1111 | + public void testBulkRequestExecutionWithFailures() { |
1110 | 1112 | BulkRequest bulkRequest = new BulkRequest();
|
1111 | 1113 | String pipelineId = "_id";
|
1112 | 1114 |
|
1113 |
| - int numRequest = scaledRandomIntBetween(8, 64); |
1114 |
| - int numIndexRequests = 0; |
1115 |
| - for (int i = 0; i < numRequest; i++) { |
1116 |
| - DocWriteRequest request; |
| 1115 | + int numIndexRequests = scaledRandomIntBetween(4, 32); |
| 1116 | + for (int i = 0; i < numIndexRequests; i++) { |
| 1117 | + IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); |
| 1118 | + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); |
| 1119 | + bulkRequest.add(indexRequest); |
| 1120 | + } |
| 1121 | + int numOtherRequests = scaledRandomIntBetween(4, 32); |
| 1122 | + for (int i = 0; i < numOtherRequests; i++) { |
1117 | 1123 | if (randomBoolean()) {
|
1118 |
| - if (randomBoolean()) { |
1119 |
| - request = new DeleteRequest("_index", "_id"); |
1120 |
| - } else { |
1121 |
| - request = new UpdateRequest("_index", "_id"); |
1122 |
| - } |
| 1124 | + bulkRequest.add(new DeleteRequest("_index", "_id")); |
1123 | 1125 | } else {
|
1124 |
| - IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); |
1125 |
| - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); |
1126 |
| - request = indexRequest; |
1127 |
| - numIndexRequests++; |
| 1126 | + bulkRequest.add(new UpdateRequest("_index", "_id")); |
1128 | 1127 | }
|
1129 |
| - bulkRequest.add(request); |
1130 | 1128 | }
|
1131 | 1129 |
|
1132 | 1130 | CompoundProcessor processor = mock(CompoundProcessor.class);
|
@@ -1155,23 +1153,22 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
|
1155 | 1153 | clusterState = IngestService.innerPut(putRequest, clusterState);
|
1156 | 1154 | ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
1157 | 1155 |
|
1158 |
| - @SuppressWarnings("unchecked") |
1159 |
| - BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class); |
1160 |
| - @SuppressWarnings("unchecked") |
1161 |
| - final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class); |
| 1156 | + final Map<Integer, Exception> errorHandler = new HashMap<>(); |
| 1157 | + final Map<Thread, Exception> completionHandler = new HashMap<>(); |
1162 | 1158 | ingestService.executeBulkRequest(
|
1163 |
| - numRequest, |
| 1159 | + numIndexRequests + numOtherRequests, |
1164 | 1160 | bulkRequest.requests(),
|
1165 |
| - requestItemErrorHandler, |
1166 |
| - completionHandler, |
| 1161 | + errorHandler::put, |
| 1162 | + completionHandler::put, |
1167 | 1163 | indexReq -> {},
|
1168 | 1164 | Names.WRITE,
|
1169 | 1165 | bulkRequest
|
1170 | 1166 | );
|
1171 | 1167 |
|
1172 |
| - verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(o -> o.getCause().equals(error))); |
| 1168 | + MatcherAssert.assertThat(errorHandler.entrySet(), hasSize(numIndexRequests)); |
| 1169 | + errorHandler.values().forEach(e -> assertEquals(e.getCause(), error)); |
1173 | 1170 |
|
1174 |
| - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); |
| 1171 | + MatcherAssert.assertThat(completionHandler.keySet(), contains(Thread.currentThread())); |
1175 | 1172 | }
|
1176 | 1173 |
|
1177 | 1174 | public void testBulkRequestExecution() throws Exception {
|
|
0 commit comments