16 changes: 16 additions & 0 deletions .github/actions/start-services/action.yml
@@ -75,6 +75,22 @@ runs:
make -C packages/clickhouse migrate-without-build
shell: bash

- name: Run Redis
run: |
docker run -d --name redis \
-p 6379:6379 \
--health-cmd="redis-cli ping" \
--health-interval=5s \
--health-timeout=2s \
--health-retries=5 \
redis:latest

while [ "$(docker inspect -f '{{.State.Health.Status}}' redis 2>/dev/null)" != "healthy" ]; do echo "Waiting for Redis to be healthy..."; sleep 2; done
echo "Redis is healthy!"

echo "REDIS_URL=localhost:6379" >> .env.test
shell: bash

- name: Start Services
env:
ENVD_TIMEOUT: "60s"
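Note: the step above publishes the Redis address to `.env.test` as a bare `host:port`. A minimal sketch of how a test or service could pick it up — assuming the `github.com/redis/go-redis/v9` client and that the value is loaded into the environment (both are assumptions; this diff does not show the consuming side):

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/redis/go-redis/v9" // assumed client library; not confirmed by this diff
)

func main() {
	// REDIS_URL is written by the CI step above as host:port (no scheme).
	addr := os.Getenv("REDIS_URL")
	if addr == "" {
		addr = "localhost:6379"
	}

	client := redis.NewClient(&redis.Options{Addr: addr})
	defer client.Close()

	// Mirrors the health check the CI step performs with `redis-cli ping`.
	if err := client.Ping(context.Background()).Err(); err != nil {
		fmt.Fprintln(os.Stderr, "redis not reachable:", err)
		os.Exit(1)
	}
	fmt.Println("redis is reachable")
}
```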
1 change: 1 addition & 0 deletions packages/api/go.mod
@@ -31,6 +31,7 @@ tool (

require (
github.com/Masterminds/semver/v3 v3.2.1
github.com/bsm/redislock v0.9.4
github.com/caarlos0/env/v11 v11.3.1
github.com/e2b-dev/infra/packages/clickhouse v0.0.0
github.com/e2b-dev/infra/packages/db v0.0.0
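Note: `bsm/redislock` is the new direct dependency added here; the diff does not show its call sites. A minimal, hypothetical usage sketch of the library's locking API (the key name and TTL are illustrative, not taken from this PR):

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/bsm/redislock"
	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	locker := redislock.New(client)

	// Hold an exclusive lock for up to 10 seconds; illustrative key name.
	lock, err := locker.Obtain(ctx, "sandbox-store-lock", 10*time.Second, nil)
	if errors.Is(err, redislock.ErrNotObtained) {
		log.Println("another instance holds the lock")
		return
	} else if err != nil {
		log.Fatal(err)
	}
	defer lock.Release(ctx)

	// ... critical section guarded by the lock ...
}
```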
2 changes: 2 additions & 0 deletions packages/api/go.sum

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion packages/api/internal/handlers/sandbox_connect.go
@@ -52,7 +52,7 @@ func (a *APIStore) PostSandboxesSandboxIDConnect(c *gin.Context, sandboxID api.S
}

sandboxID = utils.ShortID(sandboxID)
sandboxData, err := a.orchestrator.GetSandbox(sandboxID)
sandboxData, err := a.orchestrator.GetSandbox(ctx, sandboxID)
if err == nil {
switch sandboxData.State {
case sandbox.StatePausing:
2 changes: 1 addition & 1 deletion packages/api/internal/handlers/sandbox_get.go
@@ -42,7 +42,7 @@ func (a *APIStore) GetSandboxesSandboxID(c *gin.Context, id string) {
}

// Try to get the running sandbox first
sbx, err := a.orchestrator.GetSandbox(sandboxId)
sbx, err := a.orchestrator.GetSandbox(ctx, sandboxId)
if err == nil {
// Check if sandbox belongs to the team
if sbx.TeamID != team.ID {
2 changes: 1 addition & 1 deletion packages/api/internal/handlers/sandbox_kill.go
@@ -91,7 +91,7 @@ func (a *APIStore) DeleteSandboxesSandboxID(

killedOrRemoved := false

sbx, err := a.orchestrator.GetSandbox(sandboxID)
sbx, err := a.orchestrator.GetSandbox(ctx, sandboxID)
if err == nil {
if sbx.TeamID != teamID {
a.sendAPIStoreError(c, http.StatusForbidden, fmt.Sprintf("You don't have access to sandbox \"%s\"", sandboxID))
2 changes: 1 addition & 1 deletion packages/api/internal/handlers/sandbox_pause.go
@@ -34,7 +34,7 @@ func (a *APIStore) PostSandboxesSandboxIDPause(c *gin.Context, sandboxID api.San
traceID := span.SpanContext().TraceID().String()
c.Set("traceID", traceID)

sbx, err := a.orchestrator.GetSandbox(sandboxID)
sbx, err := a.orchestrator.GetSandbox(ctx, sandboxID)
if err != nil {
apiErr := pauseHandleNotRunningSandbox(ctx, a.sqlcDB, sandboxID, teamID)
a.sendAPIStoreError(c, apiErr.Code, apiErr.ClientMsg)
2 changes: 1 addition & 1 deletion packages/api/internal/handlers/sandbox_resume.go
@@ -56,7 +56,7 @@ func (a *APIStore) PostSandboxesSandboxIDResume(c *gin.Context, sandboxID api.Sa
}

sandboxID = utils.ShortID(sandboxID)
sandboxData, err := a.orchestrator.GetSandbox(sandboxID)
sandboxData, err := a.orchestrator.GetSandbox(ctx, sandboxID)
if err == nil {
switch sandboxData.State {
case sandbox.StatePausing:
6 changes: 3 additions & 3 deletions packages/api/internal/metrics/team.go
@@ -29,7 +29,7 @@ type TeamObserver struct {
teamSandboxesCreated metric.Int64Counter
}

func NewTeamObserver(ctx context.Context, sandboxStore sandbox.Store) (*TeamObserver, error) {
func NewTeamObserver(ctx context.Context, sandboxStore *sandbox.Store) (*TeamObserver, error) {
deltaTemporality := otlpmetricgrpc.WithTemporalitySelector(func(sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
})
@@ -73,11 +73,11 @@ func NewTeamObserver(ctx context.Context, sandboxStore sandbox.Store) (*TeamObse
return observer, nil
}

func (so *TeamObserver) Start(cache sandbox.Store) (err error) {
func (so *TeamObserver) Start(store *sandbox.Store) (err error) {
// Register callbacks for team sandbox metrics
so.registration, err = so.meter.RegisterCallback(
func(_ context.Context, obs metric.Observer) error {
sbxs := cache.Items(nil, []sandbox.State{sandbox.StateRunning})
sbxs := store.Items(nil, []sandbox.State{sandbox.StateRunning})
sbxsPerTeam := make(map[string]int64)
for _, sbx := range sbxs {
teamID := sbx.TeamID.String()
25 changes: 12 additions & 13 deletions packages/api/internal/orchestrator/cache.go
@@ -11,7 +11,6 @@ import (

"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
"github.com/e2b-dev/infra/packages/api/internal/sandbox"
"github.com/e2b-dev/infra/packages/api/internal/sandbox/store/memory"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
)

@@ -20,15 +19,15 @@ var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/api/internal/orchest
// cacheSyncTime is the time to sync the cache with the actual instances in Orchestrator.
const cacheSyncTime = 20 * time.Second

func (o *Orchestrator) GetSandbox(sandboxID string) (sandbox.Sandbox, error) {
return o.sandboxStore.Get(sandboxID)
func (o *Orchestrator) GetSandbox(ctx context.Context, sandboxID string) (sandbox.Sandbox, error) {
return o.sandboxStore.Get(ctx, sandboxID)
}

// keepInSync the cache with the actual instances in Orchestrator to handle instances that died.
func (o *Orchestrator) keepInSync(ctx context.Context, instanceCache *memory.Store, skipSyncingWithNomad bool) {
func (o *Orchestrator) keepInSync(ctx context.Context, store *sandbox.Store, skipSyncingWithNomad bool) {
// Run the first sync immediately
zap.L().Info("Running the initial node sync")
o.syncNodes(ctx, instanceCache, skipSyncingWithNomad)
o.syncNodes(ctx, store, skipSyncingWithNomad)

// Sync the nodes every cacheSyncTime
ticker := time.NewTicker(cacheSyncTime)
@@ -41,12 +40,12 @@ func (o *Orchestrator) keepInSync(ctx context.Context, instanceCache *memory.Sto

return
case <-ticker.C:
o.syncNodes(ctx, instanceCache, skipSyncingWithNomad)
o.syncNodes(ctx, store, skipSyncingWithNomad)
}
}
}

func (o *Orchestrator) syncNodes(ctx context.Context, instanceCache *memory.Store, skipSyncingWithNomad bool) {
func (o *Orchestrator) syncNodes(ctx context.Context, store *sandbox.Store, skipSyncingWithNomad bool) {
ctxTimeout, cancel := context.WithTimeout(ctx, cacheSyncTime)
defer cancel()

@@ -98,9 +97,9 @@
// because each of them is taken from different source pool
var err error
if n.IsNomadManaged() {
err = o.syncNode(syncNodesSpanCtx, n, nomadNodes, instanceCache)
err = o.syncNode(syncNodesSpanCtx, n, nomadNodes, store)
} else {
err = o.syncClusterNode(syncNodesSpanCtx, n, instanceCache)
err = o.syncClusterNode(syncNodesSpanCtx, n, store)
}
if err != nil {
zap.L().Error("Error syncing node", zap.Error(err))
@@ -161,7 +160,7 @@ func (o *Orchestrator) syncClusterDiscoveredNodes(ctx context.Context) {
}
}

func (o *Orchestrator) syncClusterNode(ctx context.Context, node *nodemanager.Node, instanceCache *memory.Store) error {
func (o *Orchestrator) syncClusterNode(ctx context.Context, node *nodemanager.Node, store *sandbox.Store) error {
ctx, childSpan := tracer.Start(ctx, "sync-cluster-node")
telemetry.SetAttributes(ctx, telemetry.WithNodeID(node.ID), telemetry.WithClusterID(node.ClusterID))
defer childSpan.End()
@@ -180,12 +179,12 @@ func (o *Orchestrator) syncClusterNode(ctx context.Context, node *nodemanager.No
}

// Unified call for syncing node state across different node types
node.Sync(ctx, instanceCache)
node.Sync(ctx, store)

return nil
}

func (o *Orchestrator) syncNode(ctx context.Context, node *nodemanager.Node, discovered []nodemanager.NomadServiceDiscovery, instanceCache *memory.Store) error {
func (o *Orchestrator) syncNode(ctx context.Context, node *nodemanager.Node, discovered []nodemanager.NomadServiceDiscovery, store *sandbox.Store) error {
ctx, childSpan := tracer.Start(ctx, "sync-node")
telemetry.SetAttributes(ctx, telemetry.WithNodeID(node.ID))
defer childSpan.End()
@@ -204,7 +203,7 @@ func (o *Orchestrator) syncNode(ctx context.Context, node *nodemanager.Node, dis
}

// Unified call for syncing node state across different node types
node.Sync(ctx, instanceCache)
node.Sync(ctx, store)

return nil
}
23 changes: 21 additions & 2 deletions packages/api/internal/orchestrator/create_instance.go
@@ -84,7 +84,7 @@ func (o *Orchestrator) CreateSandbox(
totalConcurrentInstances := team.Limits.SandboxConcurrency

// Check if team has reached max instances
finishStart, waitForStart, err := o.sandboxStore.Reserve(team.Team.ID.String(), sandboxID, totalConcurrentInstances)
finishStart, waitForStart, err := o.sandboxStore.Reserve(ctx, team.Team.ID.String(), sandboxID, int(totalConcurrentInstances))
if err != nil {
var limitErr *sandbox.LimitExceededError

@@ -289,7 +289,26 @@ func (o *Orchestrator) CreateSandbox(
trafficAccessToken,
)

o.sandboxStore.Add(ctx, sbx, true)
err = o.sandboxStore.Add(ctx, sbx, true)
if err != nil {
telemetry.ReportError(ctx, "failed to add sandbox to store", err)

// Clean up the sandbox from the node
// Copy to a new variable to avoid race conditions
sbxToRemove := sbx
go func() {
killErr := o.removeSandboxFromNode(context.WithoutCancel(ctx), sbxToRemove, sandbox.StateActionKill)
if killErr != nil {
zap.L().Error("Error pausing sandbox", zap.Error(killErr), logger.WithSandboxID(sbxToRemove.SandboxID))
}
}()

return sandbox.Sandbox{}, &api.APIError{
Code: http.StatusInternalServerError,
ClientMsg: "Failed to create sandbox",
Err: fmt.Errorf("failed to add sandbox to store: %w", err),
}
}

return sbx, nil
}
2 changes: 1 addition & 1 deletion packages/api/internal/orchestrator/delete_instance.go
@@ -60,7 +60,7 @@ func (o *Orchestrator) RemoveSandbox(ctx context.Context, sbx sandbox.Sandbox, s

defer func() { go o.countersRemove(context.WithoutCancel(ctx), sbx, stateAction) }()
defer func() { go o.analyticsRemove(context.WithoutCancel(ctx), sbx, stateAction) }()
defer o.sandboxStore.Remove(sbx.TeamID.String(), sbx.SandboxID)
defer o.sandboxStore.Remove(ctx, sbx.TeamID.String(), sbx.SandboxID)
err = o.removeSandboxFromNode(ctx, sbx, stateAction)
if err != nil {
zap.L().Error("Error pausing sandbox", zap.Error(err), logger.WithSandboxID(sbx.SandboxID))
4 changes: 2 additions & 2 deletions packages/api/internal/orchestrator/evictor/evict.go
@@ -11,12 +11,12 @@ import (
)

type Evictor struct {
store sandbox.Store
store *sandbox.Store
removeSandbox func(ctx context.Context, sandbox sandbox.Sandbox, stateAction sandbox.StateAction) error
}

func New(
store sandbox.Store,
store *sandbox.Store,
removeSandbox func(ctx context.Context, sandbox sandbox.Sandbox, stateAction sandbox.StateAction) error,
) *Evictor {
return &Evictor{
2 changes: 1 addition & 1 deletion packages/api/internal/orchestrator/keep_alive.go
@@ -45,7 +45,7 @@ func (o *Orchestrator) KeepAliveFor(ctx context.Context, sandboxID string, durat
}

var sbxNotFoundErr *sandbox.NotFoundError
sbx, err := o.sandboxStore.Update(sandboxID, updateFunc)
sbx, err := o.sandboxStore.Update(ctx, sandboxID, updateFunc)
if err != nil {
switch {
case errors.As(err, &sbxNotFoundErr):
6 changes: 3 additions & 3 deletions packages/api/internal/orchestrator/nodemanager/sync.go
@@ -7,13 +7,13 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/e2b-dev/infra/packages/api/internal/api"
"github.com/e2b-dev/infra/packages/api/internal/sandbox/store/memory"
"github.com/e2b-dev/infra/packages/api/internal/sandbox"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
)

const syncMaxRetries = 4

func (n *Node) Sync(ctx context.Context, instanceCache *memory.Store) {
func (n *Node) Sync(ctx context.Context, store *sandbox.Store) {
syncRetrySuccess := false

for range syncMaxRetries {
@@ -50,7 +50,7 @@ func (n *Node) Sync(ctx context.Context, instanceCache *memory.Store) {
continue
}

instanceCache.Sync(ctx, activeInstances, n.ID)
store.Sync(ctx, activeInstances, n.ID)

syncRetrySuccess = true

31 changes: 23 additions & 8 deletions packages/api/internal/orchestrator/orchestrator.go
@@ -20,7 +20,10 @@ import (
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/placement"
"github.com/e2b-dev/infra/packages/api/internal/sandbox"
"github.com/e2b-dev/infra/packages/api/internal/sandbox/store/memory"
"github.com/e2b-dev/infra/packages/api/internal/sandbox/reservations"
"github.com/e2b-dev/infra/packages/api/internal/sandbox/storage/memory"
"github.com/e2b-dev/infra/packages/api/internal/sandbox/storage/populate_redis"
redisbackend "github.com/e2b-dev/infra/packages/api/internal/sandbox/storage/redis"
sqlcdb "github.com/e2b-dev/infra/packages/db/client"
"github.com/e2b-dev/infra/packages/shared/pkg/db"
"github.com/e2b-dev/infra/packages/shared/pkg/env"
@@ -37,7 +40,7 @@ var ErrNodeNotFound = errors.New("node not found")
type Orchestrator struct {
httpClient *http.Client
nomadClient *nomadapi.Client
sandboxStore sandbox.Store
sandboxStore *sandbox.Store
nodes *smap.Map[*nodemanager.Node]
leastBusyAlgorithm placement.Algorithm
bestOfKAlgorithm *placement.BestOfK
@@ -132,7 +135,21 @@ func New(
createdCounter: createdCounter,
}

sandboxStore := memory.NewStore(
var sandboxStorage sandbox.Storage
memoryStorage := memory.NewStorage()

if redisClient != nil {
redisStorage := redisbackend.NewStorage(redisClient)
sandboxStorage = populate_redis.NewStorage(memoryStorage, redisStorage)
} else {
sandboxStorage = memoryStorage
}

reservationStorage := reservations.NewReservationStorage()

o.sandboxStore = sandbox.NewStore(
sandboxStorage,
reservationStorage,
[]sandbox.InsertCallback{
o.addToNode,
},
@@ -143,13 +160,11 @@
},
)

o.sandboxStore = sandboxStore

// Evict old sandboxes
sandboxEvictor := evictor.New(sandboxStore, o.RemoveSandbox)
sandboxEvictor := evictor.New(o.sandboxStore, o.RemoveSandbox)
go sandboxEvictor.Start(ctx)

teamMetricsObserver, err := metrics.NewTeamObserver(ctx, sandboxStore)
teamMetricsObserver, err := metrics.NewTeamObserver(ctx, o.sandboxStore)
if err != nil {
zap.L().Error("Failed to create team metrics observer", zap.Error(err))

@@ -161,7 +176,7 @@
// For local development and testing, we skip the Nomad sync
// Local cluster is used for single-node setups instead
skipNomadSync := env.IsLocal()
go o.keepInSync(ctx, sandboxStore, skipNomadSync)
go o.keepInSync(ctx, o.sandboxStore, skipNomadSync)

if err := o.setupMetrics(tel.MeterProvider); err != nil {
zap.L().Error("Failed to setup metrics", zap.Error(err))
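Note: when a Redis client is configured, the constructor above layers a Redis-backed storage behind the in-memory one via `populate_redis`. Neither the `sandbox.Storage` interface nor the composite is shown in this diff; the sketch below is a hypothetical write-through composite illustrating the pattern (the interface shape and method names are assumptions, not the repository's actual API):

```go
package storage

import "context"

// Sandbox stands in for the real sandbox.Sandbox type.
type Sandbox struct {
	SandboxID string
}

// Storage is an assumed shape of the sandbox storage backend.
type Storage interface {
	Get(ctx context.Context, sandboxID string) (Sandbox, error)
	Add(ctx context.Context, sbx Sandbox) error
	Remove(ctx context.Context, sandboxID string) error
}

// PopulatingStorage answers reads from the primary (in-memory) backend and
// mirrors every write into the secondary (Redis) backend, echoing the
// populate_redis wiring in the constructor above.
type PopulatingStorage struct {
	primary   Storage
	secondary Storage
}

func NewPopulatingStorage(primary, secondary Storage) *PopulatingStorage {
	return &PopulatingStorage{primary: primary, secondary: secondary}
}

func (s *PopulatingStorage) Get(ctx context.Context, sandboxID string) (Sandbox, error) {
	return s.primary.Get(ctx, sandboxID)
}

func (s *PopulatingStorage) Add(ctx context.Context, sbx Sandbox) error {
	if err := s.primary.Add(ctx, sbx); err != nil {
		return err
	}
	// Propagate the write so external readers (e.g. other API instances) can observe it.
	return s.secondary.Add(ctx, sbx)
}

func (s *PopulatingStorage) Remove(ctx context.Context, sandboxID string) error {
	if err := s.primary.Remove(ctx, sandboxID); err != nil {
		return err
	}
	return s.secondary.Remove(ctx, sandboxID)
}
```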