Skip to content

Commit dd710e6

Browse files
authored
Allowing non-rollup and rollup indices to be searched together (#1268)
* Allowing non-rollup and rollup indices to be searched together Signed-off-by: Kshitij Tandon <[email protected]> * Fixing an issue in the integration test Signed-off-by: Kshitij Tandon <[email protected]> * Using trace in place of warn in logger Signed-off-by: Kshitij Tandon <[email protected]> --------- Signed-off-by: Kshitij Tandon <[email protected]>
1 parent 335bd4c commit dd710e6

File tree

6 files changed

+73
-11
lines changed

6 files changed

+73
-11
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
534534
RollupSettings.ROLLUP_SEARCH_ENABLED,
535535
RollupSettings.ROLLUP_DASHBOARDS,
536536
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
537+
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
537538
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT,
538539
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS,
539540
TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT,

src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,18 @@ class RollupInterceptor(
6363

6464
@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)
6565

66+
@Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings)
67+
6668
init {
6769
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) {
6870
searchEnabled = it
6971
}
7072
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) {
7173
searchAllJobs = it
7274
}
75+
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES) {
76+
searchRawRollupIndices = it
77+
}
7378
}
7479

7580
@Suppress("SpreadOperator")
@@ -144,15 +149,16 @@ class RollupInterceptor(
144149
private fun validateIndicies(concreteIndices: Array<String>, fieldMappings: Set<RollupFieldMapping>): Map<Rollup, Set<RollupFieldMapping>> {
145150
var allMatchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>> = mapOf()
146151
for (concreteIndex in concreteIndices) {
147-
val rollupJobs =
148-
clusterService.state().metadata.index(concreteIndex).getRollupJobs()
149-
?: throw IllegalArgumentException("Not all indices have rollup job")
150-
151-
val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
152-
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
153-
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
152+
val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs()
153+
if (rollupJobs != null) {
154+
val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
155+
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
156+
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
157+
}
158+
allMatchingRollupJobs += matchingRollupJobs
159+
} else if (!searchRawRollupIndices) {
160+
throw IllegalArgumentException("Not all indices have rollup job")
154161
}
155-
allMatchingRollupJobs += matchingRollupJobs
156162
}
157163
return allMatchingRollupJobs
158164
}
@@ -347,6 +353,9 @@ class RollupInterceptor(
347353
if (searchAllJobs) {
348354
request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex))
349355
} else {
356+
if (matchingRollupJobs.keys.size > 1) {
357+
logger.trace("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window")
358+
}
350359
request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex))
351360
}
352361
}

src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class RollupSettings {
1313
companion object {
1414
const val DEFAULT_ROLLUP_ENABLED = true
1515
const val DEFAULT_SEARCH_ALL_JOBS = false
16+
const val DEFAULT_SEARCH_SOURCE_INDICES = false
1617
const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3
1718
const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L
1819
const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3
@@ -85,6 +86,14 @@ class RollupSettings {
8586
Setting.Property.Dynamic,
8687
)
8788

89+
val ROLLUP_SEARCH_SOURCE_INDICES: Setting<Boolean> =
90+
Setting.boolSetting(
91+
"plugins.rollup.search.search_source_indices",
92+
DEFAULT_SEARCH_SOURCE_INDICES,
93+
Setting.Property.NodeScope,
94+
Setting.Property.Dynamic,
95+
)
96+
8897
val ROLLUP_DASHBOARDS: Setting<Boolean> =
8998
Setting.boolSetting(
9099
"plugins.rollup.dashboards.enabled",

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
9191
RollupSettings.ROLLUP_ENABLED,
9292
RollupSettings.ROLLUP_SEARCH_ENABLED,
9393
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
94+
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
9495
RollupSettings.ROLLUP_DASHBOARDS,
9596
SnapshotManagementSettings.FILTER_BY_BACKEND_ROLES,
9697
),
@@ -176,6 +177,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
176177
assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false)
177178
assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false)
178179
assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false)
180+
assertEquals(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings), false)
179181
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
180182
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1)
181183
assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))

src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,24 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
269269
assertEquals("Request failed", RestStatus.OK, res.restStatus())
270270
}
271271

272+
protected fun updateSearchRawRollupClusterSetting(value: Boolean) {
273+
val formattedValue = "\"${value}\""
274+
val request =
275+
"""
276+
{
277+
"persistent": {
278+
"${RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.key}": $formattedValue
279+
}
280+
}
281+
""".trimIndent()
282+
val res =
283+
client().makeRequest(
284+
"PUT", "_cluster/settings", emptyMap(),
285+
StringEntity(request, ContentType.APPLICATION_JSON),
286+
)
287+
assertEquals("Request failed", RestStatus.OK, res.restStatus())
288+
}
289+
272290
protected fun createSampleIndexForQSQTest(index: String) {
273291
val mapping =
274292
"""

src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,12 +1092,12 @@ class RollupInterceptorIT : RollupRestTestCase() {
10921092
},
10931093
"aggs": {
10941094
"sum_passenger_count": { "sum": { "field": "passenger_count" } },
1095-
"max_passenger_count": { "max": { "field": "passenger_count" } },
1096-
"value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
1095+
"max_passenger_count": { "max": { "field": "passenger_count" } }
10971096
}
10981097
}
10991098
""".trimIndent()
1100-
// Search 1 non-rollup index and 1 rollup
1099+
// Search 1 non-rollup index and 1 rollup
1100+
updateSearchRawRollupClusterSetting(false)
11011101
val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
11021102
assertTrue(searchResult1.restStatus() == RestStatus.OK)
11031103
val failures = extractFailuresFromSearchResponse(searchResult1)
@@ -1112,6 +1112,29 @@ class RollupInterceptorIT : RollupRestTestCase() {
11121112
"Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response",
11131113
)
11141114

1115+
// Updating to allow searching on non-rollup and rolled-up index together
1116+
updateSearchRawRollupClusterSetting(true)
1117+
val rawRes1 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1118+
assertTrue(rawRes1.restStatus() == RestStatus.OK)
1119+
val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1120+
assertTrue(rawRes2.restStatus() == RestStatus.OK)
1121+
val searchResult = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1122+
assertTrue(searchResult.restStatus() == RestStatus.OK)
1123+
val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
1124+
val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
1125+
val rollupAggResMulti = searchResult.asMap()["aggregations"] as Map<String, Map<String, Any>>
1126+
1127+
val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double
1128+
1129+
assertEquals(
1130+
"Searching single raw source index and rollup target index did not return the same sum results",
1131+
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"],
1132+
)
1133+
assertEquals(
1134+
"Searching rollup target index did not return the sum for all of the rollup jobs on the index",
1135+
trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"],
1136+
)
1137+
11151138
// Search 2 rollups with different mappings
11161139
try {
11171140
client().makeRequest(

0 commit comments

Comments
 (0)