Skip to content

Commit 6dbf778

Browse files
authored
Don't return an error on already present (#1623)
1 parent dbe8a9c commit 6dbf778

File tree

8 files changed

+590
-75
lines changed

8 files changed

+590
-75
lines changed

packages/api/internal/metrics/team.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,13 @@ func (so *TeamObserver) Start(store *sandbox.Store) (err error) {
105105
return nil
106106
}
107107

108-
func (so *TeamObserver) Add(ctx context.Context, teamID uuid.UUID, created bool) {
109-
teamIDStr := teamID.String()
108+
func (so *TeamObserver) Add(ctx context.Context, teamID uuid.UUID) {
110109
// Count started only if the sandbox was created
111-
if created {
112-
attributes := []attribute.KeyValue{
113-
attribute.String("team_id", teamIDStr),
114-
}
115-
116-
so.teamSandboxesCreated.Add(ctx, 1, metric.WithAttributes(attributes...))
110+
attributes := []attribute.KeyValue{
111+
attribute.String("team_id", teamID.String()),
117112
}
113+
114+
so.teamSandboxesCreated.Add(ctx, 1, metric.WithAttributes(attributes...))
118115
}
119116

120117
func (so *TeamObserver) Close(ctx context.Context) error {

packages/api/internal/orchestrator/analytics.go

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"time"
66

77
"github.com/posthog/posthog-go"
8-
"go.opentelemetry.io/otel/attribute"
98
"go.opentelemetry.io/otel/metric"
109
"go.uber.org/zap"
1110
"google.golang.org/protobuf/types/known/timestamppb"
@@ -113,45 +112,41 @@ func (o *Orchestrator) analyticsRemove(ctx context.Context, sandbox sandbox.Sand
113112
}
114113
}
115114

116-
func (o *Orchestrator) analyticsInsert(ctx context.Context, sandbox sandbox.Sandbox, created bool) {
115+
func (o *Orchestrator) analyticsInsert(ctx context.Context, sandbox sandbox.Sandbox) {
117116
ctx, cancel := context.WithTimeout(ctx, reportTimeout)
118117
defer cancel()
119118

120-
if created {
121-
// Run in separate goroutine to not block sandbox creation
122-
_, err := o.analytics.InstanceStarted(ctx, &analyticscollector.InstanceStartedEvent{
123-
InstanceId: sandbox.SandboxID,
124-
ExecutionId: sandbox.ExecutionID,
125-
EnvironmentId: sandbox.TemplateID,
126-
BuildId: sandbox.BuildID.String(),
127-
TeamId: sandbox.TeamID.String(),
128-
CpuCount: sandbox.VCpu,
129-
RamMb: sandbox.RamMB,
130-
DiskSizeMb: sandbox.TotalDiskSizeMB,
131-
Timestamp: timestamppb.Now(),
132-
})
133-
if err != nil {
134-
logger.L().Error(ctx, "Error sending Analytics event", zap.Error(err))
135-
}
119+
_, err := o.analytics.InstanceStarted(ctx, &analyticscollector.InstanceStartedEvent{
120+
InstanceId: sandbox.SandboxID,
121+
ExecutionId: sandbox.ExecutionID,
122+
EnvironmentId: sandbox.TemplateID,
123+
BuildId: sandbox.BuildID.String(),
124+
TeamId: sandbox.TeamID.String(),
125+
CpuCount: sandbox.VCpu,
126+
RamMb: sandbox.RamMB,
127+
DiskSizeMb: sandbox.TotalDiskSizeMB,
128+
Timestamp: timestamppb.Now(),
129+
})
130+
if err != nil {
131+
logger.L().Error(ctx, "Error sending Analytics event", zap.Error(err))
136132
}
137133
}
138134

139-
func (o *Orchestrator) countersInsert(ctx context.Context, sandbox sandbox.Sandbox, newlyCreated bool) {
140-
attributes := []attribute.KeyValue{
141-
telemetry.WithTeamID(sandbox.TeamID.String()),
142-
}
135+
func (o *Orchestrator) handleNewlyCreatedSandbox(ctx context.Context, sandbox sandbox.Sandbox) {
136+
// Send analytics event
137+
o.analyticsInsert(ctx, sandbox)
143138

144-
if newlyCreated {
145-
o.createdCounter.Add(ctx, 1, metric.WithAttributes(attributes...))
146-
}
139+
// Update team metrics
140+
o.teamMetricsObserver.Add(ctx, sandbox.TeamID)
147141

148-
o.sandboxCounter.Add(ctx, 1, metric.WithAttributes(attributes...))
142+
// Increment created counter
143+
o.createdCounter.Add(ctx, 1, metric.WithAttributes(telemetry.WithTeamID(sandbox.TeamID.String())))
149144
}
150145

151-
func (o *Orchestrator) countersRemove(ctx context.Context, sandbox sandbox.Sandbox, _ sandbox.StateAction) {
152-
attributes := []attribute.KeyValue{
153-
telemetry.WithTeamID(sandbox.TeamID.String()),
154-
}
146+
func (o *Orchestrator) sandboxCounterInsert(ctx context.Context, sandbox sandbox.Sandbox) {
147+
o.sandboxCounter.Add(ctx, 1, metric.WithAttributes(telemetry.WithTeamID(sandbox.TeamID.String())))
148+
}
155149

156-
o.sandboxCounter.Add(ctx, -1, metric.WithAttributes(attributes...))
150+
func (o *Orchestrator) countersRemove(ctx context.Context, sandbox sandbox.Sandbox, _ sandbox.StateAction) {
151+
o.sandboxCounter.Add(ctx, -1, metric.WithAttributes(telemetry.WithTeamID(sandbox.TeamID.String())))
157152
}

packages/api/internal/orchestrator/lifecycle.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,7 @@ import (
1111
e2bcatalog "github.com/e2b-dev/infra/packages/shared/pkg/sandbox-catalog"
1212
)
1313

14-
func (o *Orchestrator) observeTeamSandbox(ctx context.Context, sandbox sandbox.Sandbox, created bool) {
15-
o.teamMetricsObserver.Add(ctx, sandbox.TeamID, created)
16-
}
17-
18-
func (o *Orchestrator) addToNode(ctx context.Context, sandbox sandbox.Sandbox, _ bool) {
14+
func (o *Orchestrator) addSandboxToRoutingTable(ctx context.Context, sandbox sandbox.Sandbox) {
1915
node := o.GetNode(sandbox.ClusterID, sandbox.NodeID)
2016
if node == nil {
2117
logger.L().Error(ctx, "failed to get node", logger.WithNodeID(sandbox.NodeID))

packages/api/internal/orchestrator/orchestrator.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,10 @@ func New(
144144
o.sandboxStore = sandbox.NewStore(
145145
sandboxStorage,
146146
reservationStorage,
147-
[]sandbox.InsertCallback{
148-
o.addToNode,
149-
},
150-
[]sandbox.InsertCallback{
151-
o.observeTeamSandbox,
152-
o.countersInsert,
153-
o.analyticsInsert,
147+
sandbox.Callbacks{
148+
AddSandboxToRoutingTable: o.addSandboxToRoutingTable,
149+
AsyncSandboxCounter: o.sandboxCounterInsert,
150+
AsyncNewlyCreatedSandbox: o.handleNewlyCreatedSandbox,
154151
},
155152
)
156153

packages/api/internal/sandbox/errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sandbox
22

33
import (
4+
"errors"
45
"fmt"
56
)
67

@@ -19,3 +20,5 @@ type NotFoundError struct {
1920
func (e *NotFoundError) Error() string {
2021
return fmt.Sprintf("sandbox %s not found", e.SandboxID)
2122
}
23+
24+
var ErrAlreadyExists = errors.New("sandbox already exists")

packages/api/internal/sandbox/storage/memory/operations.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@ import (
1414
)
1515

1616
// Add the sandbox to the cache
17-
func (s *Storage) Add(ctx context.Context, sandbox sandbox.Sandbox) error {
18-
added := s.items.SetIfAbsent(sandbox.SandboxID, newMemorySandbox(sandbox))
17+
func (s *Storage) Add(_ context.Context, sbx sandbox.Sandbox) error {
18+
added := s.items.SetIfAbsent(sbx.SandboxID, newMemorySandbox(sbx))
1919
if !added {
20-
logger.L().Warn(ctx, "Sandbox already exists in cache", logger.WithSandboxID(sandbox.SandboxID))
21-
22-
return fmt.Errorf("sandbox \"%s\" already exists", sandbox.SandboxID)
20+
return sandbox.ErrAlreadyExists
2321
}
2422

2523
return nil

packages/api/internal/sandbox/store.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sandbox
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/google/uuid"
78
"go.uber.org/zap"
@@ -11,7 +12,7 @@ import (
1112
)
1213

1314
type (
14-
InsertCallback func(ctx context.Context, sbx Sandbox, created bool)
15+
InsertCallback func(ctx context.Context, sbx Sandbox)
1516
ItemsOption func(*ItemsFilter)
1617
)
1718

@@ -49,27 +50,31 @@ func WithOnlyExpired(isExpired bool) ItemsOption {
4950
}
5051
}
5152

53+
type Callbacks struct {
54+
// AddSandboxToRoutingTable must be called synchronously to prevent a race condition where we would not yet know where to route the sandbox
55+
AddSandboxToRoutingTable InsertCallback
56+
// AsyncSandboxCounter should be called async to prevent blocking the main goroutine
57+
AsyncSandboxCounter InsertCallback
58+
// AsyncNewlyCreatedSandbox should be called async to prevent blocking the main goroutine
59+
AsyncNewlyCreatedSandbox InsertCallback
60+
}
61+
5262
type Store struct {
53-
storage Storage
54-
insertCallbacks []InsertCallback
55-
insertAsyncCallbacks []InsertCallback
63+
storage Storage
64+
callbacks Callbacks
5665

5766
reservations ReservationStorage
5867
}
5968

6069
func NewStore(
6170
backend Storage,
6271
reservations ReservationStorage,
63-
64-
insertCallbacks []InsertCallback,
65-
insertAsyncCallbacks []InsertCallback,
72+
callbacks Callbacks,
6673
) *Store {
6774
return &Store{
6875
storage: backend,
6976
reservations: reservations,
70-
71-
insertCallbacks: insertCallbacks,
72-
insertAsyncCallbacks: insertAsyncCallbacks,
77+
callbacks: callbacks,
7378
}
7479
}
7580

@@ -87,8 +92,18 @@ func (s *Store) Add(ctx context.Context, sandbox Sandbox, newlyCreated bool) err
8792
}
8893

8994
err := s.storage.Add(ctx, sandbox)
90-
if err != nil {
91-
return err
95+
if err == nil {
96+
// Only sandboxes newly added to the store are registered for routing and counted
97+
s.callbacks.AddSandboxToRoutingTable(ctx, sandbox)
98+
go s.callbacks.AsyncSandboxCounter(context.WithoutCancel(ctx), sandbox)
99+
} else {
100+
// There's a race condition when the sandbox is added from node sync
101+
// This should be fixed once the sync is improved
102+
if !errors.Is(err, ErrAlreadyExists) {
103+
return err
104+
}
105+
106+
logger.L().Warn(ctx, "Sandbox already exists in cache", logger.WithSandboxID(sandbox.SandboxID))
92107
}
93108

94109
// Ensure the team reservation is set - no limit
@@ -101,13 +116,8 @@ func (s *Store) Add(ctx context.Context, sandbox Sandbox, newlyCreated bool) err
101116
finishStart(sandbox, nil)
102117
}
103118

104-
// Run callbacks
105-
for _, callback := range s.insertCallbacks {
106-
callback(ctx, sandbox, newlyCreated)
107-
}
108-
109-
for _, callback := range s.insertAsyncCallbacks {
110-
go callback(context.WithoutCancel(ctx), sandbox, newlyCreated)
119+
if newlyCreated {
120+
go s.callbacks.AsyncNewlyCreatedSandbox(context.WithoutCancel(ctx), sandbox)
111121
}
112122

113123
return nil

0 commit comments

Comments
 (0)