Skip to content

Commit b265aa3

Browse files
committed
feat(cloud): add a TriggerRun task for dbt cloud
1 parent 560703a commit b265aa3

27 files changed

Lines changed: 1396 additions & 113 deletions

.github/workflows/main.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ jobs:
4848

4949
# Gradle check
5050
- name: Build with Gradle
51-
run: ./gradlew check
51+
env:
52+
DBT_APPLICATION: ${{ secrets.DBT_APPLICATION }}
53+
run: |
54+
echo $DBT_APPLICATION | base64 -d > src/test/resources/application-test.yml
55+
./gradlew check
5256
5357
# Publish
5458
- name: Publish package to Sonatype
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.kestra.plugin.dbt;
2+
3+
import com.fasterxml.jackson.annotation.JsonInclude;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
6+
import io.kestra.core.models.executions.AbstractMetricEntry;
7+
import io.kestra.core.models.executions.TaskRun;
8+
import io.kestra.core.models.executions.TaskRunAttempt;
9+
import io.kestra.core.models.executions.metrics.Counter;
10+
import io.kestra.core.models.flows.State;
11+
import io.kestra.core.runners.RunContext;
12+
import io.kestra.core.runners.WorkerTaskResult;
13+
import io.kestra.core.serializers.JacksonMapper;
14+
import io.kestra.core.utils.IdUtils;
15+
import io.kestra.plugin.dbt.models.Manifest;
16+
import io.kestra.plugin.dbt.models.RunResult;
17+
18+
import java.io.File;
19+
import java.io.IOException;
20+
import java.time.Instant;
21+
import java.util.Objects;
22+
import java.util.stream.Collectors;
23+
24+
import static io.kestra.core.utils.Rethrow.throwFunction;
25+
26+
public abstract class ResultParser {
27+
static final protected ObjectMapper MAPPER = JacksonMapper.ofJson(false)
28+
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
29+
30+
public static Manifest parseManifest(RunContext runContext, File file) throws IOException {
31+
return MAPPER.readValue(
32+
file,
33+
Manifest.class
34+
);
35+
}
36+
37+
public static RunResult parseRunResult(RunContext runContext, File file) throws IOException, IllegalVariableEvaluationException {
38+
RunResult result = MAPPER.readValue(
39+
file,
40+
RunResult.class
41+
);
42+
43+
java.util.List<WorkerTaskResult> workerTaskResults = result
44+
.getResults()
45+
.stream()
46+
.map(throwFunction(r -> {
47+
State state = State.of(
48+
r.state(),
49+
java.util.List.of(
50+
new State.History(
51+
State.Type.CREATED,
52+
Instant.ofEpochMilli(r.getTiming()
53+
.stream()
54+
.mapToLong(timing -> timing.getStartedAt().toEpochMilli())
55+
.min()
56+
.orElseThrow())
57+
),
58+
new State.History(
59+
State.Type.RUNNING,
60+
Instant.ofEpochMilli(r.getTiming()
61+
.stream()
62+
.filter(timing -> timing.getName().equals("execute"))
63+
.mapToLong(timing -> timing.getStartedAt().toEpochMilli())
64+
.min()
65+
.orElseThrow())
66+
),
67+
new State.History(
68+
r.state(),
69+
Instant.ofEpochMilli(r.getTiming()
70+
.stream()
71+
.mapToLong(timing -> timing.getCompletedAt().toEpochMilli())
72+
.max()
73+
.orElseThrow())
74+
)
75+
)
76+
);
77+
78+
java.util.List<AbstractMetricEntry<?>> metrics = r.getAdapterResponse()
79+
.entrySet()
80+
.stream()
81+
.map(e -> {
82+
switch (e.getKey()) {
83+
case "rows_affected":
84+
return Counter.of("rows.affected", Double.valueOf(e.getValue()));
85+
case "bytes_processed":
86+
return Counter.of("bytes.processed", Double.valueOf(e.getValue()));
87+
}
88+
89+
return null;
90+
})
91+
.filter(Objects::nonNull)
92+
.peek(runContext::metric)
93+
.collect(Collectors.toList());
94+
95+
return WorkerTaskResult.builder()
96+
.taskRun(TaskRun.builder()
97+
.id(IdUtils.create())
98+
.namespace(runContext.render("{{ flow.namespace }}"))
99+
.flowId(runContext.render("{{ flow.id }}"))
100+
.taskId(r.getUniqueId())
101+
.value(runContext.render("{{ taskrun.id }}"))
102+
.executionId(runContext.render("{{ execution.id }}"))
103+
.parentTaskRunId(runContext.render("{{ taskrun.id }}"))
104+
.state(state)
105+
.attempts(java.util.List.of(TaskRunAttempt.builder()
106+
.state(state)
107+
.metrics(metrics)
108+
.build()
109+
))
110+
.build()
111+
)
112+
.build();
113+
}))
114+
.collect(Collectors.toList());
115+
116+
runContext.dynamicWorkerResult(workerTaskResults);
117+
118+
return result;
119+
}
120+
}
Lines changed: 4 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,21 @@
11
package io.kestra.plugin.dbt.cli;
22

