Skip to content

Commit 8b6a420

Browse files
authored
fix(assets): use manifest parent_map for accurate lineage (#240)
* fix(assets): use manifest parent_map for accurate lineage Use the manifest-level parent_map (the canonical DAG) to resolve asset dependencies instead of node-level depends_on.nodes which can include extra edges not reflecting the real dbt DAG structure. * feat(assets): ensure lineage from the example is correct * fix(assets): properly set inputs & outputs --------- Co-authored-by: François Delbrayelle <fdelbrayelle@kestra.io>
1 parent 41cde75 commit 8b6a420

6 files changed

Lines changed: 497 additions & 57 deletions

File tree

src/main/java/io/kestra/plugin/dbt/ResultParser.java

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
66
import io.kestra.core.models.assets.AssetIdentifier;
77
import io.kestra.core.models.assets.AssetsInOut;
8+
import io.kestra.core.models.assets.Asset;
89
import io.kestra.core.models.assets.Custom;
910
import io.kestra.core.models.executions.TaskRun;
1011
import io.kestra.core.models.executions.TaskRunAttempt;
@@ -150,16 +151,9 @@ private static AssetsInOut assetsFor(String uniqueId, Map<String, ModelAsset> mo
150151
}
151152

152153
List<AssetIdentifier> inputs = inputIdentifiers(modelAsset, modelAssets);
154+
List<Asset> outputs = outputAssets(modelAsset, modelAssets);
153155

154-
return new AssetsInOut(
155-
inputs,
156-
List.of(Custom.builder()
157-
.id(modelAsset.assetId())
158-
.type(TABLE_ASSET_TYPE)
159-
.metadata(modelAsset.metadata())
160-
.build()
161-
)
162-
);
156+
return new AssetsInOut(inputs, outputs);
163157
}
164158

165159
private static void emitAssets(RunContext runContext, Manifest manifest) throws IllegalVariableEvaluationException {
@@ -168,24 +162,32 @@ private static void emitAssets(RunContext runContext, Manifest manifest) throws
168162

169163
for (ModelAsset asset : modelAssets.values()) {
170164
List<AssetIdentifier> inputs = inputIdentifiers(asset, modelAssets);
165+
List<Asset> outputs = outputAssets(asset, modelAssets);
171166
try {
172-
runContext.assets().emit(new AssetEmit(
173-
inputs,
174-
List.of(Custom.builder()
175-
.id(asset.assetId())
176-
.type(TABLE_ASSET_TYPE)
177-
.metadata(asset.metadata())
178-
.build()
179-
)
180-
)
181-
);
167+
runContext.assets().emit(new AssetEmit(inputs, outputs));
182168
} catch (UnsupportedOperationException | QueueException e) {
183169
// UnsupportedOperationException for OSS or tests where EE is not configured (assets are EE only)
184170
runContext.logger().warn("Unable to emit dbt asset '{}'", asset.assetId(), e);
185171
}
186172
}
187173
}
188174

