Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a20f761
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Dec 18, 2025
416b037
test(pod): add unit tests for waitForPodReady ContainersReady validation
Dec 22, 2025
3a8b696
Merge branch 'main' into feat-1218-pod-ready
fdelbrayelle Jan 16, 2026
0b9a4b1
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
494cb48
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
3710308
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
ae23de8
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
8e3ea7c
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
e8f6974
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
cc4226f
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
9586061
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
90dfdb5
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
53db6eb
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
93220c0
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
06c5156
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
2912768
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
a05ba23
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
935538c
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
12e0fb3
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 9, 2026
cc8da5e
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 10, 2026
14caad8
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 10, 2026
3022255
Ensure waitForPodReady only treats Pods as ready when ContainersReady…
Malaydewangan09 Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/setup-unit.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
curl -Lo ./kind "https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64"
curl -Lo ./kind "https://kind.sigs.k8s.io/dl/v0.31.0/kind-linux-amd64"
chmod +x ./kind
./kind create cluster
curl -LO "https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x ./kubectl

docker pull ubuntu
Comment thread
Malaydewangan09 marked this conversation as resolved.
Outdated
./kind load docker-image ubuntu

docker pull busybox
./kind load docker-image busybox

docker pull debian:stable-slim
./kind load docker-image debian:stable-slim
26 changes: 19 additions & 7 deletions src/main/java/io/kestra/plugin/kubernetes/core/PodCreate.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,17 +396,25 @@ public PodCreate.Output run(RunContext runContext) throws Exception {
pod = PodService.waitForPodReady(client, pod, rWaitUntilRunning);
}

if (pod.getStatus() != null && pod.getStatus().getPhase().equals("Failed")) {
throw PodService.failedMessage(pod);
// Set up log consumer for output parsing (used by both watch and fetchFinalLogs)
AbstractLogConsumer logConsumer = new DefaultLogConsumer(runContext);
podLogService.setLogConsumer(logConsumer);


Comment thread
Malaydewangan09 marked this conversation as resolved.
Outdated
if (pod.getStatus() != null) {
if ("Failed".equals(pod.getStatus().getPhase()) || PodService.hasNonTransientWaitingContainer(pod)) {
if (PodService.hasAnyContainerStarted(pod)) {
podLogService.fetchFinalLogs(client, pod, runContext);
}
PodService.logPodEvents(client, pod, logger, logConsumer);
throw PodService.failedMessage(pod);
}
}

// Wait for containers to start (Running) or pod to reach terminal state (Succeeded/Failed/Unknown)
// This ensures we proceed with log collection regardless of pod outcome
pod = PodService.waitForContainersStartedOrCompleted(client, pod, rWaitUntilRunning);

// Set up log consumer for output parsing (used by both watch and fetchFinalLogs)
AbstractLogConsumer logConsumer = new DefaultLogConsumer(runContext);
podLogService.setLogConsumer(logConsumer);

// Only start log streaming if pod is actually running
// For pods that complete quickly, fetchFinalLogs will handle log collection
Expand All @@ -425,7 +433,7 @@ public PodCreate.Output run(RunContext runContext) throws Exception {
}

// Collect late logs and check for failures (throws if container failed)
handleEnd(ended, runContext, this.outputFiles != null, client, podLogService);
handleEnd(ended, runContext, this.outputFiles != null, client, podLogService, logConsumer);

PodStatus podStatus = PodStatus.from(ended.getStatus());
Output.OutputBuilder output = Output.builder()
Expand Down Expand Up @@ -630,7 +638,7 @@ private void delete(KubernetesClient client, Logger logger, Pod pod, RunContext
}
}

