Skip to content

Commit 3322aae

Browse files
committed
feat(assets): create assets for dbt models
# Conflicts: # gradle.properties
1 parent b08642a commit 3322aae

15 files changed

Lines changed: 842 additions & 92 deletions

File tree

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ dependencies {
6666
**********************************************************************************************************************/
6767
test {
6868
useJUnitPlatform()
69+
systemProperty "kestra.encryption.secret-key", "test"
6970
}
7071

7172
testlogger {

gradle.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
version=1.2.3-SNAPSHOT
2-
kestraVersion=1.2.4
2+
# FIXME: to be changed with 1.3.0
3+
kestraVersion=1.3.0-SNAPSHOT
34
org.gradle.jvmargs=-Xmx4g -XX:MaxMetaspaceSize=1g

src/main/java/io/kestra/plugin/dbt/ResultParser.java

Lines changed: 180 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,40 +3,54 @@
33
import com.fasterxml.jackson.annotation.JsonInclude;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
6+
import io.kestra.core.models.assets.AssetIdentifier;
7+
import io.kestra.core.models.assets.AssetsInOut;
8+
import io.kestra.core.models.assets.Custom;
69
import io.kestra.core.models.executions.TaskRun;
710
import io.kestra.core.models.executions.TaskRunAttempt;
811
import io.kestra.core.models.executions.metrics.Counter;
912
import io.kestra.core.models.flows.State;
13+
import io.kestra.core.queues.QueueException;
14+
import io.kestra.core.runners.AssetEmit;
1015
import io.kestra.core.runners.RunContext;
1116
import io.kestra.core.runners.WorkerTaskResult;
1217
import io.kestra.core.serializers.JacksonMapper;
1318
import io.kestra.core.utils.IdUtils;
19+
import io.kestra.plugin.dbt.models.Manifest;
1420
import io.kestra.plugin.dbt.models.RunResult;
1521

1622
import java.io.File;
1723
import java.io.IOException;
1824
import java.net.URI;
1925
import java.time.Instant;
20-
import java.util.ArrayList;
21-
import java.util.List;
22-
import java.util.Objects;
26+
import java.util.*;
2327

2428
import static io.kestra.core.utils.Rethrow.throwFunction;
2529

2630
public abstract class ResultParser {
2731
static final protected ObjectMapper MAPPER = JacksonMapper.ofJson(false)
2832
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
2933

30-
public static URI parseManifest(RunContext runContext, File file) throws IOException {
31-
return runContext.storage().putFile(file);
34+
private static final String TABLE_ASSET_TYPE = "io.kestra.plugin.ee.assets.Table";
35+
private static final String RESOURCE_TYPE_MODEL = "model";
36+
37+
public record ManifestResult(Manifest manifest, URI uri) {
38+
}
39+
40+
public static ManifestResult parseManifestWithAssets(RunContext runContext, File file) throws IOException, IllegalVariableEvaluationException {
41+
Manifest manifest = MAPPER.readValue(file, Manifest.class);
42+
emitAssets(runContext, manifest);
43+
return new ManifestResult(manifest, runContext.storage().putFile(file));
3244
}
3345

34-
public static URI parseRunResult(RunContext runContext, File file) throws IOException, IllegalVariableEvaluationException {
46+
public static URI parseRunResult(RunContext runContext, File file, Manifest manifest) throws IOException, IllegalVariableEvaluationException {
3547
RunResult result = MAPPER.readValue(
3648
file,
3749
RunResult.class
3850
);
3951

52+
Map<String, ModelAsset> modelAssets = manifest == null ? Map.of() : extractModelAssets(manifest);
53+
4054
java.util.List<WorkerTaskResult> workerTaskResults = result
4155
.getResults()
4256
.stream()
@@ -96,22 +110,26 @@ public static URI parseRunResult(RunContext runContext, File file) throws IOExce
96110
.filter(Objects::nonNull)
97111
.forEach(runContext::metric);
98112

99-
return WorkerTaskResult.builder()
100-
.taskRun(TaskRun.builder()
101-
.id(IdUtils.create())
102-
.namespace(runContext.render("{{ flow.namespace }}"))
103-
.flowId(runContext.render("{{ flow.id }}"))
104-
.taskId(r.getUniqueId())
105-
.value(runContext.render("{{ taskrun.id }}"))
106-
.executionId(runContext.render("{{ execution.id }}"))
107-
.parentTaskRunId(runContext.render("{{ taskrun.id }}"))
113+
AssetsInOut assets = assetsFor(r.getUniqueId(), modelAssets);
114+
TaskRun.TaskRunBuilder taskRunBuilder = TaskRun.builder()
115+
.id(IdUtils.create())
116+
.namespace(runContext.render("{{ flow.namespace }}"))
117+
.flowId(runContext.render("{{ flow.id }}"))
118+
.taskId(r.getUniqueId())
119+
.value(runContext.render("{{ taskrun.id }}"))
120+
.executionId(runContext.render("{{ execution.id }}"))
121+
.parentTaskRunId(runContext.render("{{ taskrun.id }}"))
122+
.state(state)
123+
.attempts(List.of(TaskRunAttempt.builder()
108124
.state(state)
109-
.attempts(List.of(TaskRunAttempt.builder()
110-
.state(state)
111-
.build()
112-
))
113125
.build()
114-
)
126+
));
127+
if (assets != null) {
128+
taskRunBuilder.assets(assets);
129+
}
130+
131+
return WorkerTaskResult.builder()
132+
.taskRun(taskRunBuilder.build())
115133
.build();
116134
}))
117135
.toList();
@@ -120,4 +138,146 @@ public static URI parseRunResult(RunContext runContext, File file) throws IOExce
120138

121139
return runContext.storage().putFile(file);
122140
}
141+
142+
private static AssetsInOut assetsFor(String uniqueId, Map<String, ModelAsset> modelAssets) {
143+
if (uniqueId == null) {
144+
return null;
145+
}
146+
147+
ModelAsset modelAsset = modelAssets.get(uniqueId);
148+
if (modelAsset == null) {
149+
return null;
150+
}
151+
152+
List<AssetIdentifier> inputs = modelAsset.dependsOn().stream()
153+
.map(modelAssets::get)
154+
.filter(Objects::nonNull)
155+
.map(dep -> new AssetIdentifier(null, null, dep.assetId(), TABLE_ASSET_TYPE))
156+
.toList();
157+
158+
return new AssetsInOut(
159+
inputs,
160+
List.of(Custom.builder()
161+
.id(modelAsset.assetId())
162+
.type(TABLE_ASSET_TYPE)
163+
.metadata(modelAsset.metadata())
164+
.build()
165+
)
166+
);
167+
}
168+
169+
private static void emitAssets(RunContext runContext, Manifest manifest) throws IllegalVariableEvaluationException {
170+
Map<String, ModelAsset> modelAssets = extractModelAssets(manifest);
171+
runContext.logger().info("dbt assets extracted from manifest: {}", modelAssets.size());
172+
173+
for (ModelAsset asset : modelAssets.values()) {
174+
try {
175+
runContext.assets().emit(new AssetEmit(
176+
List.of(),
177+
List.of(Custom.builder()
178+
.id(asset.assetId())
179+
.type(TABLE_ASSET_TYPE)
180+
.metadata(asset.metadata())
181+
.build()
182+
)
183+
)
184+
);
185+
} catch (UnsupportedOperationException | QueueException e) {
186+
// UnsupportedOperationException for OSS or tests where EE is not configured (assets are EE only)
187+
runContext.logger().warn("Unable to emit dbt asset '{}'", asset.assetId(), e);
188+
}
189+
}
190+
}
191+
192+
private static Map<String, ModelAsset> extractModelAssets(Manifest manifest) {
193+
if (manifest == null || manifest.getNodes() == null || manifest.getNodes().isEmpty()) {
194+
return Map.of();
195+
}
196+
197+
String system = adapterType(manifest);
198+
Map<String, ModelAsset> modelAssets = new HashMap<>();
199+
200+
for (Map.Entry<String, Manifest.Node> entry : manifest.getNodes().entrySet()) {
201+
Manifest.Node node = entry.getValue();
202+
if (node == null || !RESOURCE_TYPE_MODEL.equalsIgnoreCase(node.getResourceType())) {
203+
continue;
204+
}
205+
206+
String uniqueId = firstNonBlank(node.getUniqueId(), entry.getKey());
207+
if (uniqueId == null) {
208+
continue;
209+
}
210+
211+
String name = firstNonBlank(node.getAlias(), node.getName(), uniqueId);
212+
String assetId = assetIdFor(node.getDatabase(), node.getSchema(), name, uniqueId);
213+
214+
Map<String, Object> metadata = new HashMap<>();
215+
if (hasValue(system)) metadata.put("system", system);
216+
if (hasValue(node.getDatabase())) metadata.put("database", node.getDatabase());
217+
if (hasValue(node.getSchema())) metadata.put("schema", node.getSchema());
218+
if (hasValue(name)) metadata.put("name", name);
219+
220+
List<String> dependsOn = List.of();
221+
if (node.getDependsOn() != null) {
222+
dependsOn = node.getDependsOn().getOrDefault("nodes", List.of());
223+
}
224+
225+
modelAssets.put(uniqueId, new ModelAsset(assetId, metadata, dependsOn));
226+
}
227+
228+
Map<String, ModelAsset> filtered = new HashMap<>(modelAssets.size());
229+
for (Map.Entry<String, ModelAsset> e : modelAssets.entrySet()) {
230+
ModelAsset a = e.getValue();
231+
List<String> deps = a.dependsOn() == null ? List.of() : a.dependsOn().stream()
232+
.filter(modelAssets::containsKey)
233+
.toList();
234+
filtered.put(e.getKey(), new ModelAsset(a.assetId(), a.metadata(), deps));
235+
}
236+
237+
return filtered;
238+
}
239+
240+
private static String adapterType(Manifest manifest) {
241+
if (manifest.getMetadata() == null) {
242+
return null;
243+
}
244+
Object adapterType = manifest.getMetadata().get("adapter_type");
245+
return adapterType == null ? null : adapterType.toString();
246+
}
247+
248+
private static String assetIdFor(String database, String schema, String name, String fallback) {
249+
List<String> parts = new ArrayList<>();
250+
if (hasValue(database)) {
251+
parts.add(database);
252+
}
253+
if (hasValue(schema)) {
254+
parts.add(schema);
255+
}
256+
if (hasValue(name)) {
257+
parts.add(name);
258+
}
259+
if (!parts.isEmpty()) {
260+
return String.join(".", parts);
261+
}
262+
return fallback;
263+
}
264+
265+
private static String firstNonBlank(String... values) {
266+
if (values == null) {
267+
return null;
268+
}
269+
for (String value : values) {
270+
if (hasValue(value)) {
271+
return value;
272+
}
273+
}
274+
return null;
275+
}
276+
277+
private static boolean hasValue(String value) {
278+
return value != null && !value.trim().isEmpty();
279+
}
280+
281+
private record ModelAsset(String assetId, Map<String, Object> metadata, List<String> dependsOn) {
282+
}
123283
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package io.kestra.plugin.dbt;
2+
3+
import io.kestra.core.runners.RunContext;
4+
5+
import java.lang.reflect.Field;
6+
import java.util.Optional;
7+
8+
public final class RunContextUtils {
9+
private RunContextUtils() {
10+
}
11+
12+
public static void ensureSecretKey(RunContext runContext) {
13+
if (runContext == null) {
14+
return;
15+
}
16+
17+
try {
18+
Class<?> type = runContext.getClass();
19+
Field field = null;
20+
while (type != null && field == null) {
21+
try {
22+
field = type.getDeclaredField("secretKey");
23+
} catch (NoSuchFieldException e) {
24+
type = type.getSuperclass();
25+
}
26+
}
27+
28+
if (field == null) {
29+
return;
30+
}
31+
32+
initializeRunContext(runContext);
33+
field.setAccessible(true);
34+
if (field.get(runContext) == null) {
35+
field.set(runContext, Optional.empty());
36+
}
37+
} catch (IllegalAccessException e) {
38+
// Ignore to avoid breaking task execution on reflection issues.
39+
}
40+
}
41+
42+
private static void initializeRunContext(RunContext runContext) {
43+
try {
44+
Field appField = null;
45+
Class<?> type = runContext.getClass();
46+
while (type != null && appField == null) {
47+
try {
48+
appField = type.getDeclaredField("applicationContext");
49+
} catch (NoSuchFieldException e) {
50+
type = type.getSuperclass();
51+
}
52+
}
53+
if (appField == null) {
54+
return;
55+
}
56+
appField.setAccessible(true);
57+
Object appContext = appField.get(runContext);
58+
if (appContext == null) {
59+
return;
60+
}
61+
var initMethod = runContext.getClass().getDeclaredMethod("init", io.micronaut.context.ApplicationContext.class);
62+
initMethod.setAccessible(true);
63+
initMethod.invoke(runContext, appContext);
64+
} catch (ReflectiveOperationException e) {
65+
// Ignore to avoid breaking task execution on reflection issues.
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)