Skip to content

Commit 38d0a82

Browse files
committed
[Remote Publication] Add remote download stats (opensearch-project#15291)
--------- Signed-off-by: Arpit Bandejiya <[email protected]> (cherry picked from commit b54e867)
1 parent 5653ed6 commit 38d0a82

File tree

15 files changed

+416
-122
lines changed

15 files changed

+416
-122
lines changed

CHANGELOG.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3535
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
3636
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
3737
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
38+
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
3839

3940
### Dependencies
4041
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
41-
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))
42+
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.16.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861), [#15205](https://github.com/opensearch-project/OpenSearch/pull/15205))
4243
- OpenJDK Update (July 2024 Patch releases) ([#14998](https://github.com/opensearch-project/OpenSearch/pull/14998))
4344
- Bump `com.microsoft.azure:msal4j` from 1.16.1 to 1.17.0 ([#14995](https://github.com/opensearch-project/OpenSearch/pull/14995), [#15420](https://github.com/opensearch-project/OpenSearch/pull/15420))
4445
- Bump `actions/github-script` from 6 to 7 ([#14997](https://github.com/opensearch-project/OpenSearch/pull/14997))
4546
- Bump `org.tukaani:xz` from 1.9 to 1.10 ([#15110](https://github.com/opensearch-project/OpenSearch/pull/15110))
47+
- Bump `actions/setup-java` from 1 to 4 ([#15104](https://github.com/opensearch-project/OpenSearch/pull/15104))
4648
- Bump `org.apache.avro:avro` from 1.11.3 to 1.12.0 in /plugins/repository-hdfs ([#15119](https://github.com/opensearch-project/OpenSearch/pull/15119))
4749
- Bump `org.bouncycastle:bcpg-fips` from 1.0.7.1 to 2.0.9 ([#15103](https://github.com/opensearch-project/OpenSearch/pull/15103), [#15299](https://github.com/opensearch-project/OpenSearch/pull/15299))
4850
- Bump `com.azure:azure-core` from 1.49.1 to 1.51.0 ([#15111](https://github.com/opensearch-project/OpenSearch/pull/15111))
@@ -81,13 +83,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8183
- Fix delete index template failed when the index template matches a data stream but is unused ([#15080](https://github.com/opensearch-project/OpenSearch/pull/15080))
8284
- Fix array_index_out_of_bounds_exception when indexing documents with field name containing only dot ([#15126](https://github.com/opensearch-project/OpenSearch/pull/15126))
8385
- Fixed array field name omission in flat_object function for nested JSON ([#13620](https://github.com/opensearch-project/OpenSearch/pull/13620))
86+
- Fix range aggregation optimization ignoring top level queries ([#15194](https://github.com/opensearch-project/OpenSearch/pull/15194))
8487
- Fix incorrect parameter names in MinHash token filter configuration handling ([#15233](https://github.com/opensearch-project/OpenSearch/pull/15233))
85-
- Fix range aggregation optimization ignoring top level queries ([#15287](https://github.com/opensearch-project/OpenSearch/pull/15287))
8688
- Fix indexing error when flat_object field is explicitly null ([#15375](https://github.com/opensearch-project/OpenSearch/pull/15375))
8789
- Fix split response processor not included in allowlist ([#15393](https://github.com/opensearch-project/OpenSearch/pull/15393))
8890
- Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394))
8991
- Fix null values indexed as "null" strings in flat_object field ([#14069](https://github.com/opensearch-project/OpenSearch/pull/14069))
9092

9193
### Security
9294

93-
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.16...2.x
95+
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.15...2.x

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Base64;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.Objects;
3435
import java.util.concurrent.TimeUnit;
3536
import java.util.concurrent.atomic.AtomicLong;
3637

@@ -40,6 +41,7 @@
4041
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
4142
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
4243
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
44+
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
4345
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
4446
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
4547
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
@@ -253,11 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
253255
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
254256
assertNotNull(discoveryStats.getClusterStateStats());
255257
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
256-
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
257-
assertTrue(extendedFields.containsKey(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
258-
long cleanupAttemptFailedCount = extendedFields.get(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
259-
.get();
260-
assertEquals(0, cleanupAttemptFailedCount);
258+
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
259+
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
260+
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
261+
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
262+
.get();
263+
assertEquals(0, cleanupAttemptFailedCount);
264+
}
261265
}
262266
}
263267

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88

99
package org.opensearch.gateway.remote;
1010

11+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
12+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1113
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1214
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1315
import org.opensearch.client.Client;
1416
import org.opensearch.common.blobstore.BlobPath;
1517
import org.opensearch.common.settings.Settings;
1618
import org.opensearch.common.util.FeatureFlags;
19+
import org.opensearch.discovery.DiscoveryStats;
1720
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
1821
import org.opensearch.indices.recovery.RecoverySettings;
1922
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
@@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
155158
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
156159
}
157160

161+
public void testRemotePublicationDownloadStats() {
162+
int shardCount = randomIntBetween(1, 2);
163+
int replicaCount = 1;
164+
int dataNodeCount = shardCount * (replicaCount + 1);
165+
int clusterManagerNodeCount = 1;
166+
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
167+
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);
168+
169+
NodesStatsResponse nodesStatsResponseDataNode = client().admin()
170+
.cluster()
171+
.prepareNodesStats(dataNode)
172+
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
173+
.get();
174+
175+
assertDataNodeDownloadStats(nodesStatsResponseDataNode);
176+
177+
}
178+
179+
private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
180+
// assert cluster state stats for data node
181+
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
182+
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
183+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
184+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
185+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
186+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);
187+
188+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
189+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
190+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
191+
}
192+
158193
private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
159194
BlobPath metadataPath = repository.basePath()
160195
.add(

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,10 @@ public DiscoveryStats stats() {
902902
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
903903
}
904904
});
905+
if (coordinationState.get().isRemotePublicationEnabled()) {
906+
stats.add(publicationHandler.getFullDownloadStats());
907+
stats.add(publicationHandler.getDiffDownloadStats());
908+
}
905909
clusterStateStats.setPersistenceStats(stats);
906910
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
907911
}

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 78 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,14 @@ public PublishClusterStateStats stats() {
178178
);
179179
}
180180

181+
public PersistedStateStats getFullDownloadStats() {
182+
return remoteClusterStateService.getFullDownloadStats();
183+
}
184+
185+
public PersistedStateStats getDiffDownloadStats() {
186+
return remoteClusterStateService.getDiffDownloadStats();
187+
}
188+
181189
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
182190
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
183191
ClusterState incomingState;
@@ -231,69 +239,78 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
231239
}
232240

233241
// package private for testing
234-
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
235-
if (transportService.getLocalNode().equals(request.getSourceNode())) {
236-
return acceptRemoteStateOnLocalNode(request);
237-
}
238-
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
239-
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
240-
request.getClusterUUID(),
241-
request.getManifestFile()
242-
);
243-
if (manifest == null) {
244-
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
245-
}
242+
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
246243
boolean applyFullState = false;
247-
final ClusterState lastSeen = lastSeenClusterState.get();
248-
if (lastSeen == null) {
249-
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
250-
applyFullState = true;
251-
} else if (manifest.getDiffManifest() == null) {
252-
logger.trace(() -> "There is no diff in the manifest");
253-
applyFullState = true;
254-
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
255-
logger.debug(() -> "Last cluster state not compatible with the diff");
256-
applyFullState = true;
257-
}
258-
259-
if (applyFullState == true) {
260-
logger.debug(
261-
() -> new ParameterizedMessage(
262-
"Downloading full cluster state for term {}, version {}, stateUUID {}",
263-
manifest.getClusterTerm(),
264-
manifest.getStateVersion(),
265-
manifest.getStateUUID()
266-
)
267-
);
268-
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
269-
request.getClusterName(),
270-
manifest,
271-
transportService.getLocalNode().getId(),
272-
true
273-
);
274-
fullClusterStateReceivedCount.incrementAndGet();
275-
final PublishWithJoinResponse response = acceptState(clusterState);
276-
lastSeenClusterState.set(clusterState);
277-
return response;
278-
} else {
279-
logger.debug(
280-
() -> new ParameterizedMessage(
281-
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
282-
manifest.getClusterTerm(),
283-
manifest.getStateVersion(),
284-
manifest.getDiffManifest().getFromStateUUID(),
285-
manifest.getStateUUID()
286-
)
287-
);
288-
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
289-
manifest,
290-
lastSeen,
291-
transportService.getLocalNode().getId()
244+
try {
245+
if (transportService.getLocalNode().equals(request.getSourceNode())) {
246+
return acceptRemoteStateOnLocalNode(request);
247+
}
248+
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
249+
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
250+
request.getClusterUUID(),
251+
request.getManifestFile()
292252
);
293-
compatibleClusterStateDiffReceivedCount.incrementAndGet();
294-
final PublishWithJoinResponse response = acceptState(clusterState);
295-
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
296-
return response;
253+
if (manifest == null) {
254+
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
255+
}
256+
final ClusterState lastSeen = lastSeenClusterState.get();
257+
if (lastSeen == null) {
258+
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
259+
applyFullState = true;
260+
} else if (manifest.getDiffManifest() == null) {
261+
logger.debug(() -> "There is no diff in the manifest");
262+
applyFullState = true;
263+
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
264+
logger.debug(() -> "Last cluster state not compatible with the diff");
265+
applyFullState = true;
266+
}
267+
268+
if (applyFullState == true) {
269+
logger.debug(
270+
() -> new ParameterizedMessage(
271+
"Downloading full cluster state for term {}, version {}, stateUUID {}",
272+
manifest.getClusterTerm(),
273+
manifest.getStateVersion(),
274+
manifest.getStateUUID()
275+
)
276+
);
277+
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
278+
request.getClusterName(),
279+
manifest,
280+
transportService.getLocalNode().getId(),
281+
true
282+
);
283+
fullClusterStateReceivedCount.incrementAndGet();
284+
final PublishWithJoinResponse response = acceptState(clusterState);
285+
lastSeenClusterState.set(clusterState);
286+
return response;
287+
} else {
288+
logger.debug(
289+
() -> new ParameterizedMessage(
290+
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
291+
manifest.getClusterTerm(),
292+
manifest.getStateVersion(),
293+
manifest.getDiffManifest().getFromStateUUID(),
294+
manifest.getStateUUID()
295+
)
296+
);
297+
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
298+
manifest,
299+
lastSeen,
300+
transportService.getLocalNode().getId()
301+
);
302+
compatibleClusterStateDiffReceivedCount.incrementAndGet();
303+
final PublishWithJoinResponse response = acceptState(clusterState);
304+
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
305+
return response;
306+
}
307+
} catch (Exception e) {
308+
if (applyFullState) {
309+
remoteClusterStateService.fullDownloadFailed();
310+
} else {
311+
remoteClusterStateService.diffDownloadFailed();
312+
}
313+
throw e;
297314
}
298315
}
299316

server/src/main/java/org/opensearch/gateway/GatewayMetaState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
747747

748748
@Override
749749
public PersistedStateStats getStats() {
750-
return remoteClusterStateService.getStats();
750+
return remoteClusterStateService.getUploadStats();
751751
}
752752

753753
private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public RemoteClusterStateCleanupManager(
8181
RemoteRoutingTableService remoteRoutingTableService
8282
) {
8383
this.remoteClusterStateService = remoteClusterStateService;
84-
this.remoteStateStats = remoteClusterStateService.getStats();
84+
this.remoteStateStats = remoteClusterStateService.getRemoteStateStats();
8585
ClusterSettings clusterSettings = clusterService.getClusterSettings();
8686
this.clusterApplierService = clusterService.getClusterApplierService();
8787
this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);

0 commit comments

Comments
 (0)