Adding integration of the derived source feature across different paths #18054

Merged
CHANGELOG.md (3 changes: 2 additions & 1 deletion)
@@ -15,7 +15,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add Warm Disk Threshold Allocation Decider for Warm shards ([#18082](https://github.com/opensearch-project/OpenSearch/pull/18082))
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
- Adding support for derive source feature and implementing it for various type of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))
- [Derive Source] Adding support for the derived source feature and implementing it for various types of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))
- [Derive Source] Adding integration of the derived source feature across different paths ([#18054](https://github.com/opensearch-project/OpenSearch/pull/18054))
- [Security Manager Replacement] Enhance Java Agent to intercept newByteChannel ([#17989](https://github.com/opensearch-project/OpenSearch/pull/17989))
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
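For context, derived source is toggled per index through the `index.derived_source.enabled` setting, which the tests in this PR exercise. A minimal sketch of creating such an index, reusing the same test scaffolding (`prepareCreate`, `assertAcked`, `Settings.builder`) that appears in the diff below; the index name and field are illustrative, not part of the PR:

    // Sketch: create an index with derived source enabled. "my_index" and the
    // "foo" field are illustrative; the setting key matches this PR's tests.
    assertAcked(
        prepareCreate("my_index")
            .setSettings(Settings.builder().put("index.derived_source.enabled", true))
            .setMapping("""
                {
                  "properties": {
                    "foo": { "type": "keyword", "store": true }
                  }
                }""")
    );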
@@ -32,7 +32,14 @@

package org.opensearch.index.reindex;

import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.search.SearchHit;
import org.opensearch.search.sort.SortOrder;

import java.util.ArrayList;
import java.util.Collection;
@@ -41,7 +48,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@@ -177,4 +186,301 @@ public void testMissingSources() {
assertThat(response, matcher().created(0).slices(hasSize(0)));
}

public void testReindexWithDerivedSource() throws Exception {
// Create source index with derived source setting enabled
String sourceIndexMapping = """
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0,
"derived_source": {
"enabled": true
}
}
},
"mappings": {
"_doc": {
"properties": {
"foo": {
"type": "keyword",
"store": true
},
"bar": {
"type": "integer",
"store": true
}
}
}
}
}""";

// Create indices
assertAcked(prepareCreate("source_index").setSource(sourceIndexMapping, XContentType.JSON));
assertAcked(prepareCreate("dest_index").setSource(sourceIndexMapping, XContentType.JSON));
ensureGreen();

// Index some documents
int numDocs = randomIntBetween(5, 20);
List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
docs.add(client().prepareIndex("source_index").setId(Integer.toString(i)).setSource("foo", "value_" + i, "bar", i));
}
indexRandom(true, docs);

// Test 1: Basic reindex
ReindexRequestBuilder copy = reindex().source("source_index").destination("dest_index").refresh(true);

BulkByScrollResponse response = copy.get();
assertThat(response, matcher().created(numDocs));
long expectedCount = client().prepareSearch("dest_index").setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
assertEquals(numDocs, expectedCount);

// Test 2: Reindex with query filter
String destIndexFiltered = "dest_index_filtered";
assertAcked(prepareCreate(destIndexFiltered).setSource(sourceIndexMapping, XContentType.JSON));

copy = reindex().source("source_index").destination(destIndexFiltered).filter(termQuery("bar", 1)).refresh(true);

response = copy.get();
expectedCount = client().prepareSearch("source_index").setQuery(termQuery("bar", 1)).get().getHits().getTotalHits().value();
assertThat(response, matcher().created(expectedCount));

// Test 3: Reindex with slices
String destIndexSliced = "dest_index_sliced";
assertAcked(prepareCreate(destIndexSliced).setSource(sourceIndexMapping, XContentType.JSON));

int slices = randomSlices();
int expectedSlices = expectedSliceStatuses(slices, "source_index");

copy = reindex().source("source_index").destination(destIndexSliced).setSlices(slices).refresh(true);

response = copy.get();
assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices)));

// Test 4: Reindex with maxDocs
String destIndexMaxDocs = "dest_index_maxdocs";
assertAcked(prepareCreate(destIndexMaxDocs).setSource(sourceIndexMapping, XContentType.JSON));

int maxDocs = numDocs / 2;
copy = reindex().source("source_index").destination(destIndexMaxDocs).maxDocs(maxDocs).refresh(true);

response = copy.get();
assertThat(response, matcher().created(maxDocs));
expectedCount = client().prepareSearch(destIndexMaxDocs).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
assertEquals(maxDocs, expectedCount);

// Test 5: Multiple source indices
String sourceIndex2 = "source_index_2";
assertAcked(prepareCreate(sourceIndex2).setSource(sourceIndexMapping, XContentType.JSON));

int numDocs2 = randomIntBetween(5, 20);
List<IndexRequestBuilder> docs2 = new ArrayList<>();
for (int i = 0; i < numDocs2; i++) {
docs2.add(
client().prepareIndex(sourceIndex2).setId(Integer.toString(i + numDocs)).setSource("foo", "value2_" + i, "bar", i + numDocs)
);
}
indexRandom(true, docs2);

String destIndexMulti = "dest_index_multi";
assertAcked(prepareCreate(destIndexMulti).setSource(sourceIndexMapping, XContentType.JSON));

copy = reindex().source("source_index", "source_index_2").destination(destIndexMulti).refresh(true);

response = copy.get();
assertThat(response, matcher().created(numDocs + numDocs2));
expectedCount = client().prepareSearch(destIndexMulti).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
assertEquals(numDocs + numDocs2, expectedCount);
}

public void testReindexFromDerivedSourceToNormalIndex() throws Exception {
// Create source index with derived source enabled
String sourceMapping = """
{
"properties": {
"text_field": {
"type": "text",
"store": true
},
"keyword_field": {
"type": "keyword"
},
"numeric_field": {
"type": "long",
"doc_values": true
},
"date_field": {
"type": "date",
"store": true
}
}
}""";

// Create destination index with normal settings
String destMapping = """
{
"properties": {
"text_field": {
"type": "text"
},
"keyword_field": {
"type": "keyword"
},
"numeric_field": {
"type": "long"
},
"date_field": {
"type": "date"
}
}
}""";

// Create source index
assertAcked(
prepareCreate("source_index").setSettings(
Settings.builder().put("index.number_of_shards", 2).put("index.derived_source.enabled", true)
).setMapping(sourceMapping)
);

// Create destination index
assertAcked(prepareCreate("dest_index").setMapping(destMapping));

// Index test documents
int numDocs = randomIntBetween(100, 200);
final List<IndexRequestBuilder> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
docs.add(
client().prepareIndex("source_index")
.setId(Integer.toString(i))
.setSource(
"text_field",
"text value " + i,
"keyword_field",
"key_" + i,
"numeric_field",
i,
"date_field",
System.currentTimeMillis()
)
);
}
indexRandom(true, docs);
refresh("source_index");

// Test 1: Basic reindex without slices
ReindexRequestBuilder reindex = reindex().source("source_index").destination("dest_index").refresh(true);
BulkByScrollResponse response = reindex.get();
assertThat(response, matcher().created(numDocs));
verifyReindexedContent("dest_index", numDocs);

// Test 2: Reindex with query filter
String destFilteredIndex = "dest_filtered_index";
assertAcked(prepareCreate(destFilteredIndex).setMapping(destMapping));
reindex = reindex().source("source_index").destination(destFilteredIndex).filter(termQuery("keyword_field", "key_1")).refresh(true);
response = reindex.get();
assertThat(response, matcher().created(1));
verifyReindexedContent(destFilteredIndex, 1);

// Test 3: Reindex with slices
String destSlicedIndex = "dest_sliced_index";
assertAcked(prepareCreate(destSlicedIndex).setMapping(destMapping));
int slices = randomSlices();
int expectedSlices = expectedSliceStatuses(slices, "source_index");

reindex = reindex().source("source_index").destination(destSlicedIndex).setSlices(slices).refresh(true);
response = reindex.get();
assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices)));
verifyReindexedContent(destSlicedIndex, numDocs);

// Test 4: Reindex with field transformation
String destTransformedIndex = "dest_transformed_index";
String transformedMapping = """
{
"properties": {
"new_text_field": {
"type": "text"
},
"new_keyword_field": {
"type": "keyword"
},
"modified_numeric": {
"type": "long"
},
"date_field": {
"type": "date"
}
}
}""";
assertAcked(prepareCreate(destTransformedIndex).setMapping(transformedMapping));

// First reindex the documents
reindex = reindex().source("source_index").destination(destTransformedIndex).refresh(true);
response = reindex.get();
assertThat(response, matcher().created(numDocs));

// Then transform using bulk update
BulkRequestBuilder bulkRequest = client().prepareBulk();
SearchResponse searchResponse = client().prepareSearch(destTransformedIndex).setQuery(matchAllQuery()).setSize(numDocs).get();

for (SearchHit hit : searchResponse.getHits()) {
Map<String, Object> source = hit.getSourceAsMap();
Map<String, Object> newSource = new HashMap<>();

// Transform fields
newSource.put("new_text_field", source.get("text_field"));
newSource.put("new_keyword_field", source.get("keyword_field"));
newSource.put("modified_numeric", ((Number) source.get("numeric_field")).longValue() + 1000);
newSource.put("date_field", source.get("date_field"));

bulkRequest.add(client().prepareIndex(destTransformedIndex).setId(hit.getId()).setSource(newSource));
}

BulkResponse bulkResponse = bulkRequest.get();
assertFalse(bulkResponse.hasFailures());
refresh(destTransformedIndex);
verifyTransformedContent(destTransformedIndex, numDocs);
}

private void verifyReindexedContent(String indexName, int expectedCount) {
refresh(indexName);
SearchResponse searchResponse = client().prepareSearch(indexName)
.setQuery(matchAllQuery())
.setSize(expectedCount)
.addSort("numeric_field", SortOrder.ASC)
.get();

assertHitCount(searchResponse, expectedCount);

for (SearchHit hit : searchResponse.getHits()) {
Map<String, Object> source = hit.getSourceAsMap();
int id = Integer.parseInt(hit.getId());

assertEquals("text value " + id, source.get("text_field"));
assertEquals("key_" + id, source.get("keyword_field"));
assertEquals(id, ((Number) source.get("numeric_field")).intValue());
assertNotNull(source.get("date_field"));
}
}

private void verifyTransformedContent(String indexName, int expectedCount) {
refresh(indexName);
SearchResponse searchResponse = client().prepareSearch(indexName)
.setQuery(matchAllQuery())
.setSize(expectedCount)
.addSort("modified_numeric", SortOrder.ASC)
.get();

assertHitCount(searchResponse, expectedCount);

for (SearchHit hit : searchResponse.getHits()) {
Map<String, Object> source = hit.getSourceAsMap();
int id = Integer.parseInt(hit.getId());

assertEquals("text value " + id, source.get("new_text_field"));
assertEquals("key_" + id, source.get("new_keyword_field"));
assertEquals(id + 1000, ((Number) source.get("modified_numeric")).longValue());
assertNotNull(source.get("date_field"));
}
}
}
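Test 4 above performs the field transformation as a separate bulk pass after the reindex completes. The reindex request itself can also carry an inline script; a hypothetical sketch of that alternative follows. It is not part of this PR, and it assumes the script(...) setter inherited from AbstractBulkIndexByScrollRequestBuilder plus imports of org.opensearch.script.Script, org.opensearch.script.ScriptType, and java.util.Collections:

    // Hypothetical alternative to Test 4: transform fields inline during reindex
    // with a Painless script instead of a follow-up bulk pass. Illustrative only;
    // field names match the transformed mapping used in the test above.
    ReindexRequestBuilder scripted = reindex()
        .source("source_index")
        .destination("dest_transformed_index")
        .script(
            new Script(
                ScriptType.INLINE,
                "painless",
                "ctx._source.new_text_field = ctx._source.remove('text_field');"
                    + " ctx._source.new_keyword_field = ctx._source.remove('keyword_field');"
                    + " ctx._source.modified_numeric = ((Number) ctx._source.remove('numeric_field')).longValue() + 1000;",
                Collections.emptyMap()
            )
        )
        .refresh(true);
    assertThat(scripted.get(), matcher().created(numDocs));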