Skip to content

Commit 09ad86f

Browse files
committed
Rename methods, cleanup
1 parent 5803013 commit 09ad86f

4 files changed

Lines changed: 26 additions & 23 deletions

File tree

cmd/epp/runner/runner.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -450,9 +450,10 @@ func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *conf
450450
// Add requestControl plugins
451451
r.requestControlConfig.AddPlugins(handle.GetAllPlugins()...)
452452

453-
// Sort prepare data plugins in DAG order (topological sort). Also check prepare data plugins for cycles.
454-
if r.requestControlConfig.PrepareDataPluginGraph() != nil {
455-
return nil, errors.New("failed to load the configuration - prepare data plugins have cyclic dependencies")
453+
// Sort data plugins in DAG order (topological sort). Also check DAG for cycles.
454+
// Also Order PrepareData plugins based on data dependencies.
455+
if err := r.requestControlConfig.ValidateAndOrderDataDependencies(); err != nil {
456+
return nil, fmt.Errorf("failed to load the configuration - %w", err)
456457
}
457458
// TODO(#1970): Remove feature gate check once prepare data plugins are stable.
458459
if !r.featureGates[datalayer.PrepareDataPluginsFeatureGate] {

pkg/epp/requestcontrol/dag.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func buildDAG(producers map[string]plugin.ProducerPlugin, consumers map[string]p
6464

6565
// sortPlugins builds the dependency graph and returns the plugins ordered in topological order.
6666
// If there is a cycle, it returns an error.
67-
func sortPlugins(dag map[string][]string, plugins []fwk.PrepareDataPlugin) ([]fwk.PrepareDataPlugin, error) {
67+
func sortPlugins(dag map[string][]string, plugins []fwk.PrepareDataPlugin) ([]string, error) {
6868
nameToPlugin := map[string]fwk.PrepareDataPlugin{}
6969
for _, plugin := range plugins {
7070
nameToPlugin[plugin.TypedName().String()] = plugin
@@ -73,12 +73,7 @@ func sortPlugins(dag map[string][]string, plugins []fwk.PrepareDataPlugin) ([]fw
7373
if err != nil {
7474
return nil, err
7575
}
76-
orderedPlugins := []fwk.PrepareDataPlugin{}
77-
for _, pluginName := range sortedPlugins {
78-
orderedPlugins = append(orderedPlugins, nameToPlugin[pluginName])
79-
}
80-
81-
return orderedPlugins, err
76+
return sortedPlugins, nil
8277
}
8378

8479
// TopologicalSort performs Kahn's Algorithm on a DAG.

pkg/epp/requestcontrol/dag_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,20 +162,16 @@ func TestPrepareDataGraph(t *testing.T) {
162162
t.Errorf("prepareDataGraph() mismatch (-want +got):\n%s", diff)
163163
}
164164

165-
orderedPluginNames := make([]string, len(orderedPlugins))
166-
for i, p := range orderedPlugins {
167-
orderedPluginNames[i] = p.TypedName().String()
168-
}
169165
assertTopologicalOrder(t, dag, orderedPlugins)
170166
})
171167
}
172168
}
173169

174-
func assertTopologicalOrder(t *testing.T, dag map[string][]string, ordered []fwk.PrepareDataPlugin) {
170+
func assertTopologicalOrder(t *testing.T, dag map[string][]string, ordered []string) {
175171
t.Helper()
176172
positions := make(map[string]int)
177173
for i, p := range ordered {
178-
positions[p.TypedName().String()] = i
174+
positions[p] = i
179175
}
180176

181177
for node, dependencies := range dag {

pkg/epp/requestcontrol/request_control_config.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,10 @@ func (c *Config) ProducerConsumerPlugins() (map[string]plugin.ProducerPlugin, ma
144144
return producers, consumers
145145
}
146146

147-
// PrepareDataPluginGraph creates data dependency graph and sorts the plugins in topological order.
147+
// ValidateDataDependencies creates a data dependency graph and sorts the plugins in topological order.
148148
// If a cycle is detected, it returns an error.
149-
func (c *Config) PrepareDataPluginGraph() error {
149+
func (c *Config) ValidateAndOrderDataDependencies() error {
150150
producers, consumers := c.ProducerConsumerPlugins()
151-
// TODO(#1988): Add all producer and consumer plugins to the graph.
152-
if len(c.prepareDataPlugins) == 0 {
153-
return nil
154-
}
155151
dag, err := buildDAG(producers, consumers)
156152
if err != nil {
157153
return err
@@ -160,7 +156,22 @@ func (c *Config) PrepareDataPluginGraph() error {
160156
if err != nil {
161157
return err
162158
}
163-
c.prepareDataPlugins = plugins
159+
c.orderPrepareDataPlugins(plugins)
164160

165161
return nil
166162
}
163+
164+
// orderPrepareDataPlugins reorders the prepareDataPlugins in the Config based on the given sorted plugin names.
165+
func (c *Config) orderPrepareDataPlugins(sortedPluginNames []string) {
166+
sortedPlugins := make([]fwk.PrepareDataPlugin, 0, len(sortedPluginNames))
167+
nameToPlugin := make(map[string]fwk.PrepareDataPlugin)
168+
for _, plugin := range NewConfig().prepareDataPlugins {
169+
nameToPlugin[plugin.TypedName().String()] = plugin
170+
}
171+
for _, name := range sortedPluginNames {
172+
if plugin, ok := nameToPlugin[name]; ok {
173+
sortedPlugins = append(sortedPlugins, plugin)
174+
}
175+
}
176+
c.prepareDataPlugins = sortedPlugins
177+
}

0 commit comments

Comments
 (0)