Skip to content

Commit 0bcb76f

Browse files
test: use KestraTest scheduler for trigger tests + evaluate
trigger (#156)
1 parent 10a5d66 commit 0bcb76f

14 files changed

Lines changed: 133 additions & 608 deletions

File tree

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
version=1.4.1-SNAPSHOT
2-
kestraVersion=1.3.13
2+
kestraVersion=1.3.16
33
debeziumVersion=3.3.1.Final
44
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m

plugin-debezium-db2/src/test/java/io/kestra/plugin/debezium/db2/RealtimeTriggerTest.java

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,20 @@
1414
import io.kestra.core.queues.QueueFactoryInterface;
1515
import io.kestra.core.queues.QueueInterface;
1616
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
17-
import io.kestra.core.runners.FlowListeners;
1817
import io.kestra.core.services.KVStoreService;
19-
import io.kestra.core.utils.IdUtils;
2018
import io.kestra.core.utils.TestsUtils;
21-
import io.kestra.jdbc.runner.JdbcScheduler;
2219
import io.kestra.plugin.debezium.AbstractDebeziumTest;
23-
import io.kestra.scheduler.AbstractScheduler;
24-
import io.kestra.worker.DefaultWorker;
2520

26-
import io.micronaut.context.ApplicationContext;
2721
import jakarta.inject.Inject;
2822
import jakarta.inject.Named;
2923
import reactor.core.publisher.Flux;
3024

3125
import static org.hamcrest.MatcherAssert.assertThat;
3226
import static org.hamcrest.Matchers.*;
3327

34-
@KestraTest
28+
@KestraTest(startRunner = true, startScheduler = true)
3529
@Disabled("The tests are disabled for CI, as db2 container have long time initialization")
3630
class RealtimeTriggerTest extends AbstractDebeziumTest {
37-
@Inject
38-
private ApplicationContext applicationContext;
39-
40-
@Inject
41-
private FlowListeners flowListenersService;
42-
4331
@Inject
4432
@Named(QueueFactoryInterface.EXECUTION_NAMED)
4533
private QueueInterface<Execution> executionQueue;
@@ -73,38 +61,20 @@ void cleanup() throws Exception {
7361

7462
@Test
7563
void flow() throws Exception {
76-
77-
// mock flow listeners
7864
CountDownLatch queueCount = new CountDownLatch(1);
65+
Flux<Execution> receive = TestsUtils.receive(executionQueue, execution -> {
66+
queueCount.countDown();
67+
assertThat(execution.getLeft().getFlowId(), is("trigger"));
68+
});
7969

80-
// scheduler
81-
try (
82-
AbstractScheduler scheduler = new JdbcScheduler(
83-
this.applicationContext,
84-
this.flowListenersService
85-
);
86-
DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, IdUtils.create(), 8, null);
87-
) {
88-
// wait for execution
89-
Flux<Execution> receive = TestsUtils.receive(executionQueue, execution ->
90-
{
91-
queueCount.countDown();
92-
assertThat(execution.getLeft().getFlowId(), is("trigger"));
93-
});
94-
95-
worker.run();
96-
scheduler.run();
97-
98-
repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/realtime.yaml")));
99-
100-
boolean await = queueCount.await(15, TimeUnit.SECONDS);
101-
assertThat(await, is(true));
70+
repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/realtime.yaml")));
10271

103-
Map<String, Object> data = (Map<String, Object>) receive.blockLast().getTrigger().getVariables().get("data");
72+
boolean await = queueCount.await(15, TimeUnit.SECONDS);
73+
assertThat(await, is(true));
10474

105-
assertThat(data, notNullValue());
75+
Map<String, Object> data = (Map<String, Object>) receive.blockLast().getTrigger().getVariables().get("data");
10676

107-
assertThat(data.size(), greaterThanOrEqualTo(5));
108-
}
77+
assertThat(data, notNullValue());
78+
assertThat(data.size(), greaterThanOrEqualTo(5));
10979
}
11080
}
Lines changed: 7 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,18 @@
11
package io.kestra.plugin.debezium.db2;
22

3-
import java.util.Objects;
4-
import java.util.concurrent.CountDownLatch;
5-
import java.util.concurrent.TimeUnit;
3+
import java.util.Optional;
64

75
import org.junit.jupiter.api.BeforeEach;
86
import org.junit.jupiter.api.Disabled;
97
import org.junit.jupiter.api.Test;
108

9+
import io.kestra.core.junit.annotations.EvaluateTrigger;
1110
import io.kestra.core.junit.annotations.KestraTest;
1211
import io.kestra.core.models.executions.Execution;
13-
import io.kestra.core.queues.QueueFactoryInterface;
14-
import io.kestra.core.queues.QueueInterface;
15-
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
16-
import io.kestra.core.runners.FlowListeners;
1712
import io.kestra.core.services.KVStoreService;
18-
import io.kestra.core.utils.IdUtils;
19-
import io.kestra.core.utils.TestsUtils;
20-
import io.kestra.jdbc.runner.JdbcScheduler;
2113
import io.kestra.plugin.debezium.AbstractDebeziumTest;
22-
import io.kestra.scheduler.AbstractScheduler;
23-
import io.kestra.worker.DefaultWorker;
2414

25-
import io.micronaut.context.ApplicationContext;
2615
import jakarta.inject.Inject;
27-
import jakarta.inject.Named;
28-
import reactor.core.publisher.Flux;
2916

3017
import static org.hamcrest.MatcherAssert.assertThat;
3118
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -34,18 +21,6 @@
3421
@KestraTest
3522
@Disabled("The tests are disabled for CI, as db2 container have long time initialization")
3623
class TriggerTest extends AbstractDebeziumTest {
37-
@Inject
38-
private ApplicationContext applicationContext;
39-
40-
@Inject
41-
private FlowListeners flowListenersService;
42-
43-
@Inject
44-
@Named(QueueFactoryInterface.EXECUTION_NAMED)
45-
private QueueInterface<Execution> executionQueue;
46-
47-
@Inject
48-
protected LocalFlowRepositoryLoader repositoryLoader;
4924

5025
@Inject
5126
private KVStoreService kvStoreService;
@@ -72,36 +47,10 @@ void cleanup() throws Exception {
7247
}
7348

7449
@Test
75-
void flow() throws Exception {
76-
77-
// mock flow listeners
78-
CountDownLatch queueCount = new CountDownLatch(1);
79-
80-
// scheduler
81-
try (
82-
AbstractScheduler scheduler = new JdbcScheduler(
83-
this.applicationContext,
84-
this.flowListenersService
85-
);
86-
DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, IdUtils.create(), 8, null);
87-
) {
88-
// wait for execution
89-
Flux<Execution> receive = TestsUtils.receive(executionQueue, execution ->
90-
{
91-
queueCount.countDown();
92-
assertThat(execution.getLeft().getFlowId(), is("trigger"));
93-
});
94-
95-
worker.run();
96-
scheduler.run();
97-
98-
repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/trigger.yaml")));
99-
100-
boolean await = queueCount.await(15, TimeUnit.SECONDS);
101-
102-
Integer trigger = (Integer) receive.blockLast().getTrigger().getVariables().get("size");
103-
104-
assertThat(trigger, greaterThanOrEqualTo(5));
105-
}
50+
@EvaluateTrigger(flow = "flows/trigger.yaml", triggerId = "watch")
51+
void flow(Optional<Execution> optionalExecution) {
52+
assertThat(optionalExecution.isPresent(), is(true));
53+
Integer size = (Integer) optionalExecution.get().getTrigger().getVariables().get("size");
54+
assertThat(size, greaterThanOrEqualTo(5));
10655
}
10756
}

plugin-debezium-mongodb/src/test/java/io/kestra/plugin/debezium/mongodb/RealtimeTriggerTest.java

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,31 +13,19 @@
1313
import io.kestra.core.queues.QueueFactoryInterface;
1414
import io.kestra.core.queues.QueueInterface;
1515
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
16-
import io.kestra.core.runners.FlowListeners;
17-
import io.kestra.core.utils.IdUtils;
1816
import io.kestra.core.utils.TestsUtils;
19-
import io.kestra.jdbc.runner.JdbcScheduler;
2017
import io.kestra.plugin.debezium.AbstractDebeziumTest;
21-
import io.kestra.scheduler.AbstractScheduler;
22-
import io.kestra.worker.DefaultWorker;
2318

24-
import io.micronaut.context.ApplicationContext;
2519
import jakarta.inject.Inject;
2620
import jakarta.inject.Named;
2721
import reactor.core.publisher.Flux;
2822

2923
import static org.hamcrest.MatcherAssert.assertThat;
3024
import static org.hamcrest.Matchers.*;
3125

32-
@KestraTest
26+
@KestraTest(startRunner = true, startScheduler = true)
3327
@Disabled("Until there will be automatic way to execute mongo.js scripts")
3428
class RealtimeTriggerTest extends AbstractDebeziumTest {
35-
@Inject
36-
private ApplicationContext applicationContext;
37-
38-
@Inject
39-
private FlowListeners flowListenersService;
40-
4129
@Inject
4230
@Named(QueueFactoryInterface.EXECUTION_NAMED)
4331
private QueueInterface<Execution> executionQueue;
@@ -62,37 +50,20 @@ protected String getPassword() {
6250

6351
@Test
6452
void flow() throws Exception {
65-
// mock flow listeners
6653
CountDownLatch queueCount = new CountDownLatch(1);
54+
Flux<Execution> receive = TestsUtils.receive(executionQueue, execution -> {
55+
queueCount.countDown();
56+
assertThat(execution.getLeft().getFlowId(), is("trigger"));
57+
});
6758

68-
// scheduler
69-
try (
70-
AbstractScheduler scheduler = new JdbcScheduler(
71-
this.applicationContext,
72-
this.flowListenersService
73-
);
74-
DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, IdUtils.create(), 8, null);
75-
) {
76-
// wait for execution
77-
Flux<Execution> receive = TestsUtils.receive(executionQueue, execution ->
78-
{
79-
queueCount.countDown();
80-
assertThat(execution.getLeft().getFlowId(), is("trigger"));
81-
});
82-
83-
worker.run();
84-
scheduler.run();
85-
86-
repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/realtime.yaml")));
87-
88-
boolean await = queueCount.await(15, TimeUnit.SECONDS);
89-
assertThat(await, is(true));
59+
repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/realtime.yaml")));
9060

91-
Map<String, Object> data = (Map<String, Object>) receive.blockLast().getTrigger().getVariables().get("data");
61+
boolean await = queueCount.await(15, TimeUnit.SECONDS);
62+
assertThat(await, is(true));
9263

93-
assertThat(data, notNullValue());
64+
Map<String, Object> data = (Map<String, Object>) receive.blockLast().getTrigger().getVariables().get("data");
9465

95-
assertThat(data.size(), greaterThanOrEqualTo(20));
96-
}
66+
assertThat(data, notNullValue());
67+
assertThat(data.size(), greaterThanOrEqualTo(20));
9768
}
9869
}
Lines changed: 7 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,13 @@
11
package io.kestra.plugin.debezium.mongodb;
22

3-
import java.util.Objects;
4-
import java.util.concurrent.CountDownLatch;
5-
import java.util.concurrent.TimeUnit;
3+
import java.util.Optional;
64

75
import org.junit.jupiter.api.Disabled;
86
import org.junit.jupiter.api.Test;
97

8+
import io.kestra.core.junit.annotations.EvaluateTrigger;
109
import io.kestra.core.junit.annotations.KestraTest;
1110
import io.kestra.core.models.executions.Execution;
12-
import io.kestra.core.queues.QueueFactoryInterface;
13-
import io.kestra.core.queues.QueueInterface;
14-
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
15-
import io.kestra.core.runners.FlowListeners;
16-
import io.kestra.core.utils.IdUtils;
17-
import io.kestra.core.utils.TestsUtils;
18-
import io.kestra.jdbc.runner.JdbcScheduler;
19-
import io.kestra.scheduler.AbstractScheduler;
20-
import io.kestra.worker.DefaultWorker;
21-
22-
import io.micronaut.context.ApplicationContext;
23-
import jakarta.inject.Inject;
24-
import jakarta.inject.Named;
25-
import reactor.core.publisher.Flux;
2611

2712
import static org.hamcrest.MatcherAssert.assertThat;
2813
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -31,50 +16,12 @@
3116
@KestraTest
3217
@Disabled("Until there will be automatic way to execute mongo.js scripts")
3318
class TriggerTest {
34-
@Inject
35-
private ApplicationContext applicationContext;
36-
37-
@Inject
38-
private FlowListeners flowListenersService;
39-
40-
@Inject
41-
@Named(QueueFactoryInterface.EXECUTION_NAMED)
42-
private QueueInterface<Execution> executionQueue;
43-
44-
@Inject
45-
protected LocalFlowRepositoryLoader repositoryLoader;
4619

4720
@Test
48-
void flow() throws Exception {
49-
// mock flow listeners
50-
CountDownLatch queueCount = new CountDownLatch(1);
51-
52-
// scheduler
53-
try (
54-
AbstractScheduler scheduler = new JdbcScheduler(
55-
this.applicationContext,
56-
this.flowListenersService
57-
);
58-
DefaultWorker worker = applicationContext.createBean(DefaultWorker.class, IdUtils.create(), 8, null);
59-
) {
60-
// wait for execution
61-
Flux<Execution> receive = TestsUtils.receive(executionQueue, execution ->
62-
{
63-
queueCount.countDown();
64-
assertThat(execution.getLeft().getFlowId(), is("trigger"));
65-
});
66-
67-
worker.run();
68-
scheduler.run();
69-
70-
repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/trigger.yaml")));
71-
72-
boolean await = queueCount.await(1, TimeUnit.MINUTES);
73-
assertThat(await, is(true));
74-
75-
Integer trigger = (Integer) receive.blockLast().getTrigger().getVariables().get("size");
76-
77-
assertThat(trigger, greaterThanOrEqualTo(20));
78-
}
21+
@EvaluateTrigger(flow = "flows/trigger.yaml", triggerId = "watch")
22+
void flow(Optional<Execution> optionalExecution) {
23+
assertThat(optionalExecution.isPresent(), is(true));
24+
Integer size = (Integer) optionalExecution.get().getTrigger().getVariables().get("size");
25+
assertThat(size, greaterThanOrEqualTo(20));
7926
}
8027
}

0 commit comments

Comments
 (0)