Skip to content

Commit 7228b3c

Browse files
committed
fix(assets): properly set inputs & outputs
1 parent 93ff8a4 commit 7228b3c

3 files changed

Lines changed: 174 additions & 104 deletions

File tree

src/main/java/io/kestra/plugin/dbt/ResultParser.java

Lines changed: 41 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
66
import io.kestra.core.models.assets.AssetIdentifier;
77
import io.kestra.core.models.assets.AssetsInOut;
8+
import io.kestra.core.models.assets.Asset;
89
import io.kestra.core.models.assets.Custom;
910
import io.kestra.core.models.executions.TaskRun;
1011
import io.kestra.core.models.executions.TaskRunAttempt;
@@ -150,16 +151,9 @@ private static AssetsInOut assetsFor(String uniqueId, Map<String, ModelAsset> mo
150151
}
151152

152153
List<AssetIdentifier> inputs = inputIdentifiers(modelAsset, modelAssets);
154+
List<Asset> outputs = outputAssets(modelAsset, modelAssets);
153155

154-
return new AssetsInOut(
155-
inputs,
156-
List.of(Custom.builder()
157-
.id(modelAsset.assetId())
158-
.type(TABLE_ASSET_TYPE)
159-
.metadata(modelAsset.metadata())
160-
.build()
161-
)
162-
);
156+
return new AssetsInOut(inputs, outputs);
163157
}
164158

165159
private static void emitAssets(RunContext runContext, Manifest manifest) throws IllegalVariableEvaluationException {
@@ -168,24 +162,32 @@ private static void emitAssets(RunContext runContext, Manifest manifest) throws
168162

169163
for (ModelAsset asset : modelAssets.values()) {
170164
List<AssetIdentifier> inputs = inputIdentifiers(asset, modelAssets);
165+
List<Asset> outputs = outputAssets(asset, modelAssets);
171166
try {
172-
runContext.assets().emit(new AssetEmit(
173-
inputs,
174-
List.of(Custom.builder()
175-
.id(asset.assetId())
176-
.type(TABLE_ASSET_TYPE)
177-
.metadata(asset.metadata())
178-
.build()
179-
)
180-
)
181-
);
167+
runContext.assets().emit(new AssetEmit(inputs, outputs));
182168
} catch (UnsupportedOperationException | QueueException e) {
183169
// UnsupportedOperationException for OSS or tests where EE is not configured (assets are EE only)
184170
runContext.logger().warn("Unable to emit dbt asset '{}'", asset.assetId(), e);
185171
}
186172
}
187173
}
188174

175+
/**
 * Builds the output assets of a model from its children.
 *
 * <p>Each entry of {@code modelAsset.children()} is looked up in {@code modelAssets};
 * every child that resolves is turned into a {@link Custom} asset of type
 * {@code TABLE_ASSET_TYPE} carrying the child's asset id and metadata.
 *
 * @param modelAsset  the model whose children become the outputs
 * @param modelAssets lookup table used to resolve each child key to its {@code ModelAsset}
 * @return one asset per resolvable child; an empty list when {@code children()} is
 *         {@code null} or empty
 */
