Skip to content

Commit 0f9f29e

Browse files
committed
Allowing non-rollup and rollup indices to be searched together (#1268)
* Allowing non-rollup and rollup indices to be searched together Signed-off-by: Kshitij Tandon <[email protected]> * Fixing an issue in the integration test Signed-off-by: Kshitij Tandon <[email protected]> * Using trace in place of warn in logger Signed-off-by: Kshitij Tandon <[email protected]> --------- Signed-off-by: Kshitij Tandon <[email protected]>
1 parent 5e3f9ef commit 0f9f29e

File tree

6 files changed

+80
-16
lines changed

6 files changed

+80
-16
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
527527
RollupSettings.ROLLUP_SEARCH_ENABLED,
528528
RollupSettings.ROLLUP_DASHBOARDS,
529529
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
530+
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
530531
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT,
531532
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS,
532533
TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT,

src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,18 @@ class RollupInterceptor(
6464

6565
@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)
6666

67+
@Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings)
68+
6769
init {
6870
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) {
6971
searchEnabled = it
7072
}
7173
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) {
7274
searchAllJobs = it
7375
}
76+
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES) {
77+
searchRawRollupIndices = it
78+
}
7479
}
7580

7681
@Suppress("SpreadOperator")
@@ -143,13 +148,15 @@ class RollupInterceptor(
143148
var allMatchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>> = mapOf()
144149
for (concreteIndex in concreteIndices) {
145150
val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs()
146-
?: throw IllegalArgumentException("Not all indices have rollup job")
147-
148-
val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
149-
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
150-
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
151+
if (rollupJobs != null) {
152+
val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
153+
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
154+
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
155+
}
156+
allMatchingRollupJobs += matchingRollupJobs
157+
} else if (!searchRawRollupIndices) {
158+
throw IllegalArgumentException("Not all indices have rollup job")
151159
}
152-
allMatchingRollupJobs += matchingRollupJobs
153160
}
154161
return allMatchingRollupJobs
155162
}
@@ -342,6 +349,9 @@ class RollupInterceptor(
342349
if (searchAllJobs) {
343350
request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex))
344351
} else {
352+
if (matchingRollupJobs.keys.size > 1) {
353+
logger.trace("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window")
354+
}
345355
request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex))
346356
}
347357
}

src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class RollupSettings {
1414
companion object {
1515
const val DEFAULT_ROLLUP_ENABLED = true
1616
const val DEFAULT_SEARCH_ALL_JOBS = false
17+
const val DEFAULT_SEARCH_SOURCE_INDICES = false
1718
const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3
1819
const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L
1920
const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3
@@ -78,11 +79,20 @@ class RollupSettings {
7879
Setting.Property.Dynamic,
7980
)
8081

81-
val ROLLUP_DASHBOARDS: Setting<Boolean> = Setting.boolSetting(
82-
"plugins.rollup.dashboards.enabled",
83-
LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS,
84-
Setting.Property.NodeScope,
85-
Setting.Property.Dynamic,
86-
)
82+
val ROLLUP_SEARCH_SOURCE_INDICES: Setting<Boolean> =
83+
Setting.boolSetting(
84+
"plugins.rollup.search.search_source_indices",
85+
DEFAULT_SEARCH_SOURCE_INDICES,
86+
Setting.Property.NodeScope,
87+
Setting.Property.Dynamic,
88+
)
89+
90+
val ROLLUP_DASHBOARDS: Setting<Boolean> =
91+
Setting.boolSetting(
92+
"plugins.rollup.dashboards.enabled",
93+
LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS,
94+
Setting.Property.NodeScope,
95+
Setting.Property.Dynamic,
96+
)
8797
}
8898
}

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
9494
RollupSettings.ROLLUP_ENABLED,
9595
RollupSettings.ROLLUP_SEARCH_ENABLED,
9696
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
97+
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
9798
RollupSettings.ROLLUP_DASHBOARDS,
9899
SnapshotManagementSettings.FILTER_BY_BACKEND_ROLES,
99100
),
@@ -177,6 +178,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
177178
assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false)
178179
assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false)
179180
assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false)
181+
assertEquals(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings), false)
180182
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
181183
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1)
182184
assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))

src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,24 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
263263
assertEquals("Request failed", RestStatus.OK, res.restStatus())
264264
}
265265

266+
protected fun updateSearchRawRollupClusterSetting(value: Boolean) {
267+
val formattedValue = "\"${value}\""
268+
val request =
269+
"""
270+
{
271+
"persistent": {
272+
"${RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.key}": $formattedValue
273+
}
274+
}
275+
""".trimIndent()
276+
val res =
277+
client().makeRequest(
278+
"PUT", "_cluster/settings", emptyMap(),
279+
StringEntity(request, ContentType.APPLICATION_JSON),
280+
)
281+
assertEquals("Request failed", RestStatus.OK, res.restStatus())
282+
}
283+
266284
protected fun createSampleIndexForQSQTest(index: String) {
267285
val mapping = """
268286
"properties": {

src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,12 +1040,12 @@ class RollupInterceptorIT : RollupRestTestCase() {
10401040
},
10411041
"aggs": {
10421042
"sum_passenger_count": { "sum": { "field": "passenger_count" } },
1043-
"max_passenger_count": { "max": { "field": "passenger_count" } },
1044-
"value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
1043+
"max_passenger_count": { "max": { "field": "passenger_count" } }
10451044
}
10461045
}
1047-
""".trimIndent()
1048-
// Search 1 non-rollup index and 1 rollup
1046+
""".trimIndent()
1047+
// Search 1 non-rollup index and 1 rollup
1048+
updateSearchRawRollupClusterSetting(false)
10491049
val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
10501050
assertTrue(searchResult1.restStatus() == RestStatus.OK)
10511051
val failures = extractFailuresFromSearchResponse(searchResult1)
@@ -1061,6 +1061,29 @@ class RollupInterceptorIT : RollupRestTestCase() {
10611061
"Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response",
10621062
)
10631063

1064+
// Updating to allow searching on non-rollup and rolled-up index together
1065+
updateSearchRawRollupClusterSetting(true)
1066+
val rawRes1 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1067+
assertTrue(rawRes1.restStatus() == RestStatus.OK)
1068+
val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1069+
assertTrue(rawRes2.restStatus() == RestStatus.OK)
1070+
val searchResult = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1071+
assertTrue(searchResult.restStatus() == RestStatus.OK)
1072+
val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
1073+
val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
1074+
val rollupAggResMulti = searchResult.asMap()["aggregations"] as Map<String, Map<String, Any>>
1075+
1076+
val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double
1077+
1078+
assertEquals(
1079+
"Searching single raw source index and rollup target index did not return the same sum results",
1080+
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"],
1081+
)
1082+
assertEquals(
1083+
"Searching rollup target index did not return the sum for all of the rollup jobs on the index",
1084+
trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"],
1085+
)
1086+
10641087
// Search 2 rollups with different mappings
10651088
try {
10661089
client().makeRequest(

0 commit comments

Comments
 (0)