Skip to content

Commit b9125bf

Browse files
committed
Add tests and address comments
Signed-off-by: Arpit Bandejiya <[email protected]>
1 parent 50c5d7b commit b9125bf

File tree

16 files changed

+219
-97
lines changed

16 files changed

+219
-97
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4040
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
4141
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
4242
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
43+
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
4344

4445
### Dependencies
4546
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Base64;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.Objects;
3435
import java.util.concurrent.TimeUnit;
3536
import java.util.concurrent.atomic.AtomicLong;
3637

@@ -40,6 +41,7 @@
4041
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
4142
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
4243
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
44+
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
4345
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
4446
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
4547
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
@@ -253,10 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
253255
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
254256
assertNotNull(discoveryStats.getClusterStateStats());
255257
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
256-
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
257-
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
258-
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT).get();
259-
assertEquals(0, cleanupAttemptFailedCount);
258+
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
259+
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
260+
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
261+
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
262+
.get();
263+
assertEquals(0, cleanupAttemptFailedCount);
264+
}
260265
}
261266
}
262267

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88

99
package org.opensearch.gateway.remote;
1010

11+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
12+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1113
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1214
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1315
import org.opensearch.client.Client;
1416
import org.opensearch.common.blobstore.BlobPath;
1517
import org.opensearch.common.settings.Settings;
1618
import org.opensearch.common.util.FeatureFlags;
19+
import org.opensearch.discovery.DiscoveryStats;
1720
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
1821
import org.opensearch.indices.recovery.RecoverySettings;
1922
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
@@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
155158
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
156159
}
157160