175+
private static List<Asset> outputAssets(ModelAsset modelAsset, Map<String, ModelAsset> modelAssets) {
176+
if (modelAsset.children() == null || modelAsset.children().isEmpty()) {
177+
return List.of();
178+
}
179+
180+
return modelAsset.children().stream()
181+
.map(modelAssets::get)
182+
.filter(Objects::nonNull)
183+
.<Asset>map(child -> Custom.builder()
184+
.id(child.assetId())
185+
.type(TABLE_ASSET_TYPE)
186+
.metadata(child.metadata())
187+
.build())
188+
.toList();
189+
}
190+
189191
private static List<AssetIdentifier> inputIdentifiers(ModelAsset modelAsset, Map<String, ModelAsset> modelAssets) {
190192
if (modelAsset.dependsOn() == null || modelAsset.dependsOn().isEmpty()) {
191193
return List.of();
@@ -226,12 +228,18 @@ private static Map<String, ModelAsset> extractModelAssets(Manifest manifest) {
226228
if (hasValue(node.getSchema())) metadata.put("schema", node.getSchema());
227229
if (hasValue(name)) metadata.put("name", name);
228230

229-
List<String> dependsOn = List.of();
230-
if (node.getDependsOn() != null) {
231+
// Use parent_map from manifest (the canonical DAG) when available,
232+
// falling back to node-level depends_on for older manifests.
233+
List<String> dependsOn;
234+
if (manifest.getParentMap() != null && manifest.getParentMap().containsKey(uniqueId)) {
235+
dependsOn = manifest.getParentMap().get(uniqueId);
236+
} else if (node.getDependsOn() != null) {
231237
dependsOn = node.getDependsOn().getOrDefault("nodes", List.of());
238+
} else {
239+
dependsOn = List.of();
232240
}
233241

234-
modelAssets.put(uniqueId, new ModelAsset(assetId, metadata, dependsOn));
242+
modelAssets.put(uniqueId, new ModelAsset(assetId, metadata, dependsOn, List.of()));
235243
}
236244

237245
Map<String, ModelAsset> filtered = new HashMap<>(modelAssets.size());
@@ -240,10 +248,26 @@ private static Map<String, ModelAsset> extractModelAssets(Manifest manifest) {
240248
List<String> deps = a.dependsOn() == null ? List.of() : a.dependsOn().stream()
241249
.filter(modelAssets::containsKey)
242250
.toList();
243-
filtered.put(e.getKey(), new ModelAsset(a.assetId(), a.metadata(), deps));
251+
filtered.put(e.getKey(), new ModelAsset(a.assetId(), a.metadata(), deps, List.of()));
252+
}
253+
254+
// Compute reverse dependencies (children: models that depend on this one)
255+
Map<String, List<String>> childrenMap = new HashMap<>();
256+
for (Map.Entry<String, ModelAsset> e : filtered.entrySet()) {
257+
for (String dep : e.getValue().dependsOn()) {
258+
childrenMap.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
259+
}
260+
}
261+
262+
// Rebuild with children populated
263+
Map<String, ModelAsset> result = new HashMap<>(filtered.size());
264+
for (Map.Entry<String, ModelAsset> e : filtered.entrySet()) {
265+
ModelAsset a = e.getValue();
266+
List<String> children = childrenMap.getOrDefault(e.getKey(), List.of());
267+
result.put(e.getKey(), new ModelAsset(a.assetId(), a.metadata(), a.dependsOn(), children));
244268
}
245269

246-
return filtered;
270+
return result;
247271
}
248272

249273
private static String adapterType(Manifest manifest) {
@@ -287,6 +311,6 @@ private static boolean hasValue(String value) {
287311
return value != null && !value.trim().isEmpty();
288312
}
289313

290-
private record ModelAsset(String assetId, Map<String, Object> metadata, List<String> dependsOn) {
314+
private record ModelAsset(String assetId, Map<String, Object> metadata, List<String> dependsOn, List<String> children) {
291315
}
292316
}

src/main/java/io/kestra/plugin/dbt/models/Manifest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ public class Manifest {
1515
Map<String, Object> metadata;
1616
Map<String, Node> nodes;
1717

18+
@JsonProperty("parent_map")
19+
Map<String, List<String>> parentMap;
20+
1821
@Value
1922
@Jacksonized
2023
@SuperBuilder

src/test/java/io/kestra/plugin/dbt/ResultParserTest.java

Lines changed: 143 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
import jakarta.inject.Inject;
1111
import org.junit.jupiter.api.Test;
1212

13+
import io.kestra.core.runners.AssetEmit;
14+
1315
import java.nio.file.Files;
14-
import java.util.HashMap;
1516
import java.util.List;
1617
import java.util.Map;
1718

18-
import static java.util.stream.Collectors.toMap;
1919
import static org.hamcrest.MatcherAssert.assertThat;
2020
import static org.hamcrest.Matchers.*;
2121

@@ -56,6 +56,12 @@ void parseManifestWithAssets_shouldEmitModelAssets() throws Exception {
5656
]
5757
}
5858
}
59+
},
60+
"parent_map": {
61+
"model.analytics.stg_orders": [],
62+
"model.analytics.fct_orders": [
63+
"model.analytics.stg_orders"
64+
]
5965
}
6066
}
6167
""");
@@ -65,22 +71,23 @@ void parseManifestWithAssets_shouldEmitModelAssets() throws Exception {
6571
assertThat(manifestResult.manifest(), is(notNullValue()));
6672
assertThat(runContext.assets().emitted(), hasSize(2));
6773

68-
var outputAssets = runContext.assets().emitted().stream()
69-
.flatMap(assetEmit -> assetEmit.outputs().stream())
70-
.toList();
71-
72-
assertThat(outputAssets, hasSize(2));
73-
var byId = new HashMap<String, io.kestra.core.models.assets.Asset>();
74-
outputAssets.forEach(asset -> byId.put(asset.getId(), asset));
74+
// stg_orders has no inputs and 1 output (fct_orders, its child)
75+
// fct_orders has 1 input (stg_orders) and no outputs (leaf node)
76+
var stgOrdersEmit = findEmitWithOutput(runContext.assets().emitted(), "analytics.marts.fct_orders");
77+
assertThat("stg_orders emission should exist", stgOrdersEmit, is(notNullValue()));
78+
assertThat(stgOrdersEmit.inputs(), hasSize(0));
79+
assertThat(stgOrdersEmit.outputs(), hasSize(1));
7580

76-
assertThat(byId.containsKey("analytics.staging.stg_orders"), is(true));
77-
assertThat(byId.containsKey("analytics.marts.fct_orders"), is(true));
81+
var fctOrdersOutput = stgOrdersEmit.outputs().getFirst();
82+
assertThat(fctOrdersOutput.getMetadata().get("system"), is("postgres"));
83+
assertThat(fctOrdersOutput.getMetadata().get("database"), is("analytics"));
84+
assertThat(fctOrdersOutput.getMetadata().get("schema"), is("marts"));
85+
assertThat(fctOrdersOutput.getMetadata().get("name"), is("fct_orders"));
7886

79-
var stgOrders = byId.get("analytics.staging.stg_orders");
80-
assertThat(stgOrders.getMetadata().get("system"), is("postgres"));
81-
assertThat(stgOrders.getMetadata().get("database"), is("analytics"));
82-
assertThat(stgOrders.getMetadata().get("schema"), is("staging"));
83-
assertThat(stgOrders.getMetadata().get("name"), is("stg_orders"));
87+
var fctOrdersEmit = findEmitWithInput(runContext.assets().emitted(), "analytics.staging.stg_orders");
88+
assertThat("fct_orders emission should exist", fctOrdersEmit, is(notNullValue()));
89+
assertThat(fctOrdersEmit.inputs(), hasSize(1));
90+
assertThat(fctOrdersEmit.outputs(), hasSize(0));
8491
}
8592

8693
@Test
@@ -115,24 +122,136 @@ void parseManifestWithAssets_shouldEmitLineageInputs() throws Exception {
115122
]
116123
}
117124
}
125+
},
126+
"parent_map": {
127+
"model.analytics.my_first_dbt_model": [],
128+
"model.analytics.my_second_dbt_model": [
129+
"model.analytics.my_first_dbt_model"
130+
]
118131
}
119132
}
120133
""");
121134

