Skip to content

feat(jdbc): Improve internal queue cleaning #8262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/src/main/java/io/kestra/core/queues/QueueInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,9 @@ default Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Eith
return receive(consumerGroup, queueType, consumer, true);
}

Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
return receive(consumerGroup, queueType, consumer, forUpdate, false);
}

Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete);
}
14 changes: 14 additions & 0 deletions jdbc-h2/src/main/java/io/kestra/runner/h2/H2Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,18 @@ protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String q

update.execute();
}

@Override
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
var update = ctx.delete(DSL.table(table.getName()))
.where(AbstractJdbcRepository.field("offset").in(offsets.toArray(Integer[]::new)));

if (consumerGroup != null) {
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
} else {
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
}

update.execute();
}
}
16 changes: 16 additions & 0 deletions jdbc-mysql/src/main/java/io/kestra/runner/mysql/MysqlQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String q
update.execute();
}

@Override
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
var update = ctx
.delete(DSL.table(table.getName()))
.where(AbstractJdbcRepository.field("offset").in(offsets));

if (consumerGroup != null) {
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
} else {
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
}

update.execute();
}


private static final class MysqlQueueConsumers {

private static final Set<String> CONSUMERS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String q
update.execute();
}

@Override
protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets) {
var update = ctx.delete(DSL.table(table.getName()))
.where(AbstractJdbcRepository.field("offset").in(offsets));

if (consumerGroup != null) {
update = update.and(AbstractJdbcRepository.field("consumer_group").eq(consumerGroup));
} else {
update = update.and(AbstractJdbcRepository.field("consumer_group").isNull());
}

update.execute();
}

@Override
protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
return fetch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Eithe
});

eithers.forEach(consumer);
}));
},
true));

return this.disposable.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Eithe
}
consumer.accept(either);
});
}));
},
true));

return disposable.get();
}
Expand Down
17 changes: 1 addition & 16 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Value("${kestra.jdbc.executor.clean.execution-queue:true}")
private boolean cleanExecutionQueue;

@Value("${kestra.jdbc.executor.clean.worker-queue:true}")
private boolean cleanWorkerJobQueue;

private final Tracer tracer;

private final FlowRepositoryInterface flowRepository;
Expand Down Expand Up @@ -251,7 +248,7 @@ public void run() {
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
for (int i = 0; i < numberOfThreads; i++) {
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue));
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(null, Executor.class, this::workerTaskResultQueue, true, true));
}
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
Expand Down Expand Up @@ -1038,18 +1035,6 @@ private void toExecution(Executor executor, boolean ignoreFailure) {
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
});
}

// Purge the workerTaskResultQueue and the workerJobQueue
// IMPORTANT: this is safe as only the executor is listening to WorkerTaskResult,
// and we are sure at this stage that all WorkerJob has been listened and processed by the Worker.
// If any of these assumptions changed, this code would not be safe anymore.
if (cleanWorkerJobQueue && !ListUtils.isEmpty(executor.getExecution().getTaskRunList())) {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(taskRun -> taskRun.getId())
.toList();
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
}
}
} catch (QueueException e) {
if (!ignoreFailure) {
Expand Down
41 changes: 30 additions & 11 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ protected Result<Record> receiveFetch(DSLContext ctx, String consumerGroup, Stri

abstract protected void updateGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);

abstract protected void deleteGroupOffsets(DSLContext ctx, String consumerGroup, String queueType, List<Integer> offsets);

protected abstract Condition buildTypeCondition(String type);

@Override
Expand Down Expand Up @@ -302,15 +304,16 @@ public Runnable receive(String consumerGroup, Consumer<Either<T, Deserialization
}

@Override
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate) {
public Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate, boolean delete) {
return this.receiveImpl(
consumerGroup,
queueType,
(dslContext, eithers) -> {
eithers.forEach(consumer);
},
false,
forUpdate
forUpdate,
delete
);
}

Expand All @@ -330,17 +333,23 @@ public Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<
consumer.accept(eithers);
},
false,
forUpdate
forUpdate,
false
);
}

public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer) {
return this.receiveTransaction(consumerGroup, queueType, consumer, false);
}

public Runnable receiveTransaction(String consumerGroup, Class<?> queueType, BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer, boolean delete) {
return this.receiveImpl(
consumerGroup,
queueType,
consumer,
true,
true
true,
delete
);
}

Expand All @@ -349,7 +358,8 @@ public Runnable receiveImpl(
Class<?> queueType,
BiConsumer<DSLContext, List<Either<T, DeserializationException>>> consumer,
Boolean inTransaction,
boolean forUpdate
boolean forUpdate,
boolean delete
) {
String queueName = queueName(queueType);

Expand All @@ -364,12 +374,21 @@ public Runnable receiveImpl(
consumer.accept(ctx, this.map(result));
}

this.updateGroupOffsets(
ctx,
consumerGroup,
queueName,
result.map(record -> record.get("offset", Integer.class))
);
if (delete) {
this.deleteGroupOffsets(
ctx,
consumerGroup,
queueName,
result.map(record -> record.get("offset", Integer.class))
);
} else {
this.updateGroupOffsets(
ctx,
consumerGroup,
queueName,
result.map(record -> record.get("offset", Integer.class))
);
}
}

return result;
Expand Down
2 changes: 0 additions & 2 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
public class JdbcScheduler extends AbstractScheduler {
private final TriggerRepositoryInterface triggerRepository;
private final JooqDSLContextWrapper dslContextWrapper;
private final PluginDefaultService pluginDefaultService;

@Inject
public JdbcScheduler(
Expand All @@ -38,7 +37,6 @@ public JdbcScheduler(
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class);
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
}

@Override
Expand Down