Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions packages/api/internal/orchestrator/nodemanager/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,49 @@ func WithCPUInfo(cpuArch, cpuFamily, cpuModel string) TestOptions {
}
}

// mockSandboxClientWithError implements orchestrator.SandboxServiceClient that returns an error
type mockSandboxClientWithError struct {
orchestrator.SandboxServiceClient

err error
}

// Create is a mock implementation that returns a specific error
func (n *mockSandboxClientWithError) Create(_ context.Context, _ *orchestrator.SandboxCreateRequest, _ ...grpc.CallOption) (*orchestrator.SandboxCreateResponse, error) {
return nil, n.err
}

func WithSandboxCreateError(err error) TestOptions {
return func(node *TestNode) {
node.client.Sandbox = &mockSandboxClientWithError{
err: err,
}
}
}

// MockSandboxClientCustom allows custom error logic per call
type MockSandboxClientCustom struct {
orchestrator.SandboxServiceClient

CreateFunc func() error
}

// Create calls the custom function to determine the error
func (n *MockSandboxClientCustom) Create(_ context.Context, _ *orchestrator.SandboxCreateRequest, _ ...grpc.CallOption) (*orchestrator.SandboxCreateResponse, error) {
if n.CreateFunc != nil {
if err := n.CreateFunc(); err != nil {
return nil, err
}
}

return &orchestrator.SandboxCreateResponse{}, nil
}

// SetSandboxClient allows setting a custom sandbox client on a test node
func (n *TestNode) SetSandboxClient(client orchestrator.SandboxServiceClient) {
n.client.Sandbox = client
}

// NewTestNode creates a properly initialized Node for testing purposes
// It uses a mock gRPC client and has simplified Status() method behavior
func NewTestNode(id string, status api.NodeStatus, cpuAllocated int64, cpuCount uint32, options ...TestOptions) *TestNode {
Expand Down
40 changes: 25 additions & 15 deletions packages/api/internal/orchestrator/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var errSandboxCreateFailed = fmt.Errorf("failed to create a new sandbox, if the
// and current load distribution.
type Algorithm interface {
chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo) (*nodemanager.Node, error)
excludeNode(err error) bool
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simplified the logic a little

}

func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*nodemanager.Node, preferredNode *nodemanager.Node, sbxRequest *orchestrator.SandboxCreateRequest, buildMachineInfo machineinfo.MachineInfo) (*nodemanager.Node, error) {
Expand Down Expand Up @@ -78,24 +77,35 @@ func PlaceSandbox(ctx context.Context, algorithm Algorithm, clusterNodes []*node
err = node.SandboxCreate(ctx, sbxRequest)
span.End()
if err != nil {
if algorithm.excludeNode(err) {
logger.L().Warn(ctx, "Excluding node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(node.ID))
nodesExcluded[node.ID] = struct{}{}
}
failedNode := node
node = nil

st, ok := status.FromError(err)
if !ok || st.Code() != codes.ResourceExhausted {
node.PlacementMetrics.Fail(sbxRequest.GetSandbox().GetSandboxId())
logger.L().Error(ctx, "Failed to create sandbox", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(node.ID), zap.Int("attempt", attempt+1), zap.Error(utils.UnwrapGRPCError(err)))
attempt++
} else {
node.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId())
logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(node.ID))
if ok {
switch st.Code() {
case codes.ResourceExhausted:
failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId())
logger.L().Warn(ctx, "Node exhausted, trying another node", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID))
case codes.NotFound:
failedNode.PlacementMetrics.Skip(sbxRequest.GetSandbox().GetSandboxId())
logger.L().Warn(ctx, "Build not found, retrying", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID))

// We tried non-preferred node and the data aren't uploaded yet, try to use the preferred again
// This should prevent spamming the preferred node, yet still try to place the sandbox there as it will be faster
if preferredNode != nil && preferredNode.ID != failedNode.ID {
node = preferredNode
}
default:
nodesExcluded[failedNode.ID] = struct{}{}
failedNode.PlacementMetrics.Fail(sbxRequest.GetSandbox().GetSandboxId())
logger.L().Error(ctx, "Failed to create sandbox", logger.WithSandboxID(sbxRequest.GetSandbox().GetSandboxId()), logger.WithNodeID(failedNode.ID), zap.Int("attempt", attempt+1), zap.Error(utils.UnwrapGRPCError(err)))
attempt++
}

continue
}