3-
import com.fasterxml.jackson.annotation.JsonInclude;
4-
import com.fasterxml.jackson.databind.ObjectMapper;
53
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
64
import io.kestra.core.models.annotations.PluginProperty;
7-
import io.kestra.core.models.executions.AbstractMetricEntry;
8-
import io.kestra.core.models.executions.TaskRun;
9-
import io.kestra.core.models.executions.TaskRunAttempt;
10-
import io.kestra.core.models.executions.metrics.Counter;
11-
import io.kestra.core.models.flows.State;
125
import io.kestra.core.models.tasks.DynamicTask;
136
import io.kestra.core.models.tasks.RunnableTask;
147
import io.kestra.core.runners.RunContext;
15-
import io.kestra.core.runners.WorkerTaskResult;
16-
import io.kestra.core.serializers.JacksonMapper;
178
import io.kestra.core.tasks.scripts.AbstractBash;
189
import io.kestra.core.tasks.scripts.AbstractLogThread;
1910
import io.kestra.core.tasks.scripts.ScriptOutput;
20-
import io.kestra.core.utils.IdUtils;
21-
import io.kestra.plugin.dbt.cli.models.Manifest;
22-
import io.kestra.plugin.dbt.cli.models.RunResult;
11+
import io.kestra.plugin.dbt.ResultParser;
2312
import io.swagger.v3.oas.annotations.media.Schema;
2413
import lombok.*;
2514
import lombok.experimental.SuperBuilder;
2615
import org.slf4j.Logger;
2716

28-
import java.io.IOException;
29-
import java.time.Instant;
30-
import java.util.*;
31-
import java.util.stream.Collectors;
17+
import java.util.ArrayList;
3218

33-
import static io.kestra.core.utils.Rethrow.throwFunction;
3419
import static io.kestra.core.utils.Rethrow.throwSupplier;
3520

3621
@SuperBuilder
@@ -39,9 +24,6 @@
3924
@Getter
4025
@NoArgsConstructor
4126
public abstract class AbstractDbt extends AbstractBash implements RunnableTask<ScriptOutput>, DynamicTask {
42-
static final protected ObjectMapper MAPPER = JacksonMapper.ofJson(false)
43-
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
44-
4527
@Builder.Default
4628
@Schema(
4729
title = "Stop execution upon a first failure."
@@ -105,7 +87,8 @@ public ScriptOutput run(RunContext runContext) throws Exception {
10587
return String.join(" ", commands);
10688
}));
10789

108-
parseRunResult(runContext);
90+
ResultParser.parseRunResult(runContext, this.workingDirectory.resolve("target/run_results.json").toFile());
91+
ResultParser.parseManifest(runContext, this.workingDirectory.resolve("target/manifest.json").toFile());
10992

