From 8850023811b27eeb25095fa9d4ce96ede9081b52 Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Tue, 5 Nov 2024 09:58:30 -0800 Subject: [PATCH] Fix error where communication failures to k8s can lead to stuck tasks (#17431) * Fix save logs error * Update extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java Co-authored-by: Kashif Faraz * make things final * fix merge conflicts --------- Co-authored-by: Kashif Faraz --- .../k8s/overlord/KubernetesPeonLifecycle.java | 18 +++++------ .../overlord/common/KubernetesPeonClient.java | 3 +- .../overlord/KubernetesPeonLifecycleTest.java | 31 +++++++------------ .../common/KubernetesPeonClientTest.java | 3 +- 4 files changed, 23 insertions(+), 32 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index eaef0cba6a1..59e1d0f88b3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -40,6 +40,7 @@ import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.tasklogs.TaskLogs; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -326,23 +327,20 @@ public class KubernetesPeonLifecycle protected void saveLogs() { try { - Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log"); + final Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log"); try { - startWatchingLogs(); + final InputStream logStream; if (logWatch != null) { - FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile()); + logStream = logWatch.getOutput(); } else { - log.debug("Log stream not found for %s", taskId.getOriginalTaskId()); - FileUtils.writeStringToFile( - file.toFile(), - StringUtils.format( + logStream = kubernetesClient.getPeonLogs(taskId).or( + new ByteArrayInputStream(StringUtils.format( "Peon for task [%s] did not report any logs. Check k8s metrics and events for the pod to see what happened.", taskId - ), - Charset.defaultCharset() + ).getBytes(StandardCharsets.UTF_8)) ); - } + FileUtils.copyInputStreamToFile(logStream, file.toFile()); taskLogs.pushTaskLog(taskId.getOriginalTaskId(), file.toFile()); } catch (IOException e) { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index 63487e4e373..00cf93ba992 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.LogWatch; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.RetryUtils; @@ -266,7 +267,7 @@ public class KubernetesPeonClient ); } catch (Exception e) { - throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + jobName + " not found"); + throw DruidException.defensive(e, "Error when looking for K8s pod with label: job-name=%s", jobName); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 59c3700b1fc..13ad4fda209 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -46,8 +46,10 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @@ -57,7 +59,9 @@ import java.util.concurrent.atomic.AtomicReference; public class KubernetesPeonLifecycleTest extends EasyMockSupport { private static final String ID = "id"; + private static final String IP = "ip"; private static final TaskStatus SUCCESS = TaskStatus.success(ID); + private static final InputStream LOG_INPUT_STREAM = new ByteArrayInputStream("logs for task".getBytes(StandardCharsets.UTF_8)); @Mock KubernetesPeonClient kubernetesClient; @Mock TaskLogs taskLogs; @@ -276,7 +280,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(null, PeonPhase.FAILED)); - EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent()); + EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); @@ -325,7 +329,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) )); @@ -334,8 +338,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); - EasyMock.expectLastCall().once(); - logWatch.close(); EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -375,7 +377,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)).times(2); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -383,14 +385,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.expectLastCall(); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); - logWatch.close(); - EasyMock.expectLastCall(); stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); - logWatch.close(); - EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -435,7 +433,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent()); + EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -483,7 +481,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -491,9 +489,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); - logWatch.close(); - EasyMock.expectLastCall(); - Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -533,7 +528,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -543,8 +538,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); - logWatch.close(); - EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -576,15 +569,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport )).andThrow(new RuntimeException()); // We should still try to push logs - EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); - logWatch.close(); - EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index fa0da14fab7..42ec881dbc5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -30,6 +30,7 @@ import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.metrics.StubServiceEmitter; @@ -525,7 +526,7 @@ public class KubernetesPeonClientTest void test_getPeonPodWithRetries_withoutPod_raisesKubernetesResourceNotFoundException() { Assertions.assertThrows( - KubernetesResourceNotFoundException.class, + DruidException.class, () -> instance.getPeonPodWithRetries(clientApi.getClient(), new K8sTaskId(ID).getK8sJobName(), 1, 1), StringUtils.format("K8s pod with label: job-name=%s not found", ID) );