122135
ResultParser.parseManifestWithAssets(runContext, manifestFile.toFile());
123136

124-
var emittedByOutputId = runContext.assets().emitted().stream()
125-
.collect(toMap(
126-
assetEmit -> assetEmit.outputs().getFirst().getId(),
127-
assetEmit -> assetEmit
128-
));
137+
assertThat(runContext.assets().emitted(), hasSize(2));
129138

130-
assertThat(emittedByOutputId, hasKey("analytics.marts.my_first_dbt_model"));
131-
assertThat(emittedByOutputId, hasKey("analytics.marts.my_second_dbt_model"));
139+
// my_first_dbt_model: no inputs, 1 output (my_second_dbt_model)
140+
var firstModelEmit = findEmitWithOutput(runContext.assets().emitted(), "analytics.marts.my_second_dbt_model");
141+
assertThat(firstModelEmit, is(notNullValue()));
142+
assertThat(firstModelEmit.inputs(), hasSize(0));
143+
assertThat(firstModelEmit.outputs(), hasSize(1));
132144

133-
var secondModelEmit = emittedByOutputId.get("analytics.marts.my_second_dbt_model");
145+
// my_second_dbt_model: 1 input (my_first_dbt_model), no outputs (leaf)
146+
var secondModelEmit = findEmitWithInput(runContext.assets().emitted(), "analytics.marts.my_first_dbt_model");
147+
assertThat(secondModelEmit, is(notNullValue()));
134148
assertThat(secondModelEmit.inputs(), hasSize(1));
135149
assertThat(secondModelEmit.inputs().getFirst().id(), is("analytics.marts.my_first_dbt_model"));
150+
assertThat(secondModelEmit.outputs(), hasSize(0));
151+
}
152+
153+
@Test
154+
void parseManifestWithAssets_shouldUseParentMapForLineage() throws Exception {
155+
// Simulate a case where depends_on.nodes includes transitive deps
156+
// but parent_map only has the direct edges (the real DAG).
157+
var runContext = mockRunContext();
158+
var manifestFile = runContext.workingDir().path(true).resolve("manifest.json");
159+
Files.writeString(manifestFile, """
160+
{
161+
"metadata": {
162+
"adapter_type": "duckdb"
163+
},
164+
"nodes": {
165+
"model.project.stg_orders": {
166+
"resource_type": "model",
167+
"database": "dev",
168+
"schema": "staging",
169+
"name": "stg_orders",
170+
"unique_id": "model.project.stg_orders",
171+
"depends_on": {
172+
"nodes": ["source.project.raw.orders"]
173+
}
174+
},
175+
"model.project.int_orders": {
176+
"resource_type": "model",
177+
"database": "dev",
178+
"schema": "intermediate",
179+
"name": "int_orders",
180+
"unique_id": "model.project.int_orders",
181+
"depends_on": {
182+
"nodes": ["model.project.stg_orders"]
183+
}
184+
},
185+
"model.project.fct_orders": {
186+
"resource_type": "model",
187+
"database": "dev",
188+
"schema": "marts",
189+
"name": "fct_orders",
190+
"unique_id": "model.project.fct_orders",
191+
"depends_on": {
192+
"nodes": ["model.project.stg_orders", "model.project.int_orders"]
193+
}
194+
}
195+
},
196+
"parent_map": {
197+
"model.project.stg_orders": ["source.project.raw.orders"],
198+
"model.project.int_orders": ["model.project.stg_orders"],
199+
"model.project.fct_orders": ["model.project.int_orders"]
200+
}
201+
}
202+
""");
203+
204+
ResultParser.parseManifestWithAssets(runContext, manifestFile.toFile());
205+
206+
assertThat(runContext.assets().emitted(), hasSize(3));
207+
208+
// DAG: stg_orders → int_orders → fct_orders (parent_map, no transitive edges)
209+
// Inputs = upstream deps, Outputs = downstream children
210+
211+
// stg_orders: no model inputs (source filtered out), 1 output (int_orders)
212+
var stgOrdersEmit = findEmitWithOutput(runContext.assets().emitted(), "dev.intermediate.int_orders");
213+
assertThat(stgOrdersEmit, is(notNullValue()));
214+
assertThat(stgOrdersEmit.inputs(), hasSize(0));
215+
assertThat(stgOrdersEmit.outputs(), hasSize(1));
216+
217+
// int_orders: 1 input (stg_orders), 1 output (fct_orders)
218+
var intOrdersEmit = findEmitWithInputAndOutput(runContext.assets().emitted(),
219+
"dev.staging.stg_orders", "dev.marts.fct_orders");
220+
assertThat(intOrdersEmit, is(notNullValue()));
221+
assertThat(intOrdersEmit.inputs(), hasSize(1));
222+
assertThat(intOrdersEmit.inputs().getFirst().id(), is("dev.staging.stg_orders"));
223+
assertThat(intOrdersEmit.outputs(), hasSize(1));
224+
assertThat(intOrdersEmit.outputs().getFirst().getId(), is("dev.marts.fct_orders"));
225+
226+
// fct_orders: 1 input (int_orders only, from parent_map), no outputs (leaf)
227+
var fctOrdersEmit = findEmitWithInput(runContext.assets().emitted(), "dev.intermediate.int_orders");
228+
assertThat(fctOrdersEmit, is(notNullValue()));
229+
assertThat(fctOrdersEmit.inputs(), hasSize(1));
230+
assertThat(fctOrdersEmit.inputs().getFirst().id(), is("dev.intermediate.int_orders"));
231+
assertThat(fctOrdersEmit.outputs(), hasSize(0));
232+
}
233+
234+
private static AssetEmit findEmitWithOutput(List<AssetEmit> emitted, String outputId) {
235+
return emitted.stream()
236+
.filter(e -> e.outputs().stream().anyMatch(o -> o.getId().equals(outputId)))
237+
.findFirst()
238+
.orElse(null);
239+
}
240+
241+
private static AssetEmit findEmitWithInput(List<AssetEmit> emitted, String inputId) {
242+
return emitted.stream()
243+
.filter(e -> e.inputs().stream().anyMatch(i -> i.id().equals(inputId)))
244+
.filter(e -> e.outputs().isEmpty())
245+
.findFirst()
246+
.orElse(null);
247+
}
248+
249+
private static AssetEmit findEmitWithInputAndOutput(List<AssetEmit> emitted, String inputId, String outputId) {
250+
return emitted.stream()
251+
.filter(e -> e.inputs().stream().anyMatch(i -> i.id().equals(inputId)))
252+
.filter(e -> e.outputs().stream().anyMatch(o -> o.getId().equals(outputId)))
253+
.findFirst()
254+
.orElse(null);
136255
}
137256

138257
private RunContext mockRunContext() {

0 commit comments

Comments
 (0)