Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/grpc/actions/canvases/changesets/canvas_patcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,14 @@ func (p *CanvasPatcher) addEdge(change *pb.CanvasChangeset_Change) error {
return fmt.Errorf("target node %s not found", edge.GetTargetId())
}

if err := ValidateSourceNodeOutputChannel(
p.registry,
p.nodes[edge.GetSourceId()],
edge.GetChannel(),
); err != nil {
return err
}

edgeKey := p.edgeKey(edge.GetSourceId(), edge.GetTargetId(), edge.GetChannel())
if _, exists := p.edges[edgeKey]; exists {
return nil
Expand Down
54 changes: 51 additions & 3 deletions pkg/grpc/actions/canvases/changesets/canvas_patcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Test__CanvasPatcher(t *testing.T) {
},
},
},
[]models.Edge{{SourceID: "node-a", TargetID: "node-b", Channel: "default"}},
[]models.Edge{{SourceID: "node-a", TargetID: "node-b", Channel: "true"}},
)

steps.whenHandling(&pb.CanvasChangeset{
Expand All @@ -69,7 +69,7 @@ func Test__CanvasPatcher(t *testing.T) {
Edge: &pb.CanvasChangeset_Change_Edge{
SourceId: "node-a",
TargetId: "node-c",
Channel: "default",
Channel: "true",
},
},
{
Expand All @@ -85,7 +85,7 @@ func Test__CanvasPatcher(t *testing.T) {
steps.assertHasNodeBlock("node-c", "noop")
steps.assertHasNoNodeIntegrationID("node-c")
steps.assertNodeCount(2)
steps.assertHasEdge("node-a", "node-c", "default")
steps.assertHasEdge("node-a", "node-c", "true")
steps.assertEdgeCount(1)
steps.assertGraphIsValid()
})
Expand Down Expand Up @@ -205,6 +205,54 @@ func Test__CanvasPatcher(t *testing.T) {
steps.assertNodePosition("node-b", 780, 95)
})

t.Run("rejects edges that reference an undefined source output channel", func(t *testing.T) {
steps := &CanvasPatcherSteps{t: t, registry: r.Registry, orgID: r.Organization.ID}
steps.givenCanvasVersion(
[]models.Node{
{
ID: "http-1",
Name: "HTTP Request",
Type: models.NodeTypeComponent,
Ref: models.NodeRef{
Component: &models.ComponentRef{Name: "http"},
},
Configuration: map[string]any{
"method": "GET",
"url": "https://example.com",
},
},
{
ID: "if-1",
Name: "If",
Type: models.NodeTypeComponent,
Ref: models.NodeRef{
Component: &models.ComponentRef{Name: "if"},
},
Configuration: map[string]any{
"expression": "true",
},
},
},
nil,
)

steps.whenHandling(&pb.CanvasChangeset{
Changes: []*pb.CanvasChangeset_Change{
{
Type: pb.CanvasChangeset_Change_ADD_EDGE,
Edge: &pb.CanvasChangeset_Change_Edge{
SourceId: "http-1",
TargetId: "if-1",
Channel: "default",
},
},
},
}, nil)

steps.assertHasError()
steps.assertErrorContains(`source node http-1 does not have output channel "default"`)
})

t.Run("returns error when change object is misconfigured", func(t *testing.T) {
testCases := []struct {
name string
Expand Down
88 changes: 88 additions & 0 deletions pkg/grpc/actions/canvases/changesets/output_channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package changesets

import (
"errors"
"fmt"
"slices"

"github.com/superplanehq/superplane/pkg/core"
"github.com/superplanehq/superplane/pkg/models"
"github.com/superplanehq/superplane/pkg/registry"
)

var errUnresolvableSourceNodeOutputChannels = errors.New("source node output channels are not resolvable")

func ValidateSourceNodeOutputChannel(
registry *registry.Registry,
sourceNode models.Node,
channel string,
) error {
outputChannels, err := listSourceNodeOutputChannels(registry, sourceNode)
if err != nil {
if errors.Is(err, errUnresolvableSourceNodeOutputChannels) {
return nil
}

return fmt.Errorf("failed to resolve output channels for source node %s: %w", sourceNode.ID, err)
}

if len(outputChannels) == 0 {
return nil
}

if slices.ContainsFunc(outputChannels, func(outputChannel core.OutputChannel) bool {
return outputChannel.Name == channel
}) {
return nil
}

available := make([]string, 0, len(outputChannels))
for _, outputChannel := range outputChannels {
available = append(available, outputChannel.Name)
}

return fmt.Errorf(
"source node %s does not have output channel %q (available: %v)",
sourceNode.ID,
channel,
available,
)
}

func listSourceNodeOutputChannels(
registry *registry.Registry,
sourceNode models.Node,
) ([]core.OutputChannel, error) {
if sourceNode.Type == models.NodeTypeComponent {
return listComponentOutputChannels(registry, sourceNode)
}

if sourceNode.Type == models.NodeTypeTrigger {
return []core.OutputChannel{core.DefaultOutputChannel}, nil
}

if sourceNode.Type == models.NodeTypeBlueprint {
// TODO: Validate blueprint output channels without doing a blueprint lookup per edge.
return nil, nil
}

return nil, fmt.Errorf("node type %s is not supported", sourceNode.Type)
}

func listComponentOutputChannels(registry *registry.Registry, sourceNode models.Node) ([]core.OutputChannel, error) {
if sourceNode.Ref.Component == nil || sourceNode.Ref.Component.Name == "" {
return nil, fmt.Errorf("%w: component reference is required", errUnresolvableSourceNodeOutputChannels)
}

component, err := registry.GetComponent(sourceNode.Ref.Component.Name)
if err != nil {
return nil, fmt.Errorf("%w: %v", errUnresolvableSourceNodeOutputChannels, err)
}

outputChannels := component.OutputChannels(sourceNode.Configuration)
if len(outputChannels) > 0 {
return outputChannels, nil
}

return []core.OutputChannel{core.DefaultOutputChannel}, nil
}
104 changes: 104 additions & 0 deletions pkg/grpc/actions/canvases/changesets/output_channels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package changesets

import (
"net/http"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/superplanehq/superplane/pkg/configuration"
"github.com/superplanehq/superplane/pkg/core"
"github.com/superplanehq/superplane/pkg/crypto"
"github.com/superplanehq/superplane/pkg/models"
"github.com/superplanehq/superplane/pkg/registry"
)

type testOutputChannelComponent struct {
channels []core.OutputChannel
}

func (c *testOutputChannelComponent) Name() string { return "test-component" }

func (c *testOutputChannelComponent) Label() string { return "Test Component" }

func (c *testOutputChannelComponent) Description() string { return "" }

func (c *testOutputChannelComponent) Documentation() string { return "" }

func (c *testOutputChannelComponent) Icon() string { return "" }

func (c *testOutputChannelComponent) Color() string { return "" }

func (c *testOutputChannelComponent) ExampleOutput() map[string]any { return nil }

func (c *testOutputChannelComponent) OutputChannels(any) []core.OutputChannel { return c.channels }

func (c *testOutputChannelComponent) Configuration() []configuration.Field { return nil }

func (c *testOutputChannelComponent) Setup(core.SetupContext) error { return nil }

func (c *testOutputChannelComponent) ProcessQueueItem(core.ProcessQueueContext) (*uuid.UUID, error) {
return nil, nil
}

func (c *testOutputChannelComponent) Execute(core.ExecutionContext) error { return nil }

func (c *testOutputChannelComponent) Actions() []core.Action { return nil }

func (c *testOutputChannelComponent) HandleAction(core.ActionContext) error { return nil }

func (c *testOutputChannelComponent) HandleWebhook(core.WebhookRequestContext) (int, *core.WebhookResponseBody, error) {
return http.StatusOK, nil, nil
}

func (c *testOutputChannelComponent) Cancel(core.ExecutionContext) error { return nil }

func (c *testOutputChannelComponent) Cleanup(core.SetupContext) error { return nil }

func TestValidateSourceNodeOutputChannel(t *testing.T) {
reg, err := registry.NewRegistry(&crypto.NoOpEncryptor{}, registry.HTTPOptions{})
require.NoError(t, err)

t.Run("unresolvable source component stays soft", func(t *testing.T) {
err := ValidateSourceNodeOutputChannel(
reg,
models.Node{
ID: "node-a",
Type: models.NodeTypeComponent,
Ref: models.NodeRef{
Component: &models.ComponentRef{Name: "missing-component"},
},
},
"default",
)

require.NoError(t, err)
})

t.Run("wrong channel on resolvable component returns error", func(t *testing.T) {
reg.Components["test-component"] = &testOutputChannelComponent{
channels: []core.OutputChannel{
{Name: "success"},
{Name: "failure"},
},
}

err := ValidateSourceNodeOutputChannel(
reg,
models.Node{
ID: "node-a",
Type: models.NodeTypeComponent,
Ref: models.NodeRef{
Component: &models.ComponentRef{Name: "test-component"},
},
},
"default",
)

require.Error(t, err)
assert.Contains(t, err.Error(), `source node node-a does not have output channel "default"`)
assert.Contains(t, err.Error(), "success")
assert.Contains(t, err.Error(), "failure")
})
}
51 changes: 51 additions & 0 deletions pkg/grpc/actions/canvases/create_canvas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,57 @@ func TestCreateCanvasOnFreshOrganization(t *testing.T) {
require.Equal(t, r.Organization.ID, persisted.OrganizationID)
}

func TestCreateCanvasRejectsInvalidEdgeChannel(t *testing.T) {
r := support.Setup(t)
ctx := authentication.SetUserIdInMetadata(context.Background(), r.User.String())

canvas := &pb.Canvas{
Metadata: &pb.Canvas_Metadata{
Name: "Invalid HTTP Channel",
},
Spec: &pb.Canvas_Spec{
Nodes: []*componentpb.Node{
{
Id: "http-1",
Name: "HTTP Request",
Type: componentpb.Node_TYPE_COMPONENT,
Component: &componentpb.Node_ComponentRef{
Name: "http",
},
Configuration: structFromAnyMap(t, map[string]any{
"method": "GET",
"url": "https://example.com",
}),
},
{
Id: "if-1",
Name: "If",
Type: componentpb.Node_TYPE_COMPONENT,
Component: &componentpb.Node_ComponentRef{
Name: "if",
},
Configuration: structFromAnyMap(t, map[string]any{
"expression": "true",
}),
},
},
Edges: []*componentpb.Edge{
{
SourceId: "http-1",
TargetId: "if-1",
Channel: "default",
},
},
},
}

baseURL := "https://example.com"
_, err := CreateCanvas(ctx, r.Registry, r.Encryptor, r.AuthService, baseURL, r.Organization.ID, canvas, nil, nil)
require.Error(t, err)
require.Equal(t, codes.InvalidArgument, status.Code(err))
require.Contains(t, status.Convert(err).Message(), `source node http-1 does not have output channel "default"`)
}

func TestCreateCanvasWithUsageRejectsLimitViolation(t *testing.T) {
r := support.Setup(t)
ctx := authentication.SetUserIdInMetadata(context.Background(), r.User.String())
Expand Down
14 changes: 13 additions & 1 deletion pkg/grpc/actions/canvases/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ func ParseCanvas(registry *registry.Registry, orgID string, canvas *pb.Canvas) (

// Find shadowed names within connected components
nodeWarnings := actions.FindShadowedNameWarnings(canvas.Spec.Nodes, canvas.Spec.Edges)
nodes := actions.ProtoToNodes(canvas.Spec.Nodes)
nodesByID := make(map[string]models.Node, len(nodes))
for _, node := range nodes {
nodesByID[node.ID] = node
}

for i, edge := range canvas.Spec.Edges {
if edge.SourceId == "" || edge.TargetId == "" {
Expand All @@ -283,10 +288,17 @@ func ParseCanvas(registry *registry.Registry, orgID string, canvas *pb.Canvas) (
if nodeTypeByID[edge.TargetId] == compb.Node_TYPE_WIDGET {
return nil, nil, status.Errorf(codes.InvalidArgument, "edge %d: widget nodes cannot be used as target nodes", i)
}

if err := changesets.ValidateSourceNodeOutputChannel(
registry,
nodesByID[edge.SourceId],
edge.Channel,
); err != nil {
return nil, nil, status.Errorf(codes.InvalidArgument, "edge %d: %v", i, err)
}
Comment thread
cursor[bot] marked this conversation as resolved.
}

// Convert proto nodes to models, adding validation errors and warnings where applicable
nodes := actions.ProtoToNodes(canvas.Spec.Nodes)
edges := actions.ProtoToEdges(canvas.Spec.Edges)
for i := range nodes {
if errorMsg, hasError := nodeValidationErrors[nodes[i].ID]; hasError {
Expand Down
Loading
Loading