161+
public void testRemotePublicationDownloadStats() {
162+
int shardCount = randomIntBetween(1, 2);
163+
int replicaCount = 1;
164+
int dataNodeCount = shardCount * (replicaCount + 1);
165+
int clusterManagerNodeCount = 1;
166+
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
167+
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);
168+
169+
NodesStatsResponse nodesStatsResponseDataNode = client().admin()
170+
.cluster()
171+
.prepareNodesStats(dataNode)
172+
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
173+
.get();
174+
175+
assertDataNodeDownloadStats(nodesStatsResponseDataNode);
176+
177+
}
178+
179+
private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
180+
// assert cluster state stats for data node
181+
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
182+
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
183+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
184+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
185+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
186+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);
187+
188+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
189+
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
190+
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
191+
}
192+
158193
private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
159194
BlobPath metadataPath = repository.basePath()
160195
.add(

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ public interface PersistedState extends Closeable {
652652
* Returns the stats for the persistence layer for {@link CoordinationState}.
653653
* @return PersistedStateStats
654654
*/
655-
PersistedStateStats getUploadStats();
655+
PersistedStateStats getStats();
656656

657657
/**
658658
* Marks the last accepted cluster state as committed.

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -896,14 +896,14 @@ public DiscoveryStats stats() {
896896
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
897897
ArrayList<PersistedStateStats> stats = new ArrayList<>();
898898
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
899-
if (persistedStateRegistry.getPersistedState(stateType) != null) {
900-
if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) {
901-
stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats());
902-
}
899+
if (persistedStateRegistry.getPersistedState(stateType) != null
900+
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
901+
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
903902
}
904903
});
905904
if (coordinationState.get().isRemotePublicationEnabled()) {
906-
stats.add(publicationHandler.getDownloadStats());
905+
stats.add(publicationHandler.getFullDownloadStats());
906+
stats.add(publicationHandler.getDiffDownloadStats());
907907
}
908908
clusterStateStats.setPersistenceStats(stats);
909909
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);

server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
6666
}
6767

6868
@Override
69-
public PersistedStateStats getUploadStats() {
69+
public PersistedStateStats getStats() {
7070
return null;
7171
}
7272

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,12 @@ public PublishClusterStateStats stats() {
178178
);
179179
}
180180

181-
public PersistedStateStats getDownloadStats() {
182-
return remoteClusterStateService.getDownloadStats();
181+
public PersistedStateStats getFullDownloadStats() {
182+
return remoteClusterStateService.getFullDownloadStats();
183+
}
184+
185+
public PersistedStateStats getDiffDownloadStats() {
186+
return remoteClusterStateService.getDiffDownloadStats();
183187
}
184188

185189
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
@@ -235,7 +239,8 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
235239
}
236240

237241
// package private for testing
238-
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException {
242+
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
243+
boolean applyFullState = false;
239244
try {
240245
if (transportService.getLocalNode().equals(request.getSourceNode())) {
241246
return acceptRemoteStateOnLocalNode(request);
@@ -248,7 +253,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
248253
if (manifest == null) {
249254
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
250255
}
251-
boolean applyFullState = false;
252256
final ClusterState lastSeen = lastSeenClusterState.get();
253257
if (lastSeen == null) {
254258
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
@@ -262,7 +266,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
262266
}
263267

264268
if (applyFullState == true) {
265-
remoteClusterStateService.fullDownloadState();
266269
logger.debug(
267270
() -> new ParameterizedMessage(
268271
"Downloading full cluster state for term {}, version {}, stateUUID {}",
@@ -282,7 +285,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
282285
lastSeenClusterState.set(clusterState);
283286
return response;
284287
} else {
285-
remoteClusterStateService.diffDownloadState();
286288
logger.debug(
287289
() -> new ParameterizedMessage(
288290
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
@@ -303,9 +305,12 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
303305
return response;
304306
}
305307
} catch (Exception e) {
306-
remoteClusterStateService.readMetadataFailed();
307-
if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e);
308-
throw new RuntimeException("Runtime exception in reading remote cluster state", e);
308+
if (applyFullState) {
309+
remoteClusterStateService.fullDownloadFailed();
310+
} else {
311+
remoteClusterStateService.diffDownloadFailed();
312+
}
313+
throw e;
309314
}
310315
}
311316

server/src/main/java/org/opensearch/gateway/GatewayMetaState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
618618
}
619619

620620
@Override
621-
public PersistedStateStats getUploadStats() {
621+
public PersistedStateStats getStats() {
622622
// Note: These stats are not published yet, will come in future
623623
return null;
624624
}
@@ -745,7 +745,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
745745
}
746746

747747
@Override
748-
public PersistedStateStats getUploadStats() {
748+
public PersistedStateStats getStats() {
749749
return remoteClusterStateService.getUploadStats();
750750
}
751751

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,8 +1364,8 @@ public ClusterState getClusterStateForManifest(
13641364
clusterState = ClusterState.builder(state).metadata(mb).build();
13651365
}
13661366
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
1367-
remoteStateStats.stateDownloadSucceeded();
1368-
remoteStateStats.stateDownloadTook(durationMillis);
1367+
remoteStateStats.stateFullDownloadSucceeded();
1368+
remoteStateStats.stateFullDownloadTook(durationMillis);
13691369

13701370
return clusterState;
13711371
}
@@ -1456,8 +1456,8 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
14561456
.build();
14571457

14581458
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
1459-
remoteStateStats.stateDownloadSucceeded();
1460-
remoteStateStats.stateDownloadTook(durationMillis);
1459+
remoteStateStats.stateDiffDownloadSucceeded();
1460+
remoteStateStats.stateDiffDownloadTook(durationMillis);
14611461

14621462
return clusterState;
14631463
}
@@ -1658,27 +1658,27 @@ public void writeMetadataFailed() {
16581658
remoteStateStats.stateUploadFailed();
16591659
}
16601660

1661-
public void readMetadataFailed() {
1662-
remoteStateStats.stateDownloadFailed();
1661+
public RemotePersistenceStats getRemoteStateStats() {
1662+
return remoteStateStats;
16631663
}
16641664

1665-
public void fullDownloadState() {
1666-
remoteStateStats.fullDownloadState();
1665+
public PersistedStateStats getUploadStats() {
1666+
return remoteStateStats.getUploadStats();
16671667
}
16681668

1669-
public void diffDownloadState() {
1670-
remoteStateStats.diffDownloadState();
1669+
public PersistedStateStats getFullDownloadStats() {
1670+
return remoteStateStats.getRemoteFullDownloadStats();
16711671
}
16721672

1673-
public RemotePersistenceStats getRemoteStateStats() {
1674-
return remoteStateStats;
1673+
public PersistedStateStats getDiffDownloadStats() {
1674+
return remoteStateStats.getRemoteDiffDownloadStats();
16751675
}
16761676

1677-
public PersistedStateStats getUploadStats() {
1678-
return remoteStateStats.getUploadStats();
1677+
public void fullDownloadFailed() {
1678+
remoteStateStats.stateFullDownloadFailed();
16791679
}
16801680

1681-
public PersistedStateStats getDownloadStats() {
1682-
return remoteStateStats.getDownloadStats();
1681+
public void diffDownloadFailed() {
1682+
remoteStateStats.stateDiffDownloadFailed();
16831683
}
16841684
}

server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

0 commit comments

Comments
 (0)