Skip to content

Commit 780688b

Browse files
committed
fix(): align to EE
1 parent 75cc895 commit 780688b

File tree

4 files changed

+38
-6
lines changed

4 files changed

+38
-6
lines changed

Diff for: core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ private List<FlowWithTriggers> computeSchedulable(List<Flow> flows, List<Trigger
287287
logError(conditionContext, flow, abstractTrigger, e);
288288
return null;
289289
}
290-
this.triggerState.save(triggerContext, scheduleContext);
290+
this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
291291
} else {
292292
triggerContext = lastTrigger;
293293
}
@@ -389,7 +389,7 @@ private void handle() {
389389
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
390390

391391
try {
392-
this.triggerState.save(triggerRunning, scheduleContext);
392+
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
393393
this.sendPollingTriggerToWorker(f);
394394
} catch (InternalException e) {
395395
logService.logTrigger(
@@ -415,7 +415,7 @@ else if (f.getPollingTrigger() instanceof Schedule schedule) {
415415
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
416416
);
417417
trigger = trigger.checkBackfill();
418-
this.triggerState.save(trigger, scheduleContext);
418+
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
419419
}
420420
} else {
421421
logService.logTrigger(
@@ -433,7 +433,7 @@ else if (f.getPollingTrigger() instanceof Schedule schedule) {
433433
logError(f, e);
434434
}
435435
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
436-
this.triggerState.save(trigger, scheduleContext);
436+
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
437437
}
438438
} catch (InternalException ie) {
439439
// validate schedule condition can fail to render variables
@@ -450,7 +450,7 @@ else if (f.getPollingTrigger() instanceof Schedule schedule) {
450450
.build();
451451
ZonedDateTime nextExecutionDate = f.getPollingTrigger().nextEvaluationDate();
452452
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
453-
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
453+
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
454454
}
455455
});
456456
});
@@ -490,7 +490,7 @@ private void handleEvaluateSchedulingTriggerResult(Schedule schedule, SchedulerE
490490

491491
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
492492
// So we must save them by passing the scheduleContext.
493-
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
493+
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
494494
}
495495

496496
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {

Diff for: core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ public interface SchedulerTriggerStateInterface {
2020

2121
Trigger create(Trigger trigger) throws ConstraintViolationException;
2222

23+
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
24+
25+
Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
26+
2327
Trigger update(Trigger trigger);
2428

2529
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;

Diff for: jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java

+12
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInt
5454
return trigger;
5555
}
5656

57+
@Override
58+
public Trigger create(Trigger trigger, String headerContent) {
59+
return this.triggerRepository.create(trigger);
60+
}
61+
62+
@Override
63+
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
64+
this.triggerRepository.save(trigger, scheduleContextInterface);
65+
66+
return trigger;
67+
}
68+
5769
@Override
5870
public Trigger create(Trigger trigger) {
5971

Diff for: runner-memory/src/main/java/io/kestra/runner/memory/MemorySchedulerTriggerState.java

+16
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,22 @@ public Trigger create(Trigger trigger) {
5656
return trigger;
5757
}
5858

59+
@Override
60+
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
61+
triggers.put(trigger.uid(), trigger);
62+
triggerQueue.emit(trigger);
63+
64+
return trigger;
65+
}
66+
67+
@Override
68+
public Trigger create(Trigger trigger, String headerContent) {
69+
triggers.put(trigger.uid(), trigger);
70+
triggerQueue.emit(trigger);
71+
72+
return trigger;
73+
}
74+
5975
@Override
6076
public Trigger update(Trigger trigger) {
6177
triggers.put(trigger.uid(), trigger);

0 commit comments

Comments
 (0)