node = nil

continue
return nil, fmt.Errorf("unexpected error during sandbox creation on node %s (attempt %d): %w", failedNode.ID, attempt+1, err)
}

node.PlacementMetrics.Success(sbxRequest.GetSandbox().GetSandboxId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
"math/rand"
"sync"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/e2b-dev/infra/packages/api/internal/api"
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
"github.com/e2b-dev/infra/packages/shared/pkg/machineinfo"
Expand Down Expand Up @@ -105,16 +102,6 @@ func (b *BestOfK) UpdateConfig(config BestOfKConfig) {
b.config = config
}

func (b *BestOfK) excludeNode(err error) bool {
st, ok := status.FromError(err)
// If the node is just exhausted, keep it
if ok && st.Code() == codes.ResourceExhausted {
return false
}

return true
}

// chooseNode selects the best node for placing a VM with the given quota
func (b *BestOfK) chooseNode(_ context.Context, nodes []*nodemanager.Node, excludedNodes map[string]struct{}, resources nodemanager.SandboxResources, buildMachineInfo machineinfo.MachineInfo) (bestNode *nodemanager.Node, err error) {
// Fix the config, we want to dynamically update it
Expand Down
155 changes: 149 additions & 6 deletions packages/api/internal/orchestrator/placement/placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/e2b-dev/infra/packages/api/internal/api"
"github.com/e2b-dev/infra/packages/api/internal/orchestrator/nodemanager"
Expand All @@ -21,12 +23,6 @@ type mockAlgorithm struct {
mock.Mock
}

func (m *mockAlgorithm) excludeNode(err error) bool {
args := m.Called(err)

return args.Bool(0)
}

func (m *mockAlgorithm) chooseNode(ctx context.Context, nodes []*nodemanager.Node, nodesExcluded map[string]struct{}, requested nodemanager.SandboxResources, buildCPUInfo machineinfo.MachineInfo) (*nodemanager.Node, error) {
args := m.Called(ctx, nodes, nodesExcluded, requested, buildCPUInfo)
if args.Get(0) == nil {
Expand Down Expand Up @@ -176,3 +172,150 @@ func TestPlaceSandbox_AllNodesExcluded(t *testing.T) {
assert.Contains(t, err.Error(), "no nodes available")
algorithm.AssertExpectations(t)
}

func TestPlaceSandbox_ResourceExhausted(t *testing.T) {
ctx := t.Context()

// Create test nodes - node1 will return ResourceExhausted, node2 will succeed
node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 3, 4,
nodemanager.WithSandboxCreateError(status.Error(codes.ResourceExhausted, "node exhausted")))
node2 := nodemanager.NewTestNode("node2", api.NodeStatusReady, 5, 4)
nodes := []*nodemanager.Node{node1, node2}

// Algorithm should be called twice - first returns node1 (exhausted), then node2 (succeeds)
algorithm := &mockAlgorithm{}
algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything).
Return(node1, nil).Once()
algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything).
Return(node2, nil).Once()

sbxRequest := &orchestrator.SandboxCreateRequest{
Sandbox: &orchestrator.SandboxConfig{
SandboxId: "test-sandbox",
Vcpu: 2,
RamMb: 1024,
},
}

resultNode, err := PlaceSandbox(ctx, algorithm, nodes, nil, sbxRequest, machineinfo.MachineInfo{})

require.NoError(t, err)
assert.NotNil(t, resultNode)
assert.Equal(t, node2, resultNode, "should succeed on node2 after node1 was exhausted")
algorithm.AssertExpectations(t)

// Verify node1 was NOT excluded (ResourceExhausted nodes should be retried)
algorithm.AssertNumberOfCalls(t, "chooseNode", 2)
}

func TestPlaceSandbox_NotFoundWithPreferredNode(t *testing.T) {
ctx := t.Context()

// Scenario: Preferred node is exhausted, we try another node which returns NotFound,
// then we should retry on the preferred node

// Create a mock client that returns ResourceExhausted first, then succeeds on retry
callCount := 0

preferredNode := nodemanager.NewTestNode("preferred-node", api.NodeStatusReady, 5, 4)
preferredNode.SetSandboxClient(&nodemanager.MockSandboxClientCustom{
CreateFunc: func() error {
callCount++
if callCount == 1 {
return status.Error(codes.ResourceExhausted, "node temporarily exhausted")
}

return nil
},
})

node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 3, 4,
nodemanager.WithSandboxCreateError(status.Error(codes.NotFound, "sandbox files not found")))
nodes := []*nodemanager.Node{preferredNode, node1}

// Algorithm should be called once to select node1 after preferred node is exhausted
algorithm := &mockAlgorithm{}
algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything).
Return(node1, nil).Once()

