[BUG] Generic threads exhausted when ongoing concurrent node recoveries is higher than threadpool size  #14768

@ashking94

Description

Describe the bug

The number of concurrent recoveries (both incoming and outgoing) that can happen on a node is controlled by the setting cluster.routing.allocation.node_concurrent_recoveries. Today, the peer recovery process uses the generic threadpool on both RecoverySourceHandler and RecoveryTarget. While testing with a high value of cluster.routing.allocation.node_concurrent_recoveries, we ran into an issue where all 128 generic threads were in the WAITING state (thread dump below).

This happened because the recovery process submits tasks asynchronously to the same generic threadpool it is already running on: the submitting thread then blocks on future.get() until the task completes. With enough concurrent node recoveries, every generic thread ends up waiting for a task that can only run once a generic thread frees up, so the pool can never drain. This is effectively a deadlock, and it leaves the cluster in a limbo state. In this case, the issue manifested as the affected node considering itself part of the cluster while the active cluster manager did not, because the node's generic threadpool was exhausted.
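The cycle can be reproduced in isolation. Below is a minimal standalone sketch using plain java.util.concurrent (not OpenSearch code; `deadlocks` and the pool size are illustrative): every worker in a bounded pool submits a subtask to the same pool and then blocks waiting for its result, so no worker is ever free to run the queued subtasks.

```java
import java.util.concurrent.*;

public class SelfDeadlockDemo {
    // Returns true if the pool self-deadlocks: every worker submits a
    // subtask to the same pool and then blocks waiting for its result,
    // mirroring the runWithGenericThreadPool() pattern described in this issue.
    static boolean deadlocks(int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch allBlocked = new CountDownLatch(poolSize);
        Future<?>[] outer = new Future<?>[poolSize];
        for (int i = 0; i < poolSize; i++) {
            outer[i] = pool.submit(() -> {
                // Subtask is queued behind the outer tasks occupying all workers.
                Future<?> inner = pool.submit(() -> { });
                allBlocked.countDown();
                try {
                    inner.get(); // blocks forever: no worker is free to run `inner`
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        allBlocked.await();
        boolean stuck;
        try {
            outer[0].get(500, TimeUnit.MILLISECONDS);
            stuck = false;
        } catch (TimeoutException e) {
            stuck = true; // all workers are parked in Future.get(), as in the thread dump
        }
        pool.shutdownNow();
        return stuck;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(deadlocks(2) ? "deadlocked" : "completed"); // prints "deadlocked"
    }
}
```

With a pool of size 2 and 2 outer tasks, both workers park in Future.get() while both subtasks sit in the queue; scaled up to 128 workers, this is the state captured in the thread dump below.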

Thread dump

Stack trace showing a generic thread in the WAITING state:

"opensearch[ba81c9d62b8afcd70f7af5d53ba97be7][generic][T#4]" #90 daemon prio=5 os_prio=0 cpu=384.60ms elapsed=750833.31s tid=0x0000fffee4012d10 nid=0x51b6 waiting on condition  [0x0000fffe9edff000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <merged>(a org.opensearch.common.util.concurrent.BaseFuture$Sync)
        at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:211)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire([email protected]/AbstractQueuedSynchronizer.java:715)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1047)
        at org.opensearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:272)
        at org.opensearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:104)
        at org.opensearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:74)
        at org.opensearch.indices.recovery.RecoverySourceHandler.runWithGenericThreadPool(RecoverySourceHandler.java:300)
        at org.opensearch.indices.recovery.RecoverySourceHandler.lambda$acquireStore$10(RecoverySourceHandler.java:275)
        at org.opensearch.indices.recovery.RecoverySourceHandler$$Lambda$8114/0x00000008020a0ab0.close(Unknown Source)
        at org.opensearch.common.lease.Releasables.lambda$releaseOnce$2(Releasables.java:132)
        at org.opensearch.common.lease.Releasables$$Lambda$7884/0x000000080206c038.close(Unknown Source)
        at org.opensearch.common.util.io.IOUtils.close(IOUtils.java:89)
        at org.opensearch.common.util.io.IOUtils.close(IOUtils.java:131)
        at org.opensearch.common.util.io.IOUtils.close(IOUtils.java:81)
        at org.opensearch.indices.recovery.RecoverySourceHandler.lambda$onSendFileStepComplete$8(RecoverySourceHandler.java:243)
        at org.opensearch.indices.recovery.RecoverySourceHandler$$Lambda$8115/0x00000008020a0f20.accept(Unknown Source)
        at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
        at org.opensearch.common.util.concurrent.ListenableFuture$1.doRun(ListenableFuture.java:126)
        at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
        at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
        at org.opensearch.common.util.concurrent.ListenableFuture.notifyListener(ListenableFuture.java:120)
        at org.opensearch.common.util.concurrent.ListenableFuture.lambda$done$0(ListenableFuture.java:112)
        at org.opensearch.common.util.concurrent.ListenableFuture$$Lambda$6746/0x0000000801cd9d38.accept(Unknown Source)
        at java.util.ArrayList.forEach([email protected]/ArrayList.java:1511)
        at org.opensearch.common.util.concurrent.ListenableFuture.done(ListenableFuture.java:112)
        - locked <merged>(a org.opensearch.common.util.concurrent.ListenableFuture)
        at org.opensearch.common.util.concurrent.BaseFuture.set(BaseFuture.java:160)
        at org.opensearch.common.util.concurrent.ListenableFuture.onResponse(ListenableFuture.java:141)
        at org.opensearch.action.StepListener.innerOnResponse(StepListener.java:79)
        at org.opensearch.core.action.NotifyOnceListener.onResponse(NotifyOnceListener.java:58)
        at org.opensearch.indices.recovery.RecoverySourceHandler.lambda$phase1$16(RecoverySourceHandler.java:491)
        at org.opensearch.indices.recovery.RecoverySourceHandler$$Lambda$8136/0x00000008020a4480.accept(Unknown Source)
        at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
        at org.opensearch.common.util.concurrent.ListenableFuture$1.doRun(ListenableFuture.java:126)
        at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
        at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
        at org.opensearch.common.util.concurrent.ListenableFuture.notifyListener(ListenableFuture.java:120)
        at org.opensearch.common.util.concurrent.ListenableFuture.lambda$done$0(ListenableFuture.java:112)
        at org.opensearch.common.util.concurrent.ListenableFuture$$Lambda$6746/0x0000000801cd9d38.accept(Unknown Source)
        at java.util.ArrayList.forEach([email protected]/ArrayList.java:1511)
        at org.opensearch.common.util.concurrent.ListenableFuture.done(ListenableFuture.java:112)
        - locked <merged>(a org.opensearch.common.util.concurrent.ListenableFuture)
        at org.opensearch.common.util.concurrent.BaseFuture.set(BaseFuture.java:160)
        at org.opensearch.common.util.concurrent.ListenableFuture.onResponse(ListenableFuture.java:141)
        at org.opensearch.action.StepListener.innerOnResponse(StepListener.java:79)
        at org.opensearch.core.action.NotifyOnceListener.onResponse(NotifyOnceListener.java:58)
        at org.opensearch.core.action.ActionListener$2.onResponse(ActionListener.java:108)
        at org.opensearch.core.action.ActionListener$4.onResponse(ActionListener.java:182)
        at org.opensearch.core.action.ActionListener$6.onResponse(ActionListener.java:301)
        at org.opensearch.action.support.RetryableAction$RetryingListener.onResponse(RetryableAction.java:183)
        at org.opensearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:70)
        at org.opensearch.security.transport.SecurityInterceptor$RestoringTransportResponseHandler.handleResponse(SecurityInterceptor.java:424)
        at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1505)
        at org.opensearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:420)
        at org.opensearch.transport.InboundHandler.lambda$handleResponse$3(InboundHandler.java:414)
        at org.opensearch.transport.InboundHandler$$Lambda$6734/0x0000000801cda6c8.run(Unknown Source)
        at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:863)
        at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run([email protected]/Thread.java:840)

Recovery process submitting a task from a generic thread to the same generic threadpool:

private void runWithGenericThreadPool(CheckedRunnable<Exception> task) {
    final PlainActionFuture<Void> future = new PlainActionFuture<>();
    assert threadPool.generic().isShutdown() == false;
    // TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
    // While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
    // below and thus make it impossible for the store release to execute which in turn would block the futures forever
    threadPool.generic().execute(ActionRunnable.run(future, task));
    FutureUtils.get(future);
}
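One general way to break this cycle (a sketch with plain java.util.concurrent; `InlineWhenOnPool` and its `run` method are hypothetical illustrations, not the actual OpenSearch fix) is to detect that the caller is already a pool worker and execute the task inline instead of re-submitting it and blocking:

```java
import java.util.concurrent.*;

public class InlineWhenOnPool {
    private final ExecutorService pool;
    // Marks whether the current thread is one of this pool's workers.
    private final ThreadLocal<Boolean> onPool = ThreadLocal.withInitial(() -> false);

    public InlineWhenOnPool(int size) {
        this.pool = Executors.newFixedThreadPool(size);
    }

    // Run `task` on the pool; if the caller is already a pool worker,
    // run it inline so no worker ever blocks waiting on a queued task.
    public <T> T run(Callable<T> task) {
        try {
            if (onPool.get()) {
                return task.call(); // already on a worker: do not re-submit and wait
            }
            Future<T> f = pool.submit(() -> {
                onPool.set(true);
                try {
                    return task.call();
                } finally {
                    onPool.set(false);
                }
            });
            return f.get(); // safe: the calling thread is not a pool worker
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        InlineWhenOnPool exec = new InlineWhenOnPool(1);
        // A nested call would deadlock a size-1 pool with naive re-submission;
        // the inline fallback lets it complete.
        System.out.println(exec.run(() -> exec.run(() -> 21 + 21))); // prints 42
        exec.shutdown();
    }
}
```

Other options with the same effect are routing these tasks to a dedicated bounded pool, or removing the blocking future.get() in favor of a fully async listener chain.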

Generic threadpool sizing

final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
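For context on those bounds, boundedBy clamps 4 * allocatedProcessors between 128 and 512, so any instance with fewer than 32 vCPUs gets a generic pool maximum of exactly 128 threads, which is why the thread dump above shows all 128 exhausted. A standalone re-derivation (boundedBy reimplemented here for illustration):

```java
public class GenericPoolSizing {
    // Mirrors the clamp used above: min <= result <= max.
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    public static void main(String[] args) {
        // 8 vCPUs:   4 * 8   = 32   -> clamped up to the floor of 128
        System.out.println(boundedBy(4 * 8, 128, 512));    // 128
        // 48 vCPUs:  4 * 48  = 192  -> within bounds, used as-is
        System.out.println(boundedBy(4 * 48, 128, 512));   // 192
        // 256 vCPUs: 4 * 256 = 1024 -> clamped down to the cap of 512
        System.out.println(boundedBy(4 * 256, 128, 512));  // 512
    }
}
```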

Related component

Cluster Manager

To Reproduce

The steps below reproduce the issue:

  1. Create a cluster such that one node holds around 500-1000 primary shards. Use an instance with fewer than 32 vCPUs so that the generic threadpool maximum is at its lower bound of 128 threads.
  2. Increase the max shards per node setting to 2000-3000.
  3. Increase cluster.routing.allocation.node_concurrent_recoveries to a very high value, such as 1000.
  4. Increase the replica count of existing indices so that at least one node has more than 128 recoveries ongoing.

Expected behavior

The deadlock should not happen.

Additional Details

No response

Labels

Cluster Manager, Indexing:Replication, bug