11093
return run;
11194
}
@@ -122,93 +105,4 @@ protected LogSupplier defaultLogSupplier(Logger logger, RunContext runContext) {
122105
return thread;
123106
};
124107
}
125-
126-
protected Manifest parseManifest() throws IOException {
127-
return MAPPER.readValue(
128-
this.workingDirectory.resolve("target/manifest.json").toFile(),
129-
Manifest.class
130-
);
131-
}
132-
133-
protected void parseRunResult(RunContext runContext) throws IOException, IllegalVariableEvaluationException {
134-
RunResult result = MAPPER.readValue(
135-
this.workingDirectory.resolve("target/run_results.json").toFile(),
136-
RunResult.class
137-
);
138-
139-
java.util.List<WorkerTaskResult> workerTaskResults = result
140-
.getResults()
141-
.stream()
142-
.map(throwFunction(r -> {
143-
State state = State.of(
144-
r.state(),
145-
java.util.List.of(
146-
new State.History(
147-
State.Type.CREATED,
148-
Instant.ofEpochMilli(r.getTiming()
149-
.stream()
150-
.mapToLong(timing -> timing.getStartedAt().toEpochMilli())
151-
.min()
152-
.orElseThrow())
153-
),
154-
new State.History(
155-
State.Type.RUNNING,
156-
Instant.ofEpochMilli(r.getTiming()
157-
.stream()
158-
.filter(timing -> timing.getName().equals("execute"))
159-
.mapToLong(timing -> timing.getStartedAt().toEpochMilli())
160-
.min()
161-
.orElseThrow())
162-
),
163-
new State.History(
164-
r.state(),
165-
Instant.ofEpochMilli(r.getTiming()
166-
.stream()
167-
.mapToLong(timing -> timing.getCompletedAt().toEpochMilli())
168-
.max()
169-
.orElseThrow())
170-
)
171-
)
172-
);
173-
174-
java.util.List<AbstractMetricEntry<?>> metrics = r.getAdapterResponse()
175-
.entrySet()
176-
.stream()
177-
.map(e -> {
178-
switch (e.getKey()) {
179-
case "rows_affected":
180-
return Counter.of("rows.affected", Double.valueOf(e.getValue()));
181-
case "bytes_processed":
182-
return Counter.of("bytes.processed", Double.valueOf(e.getValue()));
183-
}
184-
185-
return null;
186-
})
187-
.filter(Objects::nonNull)
188-
.peek(runContext::metric)
189-
.collect(Collectors.toList());
190-
191-
return WorkerTaskResult.builder()
192-
.taskRun(TaskRun.builder()
193-
.id(IdUtils.create())
194-
.namespace(runContext.render("{{ flow.namespace }}"))
195-
.flowId(runContext.render("{{ flow.id }}"))
196-
.taskId(r.getUniqueId())
197-
.value(runContext.render("{{ taskrun.id }}"))
198-
.executionId(runContext.render("{{ execution.id }}"))
199-
.parentTaskRunId(runContext.render("{{ taskrun.id }}"))
200-
.state(state)
201-
.attempts(java.util.List.of(TaskRunAttempt.builder()
202-
.state(state)
203-
.metrics(metrics)
204-
.build()
205-
))
206-
.build()
207-
)
208-
.build();
209-
}))
210-
.collect(Collectors.toList());
211-
212-
runContext.dynamicWorkerResult(workerTaskResults);
213-
}
214108
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.kestra.plugin.dbt.cloud;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.models.annotations.PluginProperty;
5+
import io.kestra.core.models.tasks.DynamicTask;
6+
import io.kestra.core.models.tasks.Task;
7+
import io.kestra.core.runners.RunContext;
8+
import io.micronaut.core.type.Argument;
9+
import io.micronaut.http.HttpResponse;
10+
import io.micronaut.http.MediaType;
11+
import io.micronaut.http.MutableHttpRequest;
12+
import io.micronaut.http.client.DefaultHttpClientConfiguration;
13+
import io.micronaut.http.client.HttpClient;
14+
import io.micronaut.http.client.exceptions.HttpClientResponseException;
15+
import io.swagger.v3.oas.annotations.media.Schema;
16+
import lombok.EqualsAndHashCode;
17+
import lombok.Getter;
18+
import lombok.NoArgsConstructor;
19+
import lombok.ToString;
20+
import lombok.experimental.SuperBuilder;
21+
22+
import java.net.MalformedURLException;
23+
import java.net.URI;
24+
import java.net.URISyntaxException;
25+
import javax.validation.constraints.NotNull;
26+
27+
@SuperBuilder
28+
@ToString
29+
@EqualsAndHashCode
30+
@Getter
31+
@NoArgsConstructor
32+
public abstract class AbstractDbtCloud extends Task implements DynamicTask {
33+
@Schema(
34+
title = "Numeric ID of the account"
35+
)
36+
@PluginProperty(dynamic = true)
37+
@NotNull
38+
String accountId;
39+
40+
@Schema(
41+
title = "API key."
42+
)
43+
@PluginProperty(dynamic = true)
44+
@NotNull
45+
String token;
46+
47+
protected HttpClient client() throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException {
48+
return HttpClient.create(URI.create("https://cloud.getdbt.com").toURL(), new DefaultHttpClientConfiguration());
49+
}
50+
51+
protected <REQ, RES> HttpResponse<RES> request(RunContext runContext, MutableHttpRequest<REQ> request, Argument<RES> argument) throws HttpClientResponseException {
52+
try {
53+
request = request
54+
.bearerAuth(runContext.render(this.token))
55+
.contentType(MediaType.APPLICATION_JSON);
56+
57+
try (HttpClient client = this.client()) {
58+
return client.toBlocking().exchange(request, argument);
59+
}
60+
} catch (HttpClientResponseException e) {
61+
throw new HttpClientResponseException(
62+
"Request failed '" + e.getStatus().getCode() + "' and body '" + e.getResponse().getBody(String.class).orElse("null") + "'",
63+
e,
64+
e.getResponse()
65+
);
66+
} catch (IllegalVariableEvaluationException | MalformedURLException | URISyntaxException e) {
67+
throw new RuntimeException(e);
68+
}
69+
}
70+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.kestra.plugin.dbt.cloud;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
import lombok.Value;
5+
import lombok.experimental.SuperBuilder;
6+
import lombok.extern.jackson.Jacksonized;
7+
8+
import java.util.List;
9+
import javax.validation.Valid;
10+
11+
@Value
12+
@Jacksonized
13+
@SuperBuilder
14+
public class JobScheduleDate {
15+
@JsonProperty("type")
16+
JobScheduleDateType type;
17+
18+
@JsonProperty("days")
19+
@Valid
20+
List<Integer> days;
21+
22+
@JsonProperty("cron")
23+
String cron;
24+
}

0 commit comments

Comments
 (0)