Skip to content

Commit 268258a

Browse files
committed
Add remote download stats
1 parent 3db2525 commit 268258a

File tree

13 files changed

+296
-105
lines changed

13 files changed

+296
-105
lines changed

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,9 @@ public interface PersistedState extends Closeable {
651651
* Returns the stats for the persistence layer for {@link CoordinationState}.
652652
* @return PersistedStateStats
653653
*/
654-
PersistedStateStats getStats();
654+
PersistedStateStats getUploadStats();
655+
656+
PersistedStateStats getDownloadStats();
655657

656658
/**
657659
* Marks the last accepted cluster state as committed.

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -896,9 +896,13 @@ public DiscoveryStats stats() {
896896
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
897897
ArrayList<PersistedStateStats> stats = new ArrayList<>();
898898
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
899-
if (persistedStateRegistry.getPersistedState(stateType) != null
900-
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
901-
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
899+
if (persistedStateRegistry.getPersistedState(stateType) != null) {
900+
if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) {
901+
stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats());
902+
}
903+
if (coordinationState.get().isRemotePublicationEnabled()) {
904+
stats.add(publicationHandler.getDownloadStats());
905+
}
902906
}
903907
});
904908
clusterStateStats.setPersistenceStats(stats);

server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ public void setLastAcceptedState(ClusterState clusterState) {
6666
}
6767

6868
@Override
69-
public PersistedStateStats getStats() {
69+
public PersistedStateStats getUploadStats() {
70+
return null;
71+
}
72+
73+
@Override
74+
public PersistedStateStats getDownloadStats() {
7075
return null;
7176
}
7277

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 72 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ public PublishClusterStateStats stats() {
176176
);
177177
}
178178

179+
public PersistedStateStats getDownloadStats() {
180+
return remoteClusterStateService.getDownloadStats();
181+
}
182+
179183
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
180184
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
181185
ClusterState incomingState;
@@ -229,69 +233,75 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
229233
}
230234

231235
// package private for testing
232-
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
233-
if (transportService.getLocalNode().equals(request.getSourceNode())) {
234-
return acceptRemoteStateOnLocalNode(request);
235-
}
236-
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
237-
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
238-
request.getClusterUUID(),
239-
request.getManifestFile()
240-
);
241-
if (manifest == null) {
242-
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
243-
}
244-
boolean applyFullState = false;
245-
final ClusterState lastSeen = lastSeenClusterState.get();
246-
if (lastSeen == null) {
247-
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
248-
applyFullState = true;
249-
} else if (manifest.getDiffManifest() == null) {
250-
logger.trace(() -> "There is no diff in the manifest");
251-
applyFullState = true;
252-
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
253-
logger.debug(() -> "Last cluster state not compatible with the diff");
254-
applyFullState = true;
255-
}
256-
257-
if (applyFullState == true) {
258-
logger.debug(
259-
() -> new ParameterizedMessage(
260-
"Downloading full cluster state for term {}, version {}, stateUUID {}",
261-
manifest.getClusterTerm(),
262-
manifest.getStateVersion(),
263-
manifest.getStateUUID()
264-
)
265-
);
266-
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
267-
request.getClusterName(),
268-
manifest,
269-
transportService.getLocalNode().getId(),
270-
true
271-
);
272-
fullClusterStateReceivedCount.incrementAndGet();
273-
final PublishWithJoinResponse response = acceptState(clusterState);
274-
lastSeenClusterState.set(clusterState);
275-
return response;
276-
} else {
277-
logger.debug(
278-
() -> new ParameterizedMessage(
279-
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
280-
manifest.getClusterTerm(),
281-
manifest.getStateVersion(),
282-
manifest.getDiffManifest().getFromStateUUID(),
283-
manifest.getStateUUID()
284-
)
285-
);
286-
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
287-
manifest,
288-
lastSeen,
289-
transportService.getLocalNode().getId()
236+
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException {
237+
try {
238+
if (transportService.getLocalNode().equals(request.getSourceNode())) {
239+
return acceptRemoteStateOnLocalNode(request);
240+
}
241+
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
242+
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
243+
request.getClusterUUID(),
244+
request.getManifestFile()
290245
);
291-
compatibleClusterStateDiffReceivedCount.incrementAndGet();
292-
final PublishWithJoinResponse response = acceptState(clusterState);
293-
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
294-
return response;
246+
if (manifest == null) {
247+
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
248+
}
249+
boolean applyFullState = false;
250+
final ClusterState lastSeen = lastSeenClusterState.get();
251+
if (lastSeen == null) {
252+
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
253+
applyFullState = true;
254+
} else if (manifest.getDiffManifest() == null) {
255+
logger.debug(() -> "There is no diff in the manifest");
256+
applyFullState = true;
257+
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
258+
logger.debug(() -> "Last cluster state not compatible with the diff");
259+
applyFullState = true;
260+
}
261+
262+
if (applyFullState == true) {
263+
logger.debug(
264+
() -> new ParameterizedMessage(
265+
"Downloading full cluster state for term {}, version {}, stateUUID {}",
266+
manifest.getClusterTerm(),
267+
manifest.getStateVersion(),
268+
manifest.getStateUUID()
269+
)
270+
);
271+
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
272+
request.getClusterName(),
273+
manifest,
274+
transportService.getLocalNode().getId(),
275+
true
276+
);
277+
fullClusterStateReceivedCount.incrementAndGet();
278+
final PublishWithJoinResponse response = acceptState(clusterState);
279+
lastSeenClusterState.set(clusterState);
280+
return response;
281+
} else {
282+
logger.debug(
283+
() -> new ParameterizedMessage(
284+
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
285+
manifest.getClusterTerm(),
286+
manifest.getStateVersion(),
287+
manifest.getDiffManifest().getFromStateUUID(),
288+
manifest.getStateUUID()
289+
)
290+
);
291+
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
292+
manifest,
293+
lastSeen,
294+
transportService.getLocalNode().getId()
295+
);
296+
compatibleClusterStateDiffReceivedCount.incrementAndGet();
297+
final PublishWithJoinResponse response = acceptState(clusterState);
298+
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
299+
return response;
300+
}
301+
} catch (Exception e) {
302+
remoteClusterStateService.readMetadataFailed();
303+
if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e);
304+
throw new RuntimeException("Runtime exception in reading remote cluster state", e);
295305
}
296306
}
297307

server/src/main/java/org/opensearch/gateway/GatewayMetaState.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -618,11 +618,16 @@ public void setLastAcceptedState(ClusterState clusterState) {
618618
}
619619

620620
@Override
621-
public PersistedStateStats getStats() {
621+
public PersistedStateStats getUploadStats() {
622622
// Note: These stats are not published yet, will come in future
623623
return null;
624624
}
625625

626+
@Override
627+
public PersistedStateStats getDownloadStats() {
628+
return null;
629+
}
630+
626631
private PersistedClusterStateService.Writer getWriterSafe() {
627632
final PersistedClusterStateService.Writer writer = persistenceWriter.get();
628633
if (writer == null) {
@@ -740,8 +745,13 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
740745
}
741746

742747
@Override
743-
public PersistedStateStats getStats() {
744-
return remoteClusterStateService.getStats();
748+
public PersistedStateStats getUploadStats() {
749+
return remoteClusterStateService.getStats().getUploadStats();
750+
}
751+
752+
@Override
753+
public PersistedStateStats getDownloadStats() {
754+
return remoteClusterStateService.getStats().getDownloadStats();
745755
}
746756

747757
private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.cluster.DiffableUtils;
1919
import org.opensearch.cluster.block.ClusterBlocks;
2020
import org.opensearch.cluster.coordination.CoordinationMetadata;
21+
import org.opensearch.cluster.coordination.PersistedStateStats;
2122
import org.opensearch.cluster.metadata.DiffableStringMap;
2223
import org.opensearch.cluster.metadata.IndexMetadata;
2324
import org.opensearch.cluster.metadata.Metadata;
@@ -256,8 +257,8 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
256257
);
257258

258259
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
259-
remoteStateStats.stateSucceeded();
260-
remoteStateStats.stateTook(durationMillis);
260+
remoteStateStats.stateUploadSucceeded();
261+
remoteStateStats.stateUploadTook(durationMillis);
261262
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
262263
logger.warn(
263264
"writing cluster state took [{}ms] which is above the warn threshold of [{}]; "
@@ -447,8 +448,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
447448
);
448449

449450
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
450-
remoteStateStats.stateSucceeded();
451-
remoteStateStats.stateTook(durationMillis);
451+
remoteStateStats.stateUploadSucceeded();
452+
remoteStateStats.stateUploadTook(durationMillis);
452453
ParameterizedMessage clusterStateUploadTimeMessage = new ParameterizedMessage(
453454
CLUSTER_STATE_UPLOAD_TIME_LOG_STRING,
454455
manifestDetails.getClusterMetadataManifest().getStateVersion(),
@@ -1311,8 +1312,10 @@ public ClusterState getClusterStateForManifest(
13111312
String localNodeId,
13121313
boolean includeEphemeral
13131314
) throws IOException {
1315+
final ClusterState clusterState;
1316+
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
13141317
if (manifest.onOrAfterCodecVersion(CODEC_V2)) {
1315-
return readClusterStateInParallel(
1318+
clusterState = readClusterStateInParallel(
13161319
ClusterState.builder(new ClusterName(clusterName)).build(),
13171320
manifest,
13181321
manifest.getClusterUUID(),
@@ -1332,7 +1335,7 @@ public ClusterState getClusterStateForManifest(
13321335
includeEphemeral
13331336
);
13341337
} else {
1335-
ClusterState clusterState = readClusterStateInParallel(
1338+
ClusterState state = readClusterStateInParallel(
13361339
ClusterState.builder(new ClusterName(clusterName)).build(),
13371340
manifest,
13381341
manifest.getClusterUUID(),
@@ -1353,15 +1356,20 @@ public ClusterState getClusterStateForManifest(
13531356
false
13541357
);
13551358
Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest));
1356-
mb.indices(clusterState.metadata().indices());
1357-
return ClusterState.builder(clusterState).metadata(mb).build();
1359+
mb.indices(state.metadata().indices());
1360+
clusterState = ClusterState.builder(state).metadata(mb).build();
13581361
}
1362+
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
1363+
remoteStateStats.stateDownloadSucceeded();
1364+
remoteStateStats.stateDownloadTook(durationMillis);
13591365

1366+
return clusterState;
13601367
}
13611368

13621369
public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId)
13631370
throws IOException {
13641371
assert manifest.getDiffManifest() != null : "Diff manifest null which is required for downloading cluster state";
1372+
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
13651373
ClusterStateDiffManifest diff = manifest.getDiffManifest();
13661374
List<UploadedIndexMetadata> updatedIndices = diff.getIndicesUpdated().stream().map(idx -> {
13671375
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndices()
@@ -1437,11 +1445,17 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
14371445
indexRoutingTables.remove(indexName);
14381446
}
14391447

1440-
return clusterStateBuilder.stateUUID(manifest.getStateUUID())
1448+
ClusterState clusterState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
14411449
.version(manifest.getStateVersion())
14421450
.metadata(metadataBuilder)
14431451
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
14441452
.build();
1453+
1454+
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
1455+
remoteStateStats.stateDownloadSucceeded();
1456+
remoteStateStats.stateDownloadTook(durationMillis);
1457+
1458+
return clusterState;
14451459
}
14461460

14471461
/**
@@ -1637,10 +1651,22 @@ void setRemoteClusterStateAttributesManager(RemoteClusterStateAttributesManager
16371651
}
16381652

16391653
public void writeMetadataFailed() {
1640-
getStats().stateFailed();
1654+
getStats().stateUploadFailed();
16411655
}
16421656

16431657
public RemotePersistenceStats getStats() {
16441658
return remoteStateStats;
16451659
}
1660+
1661+
public void readMetadataFailed() {
1662+
getStats().stateDownloadFailed();
1663+
}
1664+
1665+
public PersistedStateStats getUploadStats() {
1666+
return remoteStateStats.getUploadStats();
1667+
}
1668+
1669+
public PersistedStateStats getDownloadStats() {
1670+
return remoteStateStats.getDownloadStats();
1671+
}
16461672
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.cluster.coordination.PersistedStateStats;
12+
13+
import java.util.concurrent.atomic.AtomicLong;
14+
15+
public class RemoteDownloadStats extends PersistedStateStats {
16+
static final String TIMEOUT_COUNT = "timeout_count";
17+
static final String REMOTE_DOWNLOAD = "remote_download";
18+
private AtomicLong timeoutCount = new AtomicLong(0);
19+
20+
public RemoteDownloadStats() {
21+
super(REMOTE_DOWNLOAD);
22+
addToExtendedFields(TIMEOUT_COUNT, timeoutCount);
23+
}
24+
25+
public void stateDownloadTimeout() {
26+
timeoutCount.incrementAndGet();
27+
}
28+
29+
public long getStateDownloadTimeout() {
30+
return timeoutCount.get();
31+
}
32+
}

0 commit comments

Comments
 (0)