private static List<Asset> outputAssets(ModelAsset modelAsset, Map<String, ModelAsset> modelAssets) {
176+
if (modelAsset.children() == null || modelAsset.children().isEmpty()) {
177+
return List.of();
178+
}
179+
180+
return modelAsset.children().stream()
181+
.map(modelAssets::get)
182+
// Child keys that are absent from the lookup table map to null and are skipped.
.filter(Objects::nonNull)
183+
// Explicit <Asset> type witness so the stream is typed as Asset rather than Custom.
.<Asset>map(child -> Custom.builder()
184+
.id(child.assetId())
185+
.type(TABLE_ASSET_TYPE)
186+
.metadata(child.metadata())
187+
.build())
188+
.toList();
189+
}
190+
189191
private static List<AssetIdentifier> inputIdentifiers(ModelAsset modelAsset, Map<String, ModelAsset> modelAssets) {
190192
if (modelAsset.dependsOn() == null || modelAsset.dependsOn().isEmpty()) {
191193
return List.of();
@@ -237,7 +239,7 @@ private static Map<String, ModelAsset> extractModelAssets(Manifest manifest) {
237239
dependsOn = List.of();
238240
}
239241

240-
modelAssets.put(uniqueId, new ModelAsset(assetId, metadata, dependsOn));
242+
modelAssets.put(uniqueId, new ModelAsset(assetId, metadata, dependsOn, List.of()));
241243
}
242244

243245
Map<String, ModelAsset> filtered = new HashMap<>(modelAssets.size());
@@ -246,10 +248,26 @@ private static Map<String, ModelAsset> extractModelAssets(Manifest manifest) {
246248
List<String> deps = a.dependsOn() == null ? List.of() : a.dependsOn().stream()
247249
.filter(modelAssets::containsKey)
248250
.toList();
249-
filtered.put(e.getKey(), new ModelAsset(a.assetId(), a.metadata(), deps));
251+
filtered.put(e.getKey(), new ModelAsset(a.assetId(), a.metadata(), deps, List.of()));
252+
}
253+
254+
// Compute reverse dependencies (children: models that depend on this one)
255+
Map<String, List<String>> childrenMap = new HashMap<>();
256+
for (Map.Entry<String, ModelAsset> e : filtered.entrySet()) {
257+
for (String dep : e.getValue().dependsOn()) {
258+
childrenMap.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
259+
}
260+
}
261+
262+
// Rebuild with children populated
263+
Map<String, ModelAsset> result = new HashMap<>(filtered.size());
264+
for (Map.Entry<String, ModelAsset> e : filtered.entrySet()) {
265+
ModelAsset a = e.getValue();
266+
List<String> children = childrenMap.getOrDefault(e.getKey(), List.of());
267+
result.put(e.getKey(), new ModelAsset(a.assetId(), a.metadata(), a.dependsOn(), children));
250268
}
251269

252-
return filtered;
270+
return result;
253271
}
254272

