diff --git a/.github/setup-unit.sh b/.github/setup-unit.sh index 29cfd894..47c91b1e 100755 --- a/.github/setup-unit.sh +++ b/.github/setup-unit.sh @@ -1,5 +1,5 @@ -curl -Lo ./kind "https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64" +curl -Lo ./kind "https://kind.sigs.k8s.io/dl/v0.31.0/kind-linux-amd64" chmod +x ./kind ./kind create cluster curl -LO "https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl" -chmod +x ./kubectl +chmod +x ./kubectl \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/kubernetes/core/PodCreate.java b/src/main/java/io/kestra/plugin/kubernetes/core/PodCreate.java index 3c21a0fa..db2c9de1 100644 --- a/src/main/java/io/kestra/plugin/kubernetes/core/PodCreate.java +++ b/src/main/java/io/kestra/plugin/kubernetes/core/PodCreate.java @@ -383,7 +383,7 @@ public PodCreate.Output run(RunContext runContext) throws Exception { try (Watch ignored = PodService.podRef(client, pod).watch(listOptions(runContext), new PodWatcher(logger))) { try { // in case of resuming an already running pod, the status will be running - if (!"Running".equals(pod.getStatus().getPhase())) { + if (!PodService.PodPhase.RUNNING.value().equals(pod.getStatus().getPhase())) { // wait for init container and upload files if (validatedInputFiles != null) { // Files already validated and created before pod creation @@ -396,21 +396,28 @@ public PodCreate.Output run(RunContext runContext) throws Exception { pod = PodService.waitForPodReady(client, pod, rWaitUntilRunning); } - if (pod.getStatus() != null && pod.getStatus().getPhase().equals("Failed")) { - throw PodService.failedMessage(pod); + // Set up log consumer for output parsing (used by both watch and fetchFinalLogs) + AbstractLogConsumer logConsumer = new DefaultLogConsumer(runContext); + podLogService.setLogConsumer(logConsumer); + + if (pod.getStatus() != null) { + if (PodService.PodPhase.FAILED.value().equals(pod.getStatus().getPhase()) || PodService.hasNonTransientWaitingContainer(pod)) { + if (PodService.hasAnyContainerStarted(pod)) { + podLogService.fetchFinalLogs(client, pod, runContext); + } + PodService.logPodEvents(client, pod, logger, logConsumer); + throw PodService.failedMessage(pod); + } } // Wait for containers to start (Running) or pod to reach terminal state (Succeeded/Failed/Unknown) // This ensures we proceed with log collection regardless of pod outcome pod = PodService.waitForContainersStartedOrCompleted(client, pod, rWaitUntilRunning); - // Set up log consumer for output parsing (used by both watch and fetchFinalLogs) - AbstractLogConsumer logConsumer = new DefaultLogConsumer(runContext); - podLogService.setLogConsumer(logConsumer); // Only start log streaming if pod is actually running // For pods that complete quickly, fetchFinalLogs will handle log collection - if (pod.getStatus() != null && "Running".equals(pod.getStatus().getPhase())) { + if (pod.getStatus() != null && PodService.PodPhase.RUNNING.value().equals(pod.getStatus().getPhase())) { podLogService.watch(client, pod, logConsumer, runContext); } @@ -425,7 +432,7 @@ public PodCreate.Output run(RunContext runContext) throws Exception { } // Collect late logs and check for failures (throws if container failed) - handleEnd(ended, runContext, this.outputFiles != null, client, podLogService); + handleEnd(ended, runContext, this.outputFiles != null, client, podLogService, logConsumer); PodStatus podStatus = PodStatus.from(ended.getStatus()); Output.OutputBuilder output = Output.builder() @@ -630,7 +637,7 @@ private void delete(KubernetesClient client, Logger logger, Pod pod, RunContext } } - private void handleEnd(Pod ended, RunContext runContext, boolean hasOutputFiles, KubernetesClient client, PodLogService podLogService) throws Exception { + private void handleEnd(Pod ended, RunContext runContext, boolean hasOutputFiles, KubernetesClient client, PodLogService podLogService, AbstractLogConsumer logConsumer) throws Exception { Logger logger = runContext.logger(); // Wait for async log stream (watchLog) to finish processing @@ -641,6 +648,10 @@ private void handleEnd(Pod ended, RunContext runContext, boolean hasOutputFiles, Pod currentPod = PodService.podRef(client, ended).get(); if (currentPod != null) { podLogService.fetchFinalLogs(client, ended, runContext); + + if (!PodService.PodPhase.SUCCEEDED.value().equals(ended.getStatus().getPhase())) { + PodService.logPodEvents(client, ended, logger, logConsumer); + } } else { logger.debug("Pod '{}' was already deleted, skipping fetchFinalLogs", ended.getMetadata().getName()); } @@ -650,7 +661,7 @@ private void handleEnd(Pod ended, RunContext runContext, boolean hasOutputFiles, // For pods with outputFiles, check container exit codes // (pod phase stays "Running" due to sidecar container) PodService.checkContainerFailures(ended, SIDECAR_FILES_CONTAINER_NAME, logger); - } else if (ended.getStatus() != null && ended.getStatus().getPhase().equals("Failed")) { + } else if (ended.getStatus() != null && ended.getStatus().getPhase().equals(PodService.PodPhase.FAILED.value())) { // For pods without outputFiles, check pod phase throw PodService.failedMessage(ended); } diff --git a/src/main/java/io/kestra/plugin/kubernetes/services/PodLogService.java b/src/main/java/io/kestra/plugin/kubernetes/services/PodLogService.java index 13cf4a88..87b8868b 100644 --- a/src/main/java/io/kestra/plugin/kubernetes/services/PodLogService.java +++ b/src/main/java/io/kestra/plugin/kubernetes/services/PodLogService.java @@ -142,7 +142,7 @@ public void fetchFinalLogs(KubernetesClient client, Pod pod, RunContext runConte PodResource podResource = PodService.podRef(client, pod); - pod.getSpec().getContainers().forEach(container -> { + if (pod.getSpec().getContainers().stream().noneMatch(container -> { try { String logs = podResource .inContainer(container.getName()) @@ -151,16 +151,45 @@ public void fetchFinalLogs(KubernetesClient client, Pod pod, RunContext runConte .getLog(); if (logs != null && !logs.isEmpty()) { - // Write all logs - hash-based deduplication automatically filters duplicates outputStream.write(logs.getBytes()); outputStream.flush(); - } else { - logger.debug("No logs returned for container '{}'", container.getName()); + return true; } + logger.debug("No logs returned for container '{}'", container.getName()); } catch (IOException e) { logger.error("Failed to fetch final logs for container '{}'", container.getName(), e); } - }); + return false; + })) { + // if no container logs were found, the pod likely never started. + // we fall back to Kubernetes pod events + fetchPodEvents(client, pod, runContext); + } + } + + private void fetchPodEvents(KubernetesClient client, Pod pod, RunContext runContext) { + Logger logger = runContext.logger(); + + try { + var events = client.v1().events() + .inNamespace(pod.getMetadata().getNamespace()) + .withField("involvedObject.name", pod.getMetadata().getName()) + .list() + .getItems(); + + if (events.isEmpty()) { + logger.warn("No container logs and no pod events found for pod '{}'", pod.getMetadata().getName()); + return; + } + + logger.info("No container logs available. Pod events:"); + for (var event : events) { + outputStream.write(("[pod-event] " + event.getReason() + ": " + event.getMessage() + "\n").getBytes()); + } + outputStream.flush(); + } catch (Exception e) { + logger.error("Failed to fetch pod events for '{}'", pod.getMetadata().getName(), e); + } } @Override diff --git a/src/main/java/io/kestra/plugin/kubernetes/services/PodService.java b/src/main/java/io/kestra/plugin/kubernetes/services/PodService.java index 30fc6a26..e1e6afc5 100644 --- a/src/main/java/io/kestra/plugin/kubernetes/services/PodService.java +++ b/src/main/java/io/kestra/plugin/kubernetes/services/PodService.java @@ -7,6 +7,7 @@ import io.fabric8.kubernetes.client.dsl.PodResource; import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.tasks.retrys.Exponential; +import io.kestra.core.models.tasks.runners.AbstractLogConsumer; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.RetryUtils; import io.kestra.plugin.kubernetes.models.Connection; @@ -17,12 +18,15 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; abstract public class PodService { private static final List COMPLETED_PHASES = List.of("Succeeded", "Failed", "Unknown"); // see https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase + private static final String SIDECAR_FILES_CONTAINER_NAME = "out-files"; public static KubernetesClient client(RunContext runContext, Connection connection) throws IllegalVariableEvaluationException { return connection != null ? client(connection.toConfig(runContext)) : client(null); @@ -48,18 +52,31 @@ public static Pod waitForInitContainerRunning(KubernetesClient client, Pod pod, } public static Pod waitForPodReady(KubernetesClient client, Pod pod, Duration waitUntilRunning) { + boolean hasSidecar = pod.getSpec().getContainers().stream() + .anyMatch(c -> SIDECAR_FILES_CONTAINER_NAME.equals(c.getName())); + return PodService.podRef(client, pod) .waitUntilCondition( j -> j != null && j.getStatus() != null && ( - j.getStatus().getPhase().equals("Failed") || - j.getStatus() - .getConditions() - .stream() - .anyMatch(podCondition -> podCondition.getType().equals("ContainersReady") || - (podCondition.getReason() != null && podCondition.getReason().equals("PodCompleted")) - ) - ), + PodPhase.FAILED.value().equals(j.getStatus().getPhase()) || + (j.getStatus().getContainerStatuses() != null && + j.getStatus().getContainerStatuses().stream() + .anyMatch(cs -> cs.getState() != null && + cs.getState().getWaiting() != null && + cs.getState().getWaiting().getReason() != null && + !TransientWaitingReason.contains(cs.getState().getWaiting().getReason()) + ) + ) || + j.getStatus() + .getConditions() + .stream() + .anyMatch(podCondition -> + ("ContainersReady".equals(podCondition.getType()) && + (hasSidecar || "True".equals(podCondition.getStatus()))) || + ("PodCompleted".equals(podCondition.getReason())) + ) + ), waitUntilRunning.toSeconds(), TimeUnit.SECONDS ); @@ -70,7 +87,7 @@ public static Pod waitForContainersStartedOrCompleted(KubernetesClient client, P .waitUntilCondition( j -> j != null && j.getStatus() != null && ( - ("Running".equals(j.getStatus().getPhase()) && + (PodPhase.RUNNING.value().equals(j.getStatus().getPhase()) && j.getStatus().getContainerStatuses() != null && j.getStatus().getContainerStatuses().stream() .anyMatch(c -> c.getState().getRunning() != null)) @@ -176,7 +193,19 @@ public static IllegalStateException failedMessage(Pod pod) throws IllegalStateEx "exitcode '" + containerStateTerminated.getExitCode() + "' & " + "message '" + containerStateTerminated.getMessage() + "'" )) - .orElse(new IllegalStateException("Pods terminated without any containers status !")); + .orElseGet(() -> { + if (pod.getStatus().getContainerStatuses() != null) { + Optional waitingReason = pod.getStatus().getContainerStatuses().stream() + .filter(cs -> cs.getState() != null && cs.getState().getWaiting() != null) + .map(cs -> cs.getState().getWaiting().getReason()) + .filter(reason -> reason != null && !TransientWaitingReason.contains(reason)) + .findFirst(); + if (waitingReason.isPresent()) { + return new IllegalStateException("Pod failed before container start: " + waitingReason.get()); + } + } + return new IllegalStateException("Pod failed with phase '" + pod.getStatus().getPhase() + "'"); + }); } public static void checkContainerFailures(Pod pod, String exceptContainer, Logger logger) throws IllegalStateException { @@ -261,7 +290,101 @@ public static void uploadMarker(RunContext runContext, PodResource podResource, logger.debug(marker + " marker uploaded"); } + /** + * Fetch and log Kubernetes pod events (e.g. FailedScheduling, Evicted, ImagePullBackOff). + */ + public static void logPodEvents(KubernetesClient client, Pod pod, Logger logger, AbstractLogConsumer logConsumer) { + if (pod == null || pod.getMetadata() == null) { + return; + } + + String namespace = pod.getMetadata().getNamespace(); + String podName = pod.getMetadata().getName(); + + try { + client.v1().events() + .inNamespace(namespace) + .withField("involvedObject.name", podName) + .list() + .getItems() + .stream() + .filter(event -> "Warning".equals(event.getType())) + .sorted(Comparator.comparing( + Event::getLastTimestamp, + Comparator.nullsLast(Comparator.naturalOrder()) + )) + .forEach(event -> { + String reason = event.getReason() == null ? "" : event.getReason(); + String message = event.getMessage() == null ? "" : event.getMessage(); + + logConsumer.accept( + "[pod-event] " + reason + " - " + message, + true + ); + }); + + } catch (Exception e) { + logger.warn("Failed to fetch events for pod '{}'", podName, e); + } + } + + public static boolean hasAnyContainerStarted(Pod pod) { + if (pod.getStatus() == null || pod.getStatus().getContainerStatuses() == null) { + return false; + } + return pod.getStatus().getContainerStatuses().stream() + .anyMatch(cs -> cs.getState() != null && + (cs.getState().getRunning() != null || cs.getState().getTerminated() != null) + ); + } + + public static boolean hasNonTransientWaitingContainer(Pod pod) { + if (pod.getStatus() == null || pod.getStatus().getContainerStatuses() == null) { + return false; + } + return pod.getStatus().getContainerStatuses().stream() + .anyMatch(cs -> cs.getState() != null && + cs.getState().getWaiting() != null && + cs.getState().getWaiting().getReason() != null && + !TransientWaitingReason.contains(cs.getState().getWaiting().getReason()) + ); + } + public static Path tempDir(RunContext runContext) { return runContext.workingDir().path().resolve("working-dir"); } + + public enum TransientWaitingReason { + CONTAINER_CREATING("ContainerCreating"), + POD_INITIALIZING("PodInitializing"); + + private final String reason; + + TransientWaitingReason(String reason) { + this.reason = reason; + } + + public static boolean contains(String reason) { + for (TransientWaitingReason r : values()) { + if (r.reason.equals(reason)) { + return true; + } + } + return false; + } + } + + public enum PodPhase { + PENDING("Pending"), + RUNNING("Running"), + SUCCEEDED("Succeeded"), + FAILED("Failed"), + UNKNOWN("Unknown"); + + private final String value; + + PodPhase(String value) { this.value = value; } + + public String value() { return value; } + } } diff --git a/src/test/java/io/kestra/plugin/kubernetes/core/PodCreateTest.java b/src/test/java/io/kestra/plugin/kubernetes/core/PodCreateTest.java index 22f38c26..5fa25e0c 100644 --- a/src/test/java/io/kestra/plugin/kubernetes/core/PodCreateTest.java +++ b/src/test/java/io/kestra/plugin/kubernetes/core/PodCreateTest.java @@ -216,7 +216,7 @@ void failedWithOutputFiles() throws Exception { " command: ", " - 'bash' ", " - '-c'", - " - 'echo \"Container failing\" && exit 1'", + " - 'echo \"Container failing\" && sleep 1 && exit 1'", "restartPolicy: Never" )) .build(); @@ -640,7 +640,7 @@ void sidecarResources() throws Exception { ObjectMeta.class, "containers:", "- name: in-out-files", - " image: debian:stable-slim", + " image: busybox", " command: [\"/bin/sh\"]", " args:", " - -c", @@ -672,7 +672,12 @@ void sidecarResources() throws Exception { try (KubernetesClient client = PodService.client(finalRunContext, null)) { Await.until(() -> { var pods = client.pods().inNamespace("default").withLabelSelector(labelSelector).list().getItems(); - return !pods.isEmpty() && pods.getFirst().getStatus().getPhase().equals("Running"); + if (pods.isEmpty()) { + return false; + } + + String phase = pods.getFirst().getStatus().getPhase(); + return "Running".equals(phase) || "Succeeded".equals(phase); }, Duration.ofMillis(200), Duration.ofMinutes(1)); var createdPod = client.pods().inNamespace("default").withLabelSelector(labelSelector).list().getItems().getFirst(); @@ -864,7 +869,7 @@ void successWithOutputFiles() throws Exception { " command: ", " - 'bash' ", " - '-c'", - " - 'echo \"Task succeeded\" > {{ workingDir }}/result.txt && exit 0'", + " - 'echo \"Task succeeded\" > {{ workingDir }}/result.txt && sleep 1 && exit 0'", "restartPolicy: Never" )) .build(); @@ -900,13 +905,13 @@ void multipleContainersOneFailsWithOutputFiles() throws Exception { " command: ", " - 'bash' ", " - '-c'", - " - 'echo \"First container succeeded\" && exit 0'", + " - 'echo \"First container succeeded\" && sleep 1 && exit 0'", "- name: container-failure", " image: debian:stable-slim", " command: ", " - 'bash' ", " - '-c'", - " - 'echo \"Second container failing\" && exit 1'", + " - 'echo \"Second container failing\" && sleep 1 && exit 1'", "restartPolicy: Never" )) .build(); @@ -947,7 +952,7 @@ void completeLogCollectionAfterQuickTermination() throws Exception { " command:", " - 'bash'", " - '-c'", - " - 'seq 1 20 | while read i; do echo \"Quick termination log line $i\"; done; echo \"FINAL\" && exit 1'", + " - 'seq 1 20 | while read i; do echo \"Quick termination log line $i\"; done; echo \"FINAL\" && sleep 1 && exit 1'", "restartPolicy: Never" )) .build(); @@ -1244,7 +1249,12 @@ void containerDefaultSpecSecurityContext() throws Exception { // Wait for pod to be running Await.until(() -> { var pods = client.pods().inNamespace("default").withLabelSelector(labelSelector).list().getItems(); - return !pods.isEmpty() && pods.getFirst().getStatus().getPhase().equals("Running"); + if (pods.isEmpty()) { + return false; + } + + String phase = pods.getFirst().getStatus().getPhase(); + return "Running".equals(phase) || "Succeeded".equals(phase); }, Duration.ofMillis(200), Duration.ofMinutes(1)); var createdPod = client.pods().inNamespace("default").withLabelSelector(labelSelector).list().getItems().getFirst(); @@ -1345,7 +1355,12 @@ void containerDefaultSpecVolumeMounts() throws Exception { // Wait for pod to be running Await.until(() -> { var pods = client.pods().inNamespace("default").withLabelSelector(labelSelector).list().getItems(); - return !pods.isEmpty() && pods.getFirst().getStatus().getPhase().equals("Running"); + if (pods.isEmpty()) { + return false; + } + + String phase = pods.getFirst().getStatus().getPhase(); + return "Running".equals(phase) || "Succeeded".equals(phase); }, Duration.ofMillis(200), Duration.ofMinutes(1)); var createdPod = client.pods().inNamespace("default").withLabelSelector(labelSelector).list().getItems().getFirst(); @@ -1507,7 +1522,7 @@ void containerDefaultSpecMultipleFields() throws Exception { " emptyDir: {}", "containers:", "- name: main", - " image: debian:stable-slim", + " image: busybox", " command: [\"/bin/sh\"]", " args:", " - -c", @@ -1540,7 +1555,12 @@ void containerDefaultSpecMultipleFields() throws Exception { // Wait for pod to be running Await.until(() -> { var pods = client.pods().inNamespace("default").withLabelSelector(labelSelector).list().getItems(); - return !pods.isEmpty() && pods.getFirst().getStatus().getPhase().equals("Running"); + if (pods.isEmpty()) { + return false; + } + + String phase = pods.getFirst().getStatus().getPhase(); + return "Running".equals(phase) || "Succeeded".equals(phase); }, Duration.ofMillis(200), Duration.ofMinutes(1)); var createdPod = client.pods().inNamespace("default").withLabelSelector(labelSelector).list().getItems().getFirst(); diff --git a/src/test/java/io/kestra/plugin/kubernetes/services/PodServiceTest.java b/src/test/java/io/kestra/plugin/kubernetes/services/PodServiceTest.java new file mode 100644 index 00000000..d995c04e --- /dev/null +++ b/src/test/java/io/kestra/plugin/kubernetes/services/PodServiceTest.java @@ -0,0 +1,69 @@ +package io.kestra.plugin.kubernetes.services; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.kestra.core.junit.annotations.KestraTest; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.function.Predicate; + +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@KestraTest +class PodServiceTest { + + @Test + void shouldNotReturnPodWhenContainersReadyFalse() { + KubernetesClient client = mock(KubernetesClient.class); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName("test-pod") + .withNamespace("default") + .endMetadata() + .withNewSpec() + .addNewContainer() + .withName("test-container") + .withImage("busybox") + .endContainer() + .endSpec() + .withNewStatus() + .addNewCondition() + .withType("ContainersReady") + .withStatus("False") + .endCondition() + .endStatus() + .build(); + + @SuppressWarnings("rawtypes") + MixedOperation pods = mock(MixedOperation.class); + + @SuppressWarnings("rawtypes") + NonNamespaceOperation ns = mock(NonNamespaceOperation.class); + + PodResource podResource = mock(PodResource.class); + + when(client.pods()).thenReturn(pods); + when(pods.inNamespace("default")).thenReturn(ns); + when(ns.withName("test-pod")).thenReturn(podResource); + + when(podResource.waitUntilCondition(any(), anyLong(), any())) + .thenAnswer(invocation -> { + Predicate predicate = invocation.getArgument(0); + return predicate.test(pod) ? pod : null; + }); + + Pod result = PodService.waitForPodReady(client, pod, Duration.ofSeconds(1)); + + assertNull(result); + } +}