private void handleEnd(Pod ended, RunContext runContext, boolean hasOutputFiles, KubernetesClient client, PodLogService podLogService) throws Exception {
private void handleEnd(Pod ended, RunContext runContext, boolean hasOutputFiles, KubernetesClient client, PodLogService podLogService, AbstractLogConsumer logConsumer) throws Exception {
Logger logger = runContext.logger();

// Wait for async log stream (watchLog) to finish processing
Expand All @@ -641,6 +649,10 @@ private void handleEnd(Pod ended, RunContext runContext, boolean hasOutputFiles,
Pod currentPod = PodService.podRef(client, ended).get();
if (currentPod != null) {
podLogService.fetchFinalLogs(client, ended, runContext);

if (!"Succeeded".equals(ended.getStatus().getPhase())) {
Comment thread
Malaydewangan09 marked this conversation as resolved.
Outdated
PodService.logPodEvents(client, ended, logger, logConsumer);
}
} else {
logger.debug("Pod '{}' was already deleted, skipping fetchFinalLogs", ended.getMetadata().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void fetchFinalLogs(KubernetesClient client, Pod pod, RunContext runConte

PodResource podResource = PodService.podRef(client, pod);

pod.getSpec().getContainers().forEach(container -> {
if (pod.getSpec().getContainers().stream().noneMatch(container -> {
try {
String logs = podResource
.inContainer(container.getName())
Expand All @@ -151,16 +151,43 @@ public void fetchFinalLogs(KubernetesClient client, Pod pod, RunContext runConte
.getLog();

if (logs != null && !logs.isEmpty()) {
// Write all logs - hash-based deduplication automatically filters duplicates
outputStream.write(logs.getBytes());
outputStream.flush();
} else {
logger.debug("No logs returned for container '{}'", container.getName());
return true;
}
runContext.logger().debug("No logs returned for container '{}'", container.getName());
} catch (IOException e) {
logger.error("Failed to fetch final logs for container '{}'", container.getName(), e);
runContext.logger().error("Failed to fetch final logs for container '{}'", container.getName(), e);
Comment thread
Malaydewangan09 marked this conversation as resolved.
Outdated
}
});
return false;
})) {
// if no container logs were found, the pod likely never started.
// we fall back to Kubernetes pod events
fetchPodEvents(client, pod, runContext);
}
}

/**
 * Writes the Kubernetes events recorded for the given pod to this service's output stream.
 * <p>
 * Used by {@code fetchFinalLogs} as a fallback when no container returned any log content.
 * Events are looked up in the pod's namespace and filtered on {@code involvedObject.name}.
 * Each event is emitted as one {@code [pod-event] <reason>: <message>} line.
 * <p>
 * This method is best-effort: any failure while listing or writing events is logged as an
 * error and never propagated to the caller.
 *
 * @param client     Kubernetes client used to list the events
 * @param pod        pod whose events are fetched; its metadata supplies namespace and name
 * @param runContext run context providing the logger
 */
private void fetchPodEvents(KubernetesClient client, Pod pod, RunContext runContext) {
    try {
        var events = client.v1().events()
            .inNamespace(pod.getMetadata().getNamespace())
            .withField("involvedObject.name", pod.getMetadata().getName())
            .list()
            .getItems();

        if (events.isEmpty()) {
            runContext.logger().warn("No container logs and no pod events found for pod '{}'", pod.getMetadata().getName());
            return;
        }

        runContext.logger().info("No container logs available. Pod events:");
        for (var event : events) {
            // An event may carry no reason or message; emit an empty string instead of the literal "null".
            String reason = event.getReason() == null ? "" : event.getReason();
            String message = event.getMessage() == null ? "" : event.getMessage();
            String line = "[pod-event] " + reason + ": " + message + "\n";

            // Encode explicitly as UTF-8 so the output does not depend on the JVM default charset.
            outputStream.write(line.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        outputStream.flush();
    } catch (Exception e) {
        runContext.logger().error("Failed to fetch pod events for '{}'", pod.getMetadata().getName(), e);
    }
}

@Override
Expand Down
127 changes: 118 additions & 9 deletions src/main/java/io/kestra/plugin/kubernetes/services/PodService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.kestra.core.models.tasks.runners.AbstractLogConsumer;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.RetryUtils;
import io.kestra.plugin.kubernetes.models.Connection;
Expand All @@ -17,12 +18,15 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

abstract public class PodService {
private static final List<String> COMPLETED_PHASES = List.of("Succeeded", "Failed", "Unknown"); // see https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
private static final String SIDECAR_FILES_CONTAINER_NAME = "out-files";

public static KubernetesClient client(RunContext runContext, Connection connection) throws IllegalVariableEvaluationException {
return connection != null ? client(connection.toConfig(runContext)) : client(null);
Expand All @@ -48,18 +52,31 @@ public static Pod waitForInitContainerRunning(KubernetesClient client, Pod pod,
}

public static Pod waitForPodReady(KubernetesClient client, Pod pod, Duration waitUntilRunning) {
boolean hasSidecar = pod.getSpec().getContainers().stream()
.anyMatch(c -> SIDECAR_FILES_CONTAINER_NAME.equals(c.getName()));

return PodService.podRef(client, pod)
.waitUntilCondition(
j -> j != null &&
j.getStatus() != null && (
j.getStatus().getPhase().equals("Failed") ||
j.getStatus()
.getConditions()
.stream()
.anyMatch(podCondition -> podCondition.getType().equals("ContainersReady") ||
(podCondition.getReason() != null && podCondition.getReason().equals("PodCompleted"))
)
),
"Failed".equals(j.getStatus().getPhase()) ||
(j.getStatus().getContainerStatuses() != null &&
j.getStatus().getContainerStatuses().stream()
.anyMatch(cs -> cs.getState() != null &&
cs.getState().getWaiting() != null &&
cs.getState().getWaiting().getReason() != null &&
!TransientWaitingReason.contains(cs.getState().getWaiting().getReason())
)
) ||
j.getStatus()
.getConditions()
.stream()
.anyMatch(podCondition ->
("ContainersReady".equals(podCondition.getType()) &&
(hasSidecar || "True".equals(podCondition.getStatus()))) ||
("PodCompleted".equals(podCondition.getReason()))
)
),
waitUntilRunning.toSeconds(),
TimeUnit.SECONDS
);
Expand Down Expand Up @@ -176,7 +193,19 @@ public static IllegalStateException failedMessage(Pod pod) throws IllegalStateEx
"exitcode '" + containerStateTerminated.getExitCode() + "' & " +
"message '" + containerStateTerminated.getMessage() + "'"
))
.orElse(new IllegalStateException("Pods terminated without any containers status !"));
.orElseGet(() -> {
if (pod.getStatus().getContainerStatuses() != null) {
Optional<String> waitingReason = pod.getStatus().getContainerStatuses().stream()
.filter(cs -> cs.getState() != null && cs.getState().getWaiting() != null)
.map(cs -> cs.getState().getWaiting().getReason())
.filter(reason -> reason != null && !TransientWaitingReason.contains(reason))
.findFirst();
if (waitingReason.isPresent()) {
return new IllegalStateException("Pod failed before container start: " + waitingReason.get());
}
}
return new IllegalStateException("Pod failed with phase '" + pod.getStatus().getPhase() + "'");
});
}

public static void checkContainerFailures(Pod pod, String exceptContainer, Logger logger) throws IllegalStateException {
Expand Down Expand Up @@ -261,7 +290,87 @@ public static void uploadMarker(RunContext runContext, PodResource podResource,
logger.debug(marker + " marker uploaded");
}

/**
 * Forwards the {@code Warning} events recorded for a pod to the given log consumer.
 * <p>
 * Events are listed in the pod's namespace, filtered on {@code involvedObject.name}, restricted
 * to type {@code Warning}, and ordered by last timestamp (events without one come last).
 * Each event is passed to the consumer as {@code [pod-event] <reason> - <message>}, with a
 * missing reason or message rendered as an empty string.
 * <p>
 * Nothing happens when the pod or its metadata is {@code null}. Any exception raised while
 * fetching or forwarding events is logged as a warning and not rethrown.
 *
 * @param client      Kubernetes client used to list the events
 * @param pod         pod whose events are reported
 * @param logger      logger receiving the warning when the lookup fails
 * @param logConsumer consumer receiving one line per warning event; the second argument is always
 *                    {@code true} (presumably the "is stderr" flag; confirm against AbstractLogConsumer)
 */
public static void logPodEvents(KubernetesClient client, Pod pod, Logger logger, AbstractLogConsumer logConsumer) {
    if (pod == null || pod.getMetadata() == null) {
        return;
    }

    String podName = pod.getMetadata().getName();
    String namespace = pod.getMetadata().getNamespace();

    try {
        // Keep only the warning events attached to this pod name.
        List<Event> warnings = new ArrayList<>();
        for (Event candidate : client.v1().events()
            .inNamespace(namespace)
            .withField("involvedObject.name", podName)
            .list()
            .getItems()) {
            if ("Warning".equals(candidate.getType())) {
                warnings.add(candidate);
            }
        }

        // Stable sort by last timestamp, events without a timestamp last.
        warnings.sort(Comparator.comparing(
            Event::getLastTimestamp,
            Comparator.nullsLast(Comparator.naturalOrder())
        ));

        for (Event warning : warnings) {
            String reason = warning.getReason();
            if (reason == null) {
                reason = "";
            }
            String message = warning.getMessage();
            if (message == null) {
                message = "";
            }

            logConsumer.accept("[pod-event] " + reason + " - " + message, true);
        }
    } catch (Exception e) {
        logger.warn("Failed to fetch events for pod '{}'", podName, e);
    }
}

/**
 * Tells whether at least one container of the pod reports a running or terminated state.
 *
 * @param pod pod to inspect
 * @return {@code true} if any container status has a non-null running or terminated state;
 *         {@code false} otherwise, including when the pod has no status or no container statuses
 */
public static boolean hasAnyContainerStarted(Pod pod) {
    var status = pod.getStatus();
    if (status == null) {
        return false;
    }

    var containerStatuses = status.getContainerStatuses();
    if (containerStatuses == null) {
        return false;
    }

    for (var containerStatus : containerStatuses) {
        var state = containerStatus.getState();
        if (state == null) {
            continue;
        }
        if (state.getRunning() != null || state.getTerminated() != null) {
            return true;
        }
    }
    return false;
}

/**
 * Tells whether at least one container of the pod is waiting for a reason that is not listed
 * in {@link TransientWaitingReason}.
 *
 * @param pod pod to inspect
 * @return {@code true} if any container status is in a waiting state with a non-null reason that
 *         {@link TransientWaitingReason#contains(String)} rejects; {@code false} otherwise,
 *         including when the pod has no status or no container statuses
 */
public static boolean hasNonTransientWaitingContainer(Pod pod) {
    var status = pod.getStatus();
    if (status == null) {
        return false;
    }

    var containerStatuses = status.getContainerStatuses();
    if (containerStatuses == null) {
        return false;
    }

    for (var containerStatus : containerStatuses) {
        var state = containerStatus.getState();
        if (state == null) {
            continue;
        }

        var waiting = state.getWaiting();
        if (waiting == null) {
            continue;
        }

        String waitingReason = waiting.getReason();
        if (waitingReason != null && !TransientWaitingReason.contains(waitingReason)) {
            return true;
        }
    }
    return false;
}

/**
 * Resolves the {@code working-dir} entry under the run context's working directory.
 *
 * @param runContext run context whose working directory is used as the base
 * @return the working directory path with {@code working-dir} appended
 */
public static Path tempDir(RunContext runContext) {
    Path workingDirectory = runContext.workingDir().path();
    return workingDirectory.resolve("working-dir");
}

/**
 * Container waiting reasons regarded as transient.
 * <p>
 * Each constant wraps the reason string it stands for; {@link #contains(String)} checks a
 * reason string against all of them.
 */
public enum TransientWaitingReason {
    CONTAINER_CREATING("ContainerCreating"),
    POD_INITIALIZING("PodInitializing");

    private final String reason;

    TransientWaitingReason(String reason) {
        this.reason = reason;
    }

    /**
     * Checks whether the given reason string matches one of the transient reasons.
     * The comparison is exact and case-sensitive.
     *
     * @param reason waiting reason to look up; {@code null} never matches
     * @return {@code true} if a constant carries exactly this reason string
     */
    public static boolean contains(String reason) {
        return java.util.Arrays.stream(values())
            .anyMatch(candidate -> candidate.reason.equals(reason));
    }
}
}
Loading
Loading