Skip to content

Commit 8bc3d12

Browse files
committed
Introduce per-node ctx with lover timeout
For local and cluster nodes we have now context created for each async run. This will not timeout whole sync pipeline when node connect takes 20 seconds (pipeline ctx timeout). Limit is set for each async execution.
1 parent 6dbf778 commit 8bc3d12

File tree

1 file changed

+15
-5
lines changed
  • packages/api/internal/orchestrator

1 file changed

+15
-5
lines changed

packages/api/internal/orchestrator/cache.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/api/internal/orchest
2020
// cacheSyncTime is the time to sync the cache with the actual instances in Orchestrator.
2121
const cacheSyncTime = 20 * time.Second
2222

23+
const nodeConnectTimeout = 5 * time.Second
24+
2325
func (o *Orchestrator) GetSandbox(ctx context.Context, sandboxID string) (sandbox.Sandbox, error) {
2426
return o.sandboxStore.Get(ctx, sandboxID)
2527
}
@@ -110,17 +112,21 @@ func (o *Orchestrator) syncNodes(ctx context.Context, store *sandbox.Store, skip
110112

111113
func (o *Orchestrator) syncLocalDiscoveredNodes(ctx context.Context, discovered []nodemanager.NomadServiceDiscovery) {
112114
// Connect local nodes that are not in the list, yet
113-
connectLocalSpanCtx, connectLocalSpan := tracer.Start(ctx, "keep-in-sync-connect-local-nodes")
114-
defer connectLocalSpan.End()
115+
ctx, span := tracer.Start(ctx, "keep-in-sync-connect-local-nodes")
116+
defer span.End()
115117

116118
var wg sync.WaitGroup
117119
defer wg.Wait()
118120

121+
// Make sure slow/failed connections don't block the whole sync loop
122+
ctx, cancel := context.WithTimeout(ctx, nodeConnectTimeout)
123+
defer cancel()
124+
119125
for _, n := range discovered {
120126
// If the node is not in the list, connect to it
121127
if o.GetNodeByNomadShortID(n.NomadNodeShortID) == nil {
122128
wg.Go(func() {
123-
err := o.connectToNode(connectLocalSpanCtx, n)
129+
err := o.connectToNode(ctx, n)
124130
if err != nil {
125131
logger.L().Error(ctx, "Error connecting to node", zap.Error(err))
126132
}
@@ -133,8 +139,12 @@ func (o *Orchestrator) syncClusterDiscoveredNodes(ctx context.Context) {
133139
var wg sync.WaitGroup
134140
defer wg.Wait()
135141

136-
_, connectClusteredSpan := tracer.Start(ctx, "keep-in-sync-connect-clustered-nodes")
137-
defer connectClusteredSpan.End()
142+
ctx, span := tracer.Start(ctx, "keep-in-sync-connect-clustered-nodes")
143+
defer span.End()
144+
145+
// Make sure slow/failed connections don't block the whole sync loop
146+
ctx, cancel := context.WithTimeout(ctx, nodeConnectTimeout)
147+
defer cancel()
138148

139149
// Connect clustered nodes that are not in the list, yet
140150
// We need to iterate over all clusters and their nodes

0 commit comments

Comments
 (0)