sbxRequest := &orchestrator.SandboxCreateRequest{
Sandbox: &orchestrator.SandboxConfig{
SandboxId: "test-sandbox",
Vcpu: 2,
RamMb: 1024,
},
}

// Start with preferred node (exhausted) -> try node1 (NotFound) -> retry preferred node (succeeds)
resultNode, err := PlaceSandbox(ctx, algorithm, nodes, preferredNode, sbxRequest, machineinfo.MachineInfo{})

require.NoError(t, err)
assert.NotNil(t, resultNode)
assert.Equal(t, preferredNode, resultNode, "should retry on preferred node after NotFound on different node")

// Algorithm should be called once (for node1), then preferred node is retried
algorithm.AssertNumberOfCalls(t, "chooseNode", 1)
assert.Equal(t, 2, callCount, "preferred node should be tried twice")
}

func TestPlaceSandbox_NotFoundWithoutPreferredNode(t *testing.T) {
ctx := t.Context()

// Create test nodes - both return NotFound initially, node2 succeeds on retry
node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 3, 4,
nodemanager.WithSandboxCreateError(status.Error(codes.NotFound, "sandbox files not found")))
node2 := nodemanager.NewTestNode("node2", api.NodeStatusReady, 5, 4)
nodes := []*nodemanager.Node{node1, node2}

algorithm := &mockAlgorithm{}
algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything).
Return(node1, nil).Once()
algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything).
Return(node2, nil).Once()

sbxRequest := &orchestrator.SandboxCreateRequest{
Sandbox: &orchestrator.SandboxConfig{
SandboxId: "test-sandbox",
Vcpu: 2,
RamMb: 1024,
},
}

// No preferred node - should retry with algorithm
resultNode, err := PlaceSandbox(ctx, algorithm, nodes, nil, sbxRequest, machineinfo.MachineInfo{})

require.NoError(t, err)
assert.NotNil(t, resultNode)
assert.Equal(t, node2, resultNode, "should succeed on node2 after NotFound on node1")
algorithm.AssertNumberOfCalls(t, "chooseNode", 2)
}

func TestPlaceSandbox_NotFoundOnPreferredNode(t *testing.T) {
ctx := t.Context()

// Create test nodes - preferred node returns NotFound, node1 succeeds
preferredNode := nodemanager.NewTestNode("preferred-node", api.NodeStatusReady, 5, 4,
nodemanager.WithSandboxCreateError(status.Error(codes.NotFound, "sandbox files not found")))
node1 := nodemanager.NewTestNode("node1", api.NodeStatusReady, 3, 4)
nodes := []*nodemanager.Node{preferredNode, node1}

algorithm := &mockAlgorithm{}
algorithm.On("chooseNode", mock.Anything, nodes, mock.Anything, mock.Anything, mock.Anything).
Return(node1, nil).Once()

sbxRequest := &orchestrator.SandboxCreateRequest{
Sandbox: &orchestrator.SandboxConfig{
SandboxId: "test-sandbox",
Vcpu: 2,
RamMb: 1024,
},
}

// Start with preferred node that returns NotFound
resultNode, err := PlaceSandbox(ctx, algorithm, nodes, preferredNode, sbxRequest, machineinfo.MachineInfo{})

require.NoError(t, err)
assert.NotNil(t, resultNode)
assert.Equal(t, node1, resultNode, "should succeed on node1 after NotFound on preferred node")
algorithm.AssertNumberOfCalls(t, "chooseNode", 1)
}
4 changes: 4 additions & 0 deletions packages/orchestrator/internal/server/sandboxes.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func (s *Server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
req.GetSandbox(),
)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, status.Errorf(codes.NotFound, "sandbox files for '%s' not found", req.GetSandbox().GetSandboxId())
}

err := errors.Join(err, context.Cause(ctx))
telemetry.ReportCriticalError(ctx, "failed to create sandbox", err)

Expand Down
Loading