255273
private static String adapterType(Manifest manifest) {
@@ -293,6 +311,6 @@ private static boolean hasValue(String value) {
293311
return value != null && !value.trim().isEmpty();
294312
}
295313

296-
private record ModelAsset(String assetId, Map<String, Object> metadata, List<String> dependsOn) {
314+
/**
 * Per-model data used to build asset inputs and outputs.
 *
 * @param assetId   identifier used as the id of the asset built for this model
 * @param metadata  metadata attached to the asset built for this model
 * @param dependsOn keys of the models this model depends on
 * @param children  keys of the models that depend on this one (reverse of {@code dependsOn})
 */
private record ModelAsset(String assetId, Map<String, Object> metadata, List<String> dependsOn, List<String> children) {
297315
}
298316
}

src/test/java/io/kestra/plugin/dbt/ResultParserTest.java

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
import jakarta.inject.Inject;
1111
import org.junit.jupiter.api.Test;
1212

13+
import io.kestra.core.runners.AssetEmit;
14+
1315
import java.nio.file.Files;
14-
import java.util.HashMap;
1516
import java.util.List;
1617
import java.util.Map;
1718

18-
import static java.util.stream.Collectors.toMap;
1919
import static org.hamcrest.MatcherAssert.assertThat;
2020
import static org.hamcrest.Matchers.*;
2121

@@ -71,22 +71,23 @@ void parseManifestWithAssets_shouldEmitModelAssets() throws Exception {
7171
assertThat(manifestResult.manifest(), is(notNullValue()));
7272
assertThat(runContext.assets().emitted(), hasSize(2));
7373

74-
var outputAssets = runContext.assets().emitted().stream()
75-
.flatMap(assetEmit -> assetEmit.outputs().stream())
76-
.toList();
77-
78-
assertThat(outputAssets, hasSize(2));
79-
var byId = new HashMap<String, io.kestra.core.models.assets.Asset>();
80-
outputAssets.forEach(asset -> byId.put(asset.getId(), asset));
74+
// stg_orders has no inputs and 1 output (fct_orders, its child)
75+
// fct_orders has 1 input (stg_orders) and no outputs (leaf node)
76+
var stgOrdersEmit = findEmitWithOutput(runContext.assets().emitted(), "analytics.marts.fct_orders");
77+
assertThat("stg_orders emission should exist", stgOrdersEmit, is(notNullValue()));
78+
assertThat(stgOrdersEmit.inputs(), hasSize(0));
79+
assertThat(stgOrdersEmit.outputs(), hasSize(1));
8180

82-
assertThat(byId.containsKey("analytics.staging.stg_orders"), is(true));
83-
assertThat(byId.containsKey("analytics.marts.fct_orders"), is(true));
81+
var fctOrdersOutput = stgOrdersEmit.outputs().getFirst();
82+
assertThat(fctOrdersOutput.getMetadata().get("system"), is("postgres"));
83+
assertThat(fctOrdersOutput.getMetadata().get("database"), is("analytics"));
84+
assertThat(fctOrdersOutput.getMetadata().get("schema"), is("marts"));
85+
assertThat(fctOrdersOutput.getMetadata().get("name"), is("fct_orders"));
8486

85-
var stgOrders = byId.get("analytics.staging.stg_orders");
86-
assertThat(stgOrders.getMetadata().get("system"), is("postgres"));
87-
assertThat(stgOrders.getMetadata().get("database"), is("analytics"));
88-
assertThat(stgOrders.getMetadata().get("schema"), is("staging"));
89-
assertThat(stgOrders.getMetadata().get("name"), is("stg_orders"));
87+
var fctOrdersEmit = findEmitWithInput(runContext.assets().emitted(), "analytics.staging.stg_orders");
88+
assertThat("fct_orders emission should exist", fctOrdersEmit, is(notNullValue()));
89+
assertThat(fctOrdersEmit.inputs(), hasSize(1));
90+
assertThat(fctOrdersEmit.outputs(), hasSize(0));
9091
}
9192

9293
@Test
@@ -133,18 +134,20 @@ void parseManifestWithAssets_shouldEmitLineageInputs() throws Exception {
133134

134135
ResultParser.parseManifestWithAssets(runContext, manifestFile.toFile());
135136

136-
var emittedByOutputId = runContext.assets().emitted().stream()
137-
.collect(toMap(
138-
assetEmit -> assetEmit.outputs().getFirst().getId(),
139-
assetEmit -> assetEmit
140-
));
137+
assertThat(runContext.assets().emitted(), hasSize(2));
141138

142-
assertThat(emittedByOutputId, hasKey("analytics.marts.my_first_dbt_model"));
143-
assertThat(emittedByOutputId, hasKey("analytics.marts.my_second_dbt_model"));
139+
// my_first_dbt_model: no inputs, 1 output (my_second_dbt_model)
140+
var firstModelEmit = findEmitWithOutput(runContext.assets().emitted(), "analytics.marts.my_second_dbt_model");
141+
assertThat(firstModelEmit, is(notNullValue()));
142+
assertThat(firstModelEmit.inputs(), hasSize(0));
143+
assertThat(firstModelEmit.outputs(), hasSize(1));
144144

145-
var secondModelEmit = emittedByOutputId.get("analytics.marts.my_second_dbt_model");
145+
// my_second_dbt_model: 1 input (my_first_dbt_model), no outputs (leaf)
146+
var secondModelEmit = findEmitWithInput(runContext.assets().emitted(), "analytics.marts.my_first_dbt_model");
147+
assertThat(secondModelEmit, is(notNullValue()));
146148
assertThat(secondModelEmit.inputs(), hasSize(1));
147149
assertThat(secondModelEmit.inputs().getFirst().id(), is("analytics.marts.my_first_dbt_model"));
150+
assertThat(secondModelEmit.outputs(), hasSize(0));
148151
}
149152

150153
@Test
@@ -200,26 +203,55 @@ void parseManifestWithAssets_shouldUseParentMapForLineage() throws Exception {
200203

201204
ResultParser.parseManifestWithAssets(runContext, manifestFile.toFile());
202205

203-
var emittedByOutputId = runContext.assets().emitted().stream()
204-
.collect(toMap(
205-
assetEmit -> assetEmit.outputs().getFirst().getId(),
206-
assetEmit -> assetEmit
207-
));
206+
assertThat(runContext.assets().emitted(), hasSize(3));
208207

209-
// fct_orders should only depend on int_orders (from parent_map),
210-
// NOT on both stg_orders and int_orders (from depends_on.nodes)
211-
var fctOrdersEmit = emittedByOutputId.get("dev.marts.fct_orders");
212-
assertThat(fctOrdersEmit.inputs(), hasSize(1));
213-
assertThat(fctOrdersEmit.inputs().getFirst().id(), is("dev.intermediate.int_orders"));
208+
// DAG: stg_orders → int_orders → fct_orders (parent_map, no transitive edges)
209+
// Inputs = upstream deps, Outputs = downstream children
210+
211+
// stg_orders: no model inputs (source filtered out), 1 output (int_orders)
212+
var stgOrdersEmit = findEmitWithOutput(runContext.assets().emitted(), "dev.intermediate.int_orders");
213+
assertThat(stgOrdersEmit, is(notNullValue()));
214+
assertThat(stgOrdersEmit.inputs(), hasSize(0));
215+
assertThat(stgOrdersEmit.outputs(), hasSize(1));
214216

215-
// int_orders should depend on stg_orders
216-
var intOrdersEmit = emittedByOutputId.get("dev.intermediate.int_orders");
217+
// int_orders: 1 input (stg_orders), 1 output (fct_orders)
218+
var intOrdersEmit = findEmitWithInputAndOutput(runContext.assets().emitted(),
219+
"dev.staging.stg_orders", "dev.marts.fct_orders");
220+
assertThat(intOrdersEmit, is(notNullValue()));
217221
assertThat(intOrdersEmit.inputs(), hasSize(1));
218222
assertThat(intOrdersEmit.inputs().getFirst().id(), is("dev.staging.stg_orders"));
223+
assertThat(intOrdersEmit.outputs(), hasSize(1));
224+
assertThat(intOrdersEmit.outputs().getFirst().getId(), is("dev.marts.fct_orders"));
219225

220-
// stg_orders has no model dependencies (source is filtered out)
221-
var stgOrdersEmit = emittedByOutputId.get("dev.staging.stg_orders");
222-
assertThat(stgOrdersEmit.inputs(), hasSize(0));
226+
// fct_orders: 1 input (int_orders only, from parent_map), no outputs (leaf)
227+
var fctOrdersEmit = findEmitWithInput(runContext.assets().emitted(), "dev.intermediate.int_orders");
228+
assertThat(fctOrdersEmit, is(notNullValue()));
229+
assertThat(fctOrdersEmit.inputs(), hasSize(1));
230+
assertThat(fctOrdersEmit.inputs().getFirst().id(), is("dev.intermediate.int_orders"));
231+
assertThat(fctOrdersEmit.outputs(), hasSize(0));
232+
}
233+
234+
/**
 * Finds the first emission whose outputs contain an asset with the given id.
 *
 * @param emitted  the emissions to search
 * @param outputId the asset id to look for among each emission's outputs
 * @return the first matching emission, or {@code null} when none matches
 */
private static AssetEmit findEmitWithOutput(List<AssetEmit> emitted, String outputId) {
235+
return emitted.stream()
236+
.filter(e -> e.outputs().stream().anyMatch(o -> o.getId().equals(outputId)))
237+
.findFirst()
238+
.orElse(null);
239+
}
240+
241+
/**
 * Finds the first emission that has an input with the given id AND no outputs.
 *
 * <p>Despite the name, an emission with a matching input but a non-empty output
 * list is not returned; use {@code findEmitWithInputAndOutput} for those.
 *
 * @param emitted the emissions to search
 * @param inputId the identifier to look for among each emission's inputs
 * @return the first matching emission, or {@code null} when none matches
 */
private static AssetEmit findEmitWithInput(List<AssetEmit> emitted, String inputId) {
242+
return emitted.stream()
243+
.filter(e -> e.inputs().stream().anyMatch(i -> i.id().equals(inputId)))
244+
// Restricts the match to emissions without outputs.
.filter(e -> e.outputs().isEmpty())
245+
.findFirst()
246+
.orElse(null);
247+
}
248+
249+
/**
 * Finds the first emission that has both an input with {@code inputId} and an
 * output with {@code outputId}.
 *
 * @param emitted  the emissions to search
 * @param inputId  the identifier to look for among each emission's inputs
 * @param outputId the asset id to look for among each emission's outputs
 * @return the first matching emission, or {@code null} when none matches
 */
private static AssetEmit findEmitWithInputAndOutput(List<AssetEmit> emitted, String inputId, String outputId) {
250+
return emitted.stream()
251+
.filter(e -> e.inputs().stream().anyMatch(i -> i.id().equals(inputId)))
252+
.filter(e -> e.outputs().stream().anyMatch(o -> o.getId().equals(outputId)))
253+
.findFirst()
254+
.orElse(null);
223255
}
224256

225257
private RunContext mockRunContext() {

0 commit comments

Comments
 (0)