Fix error where communication failures to k8s can lead to stuck tasks (#17431)

* Fix save logs error

* Update extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* make things final

* fix merge conflicts

---------

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
This commit is contained in:
George Shiqi Wu 2024-11-05 09:58:30 -08:00 committed by GitHub
parent 2eac8318f8
commit 8850023811
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 23 additions and 32 deletions

View File

@ -40,6 +40,7 @@ import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs; import org.apache.druid.tasklogs.TaskLogs;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -326,23 +327,20 @@ public class KubernetesPeonLifecycle
protected void saveLogs() protected void saveLogs()
{ {
try { try {
Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log"); final Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log");
try { try {
startWatchingLogs(); final InputStream logStream;
if (logWatch != null) { if (logWatch != null) {
FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile()); logStream = logWatch.getOutput();
} else { } else {
log.debug("Log stream not found for %s", taskId.getOriginalTaskId()); logStream = kubernetesClient.getPeonLogs(taskId).or(
FileUtils.writeStringToFile( new ByteArrayInputStream(StringUtils.format(
file.toFile(),
StringUtils.format(
"Peon for task [%s] did not report any logs. Check k8s metrics and events for the pod to see what happened.", "Peon for task [%s] did not report any logs. Check k8s metrics and events for the pod to see what happened.",
taskId taskId
), ).getBytes(StandardCharsets.UTF_8))
Charset.defaultCharset()
); );
} }
FileUtils.copyInputStreamToFile(logStream, file.toFile());
taskLogs.pushTaskLog(taskId.getOriginalTaskId(), file.toFile()); taskLogs.pushTaskLog(taskId.getOriginalTaskId(), file.toFile());
} }
catch (IOException e) { catch (IOException e) {

View File

@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils;
@ -266,7 +267,7 @@ public class KubernetesPeonClient
); );
} }
catch (Exception e) { catch (Exception e) {
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + jobName + " not found"); throw DruidException.defensive(e, "Error when looking for K8s pod with label: job-name=%s", jobName);
} }
} }

View File

@ -46,8 +46,10 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -57,7 +59,9 @@ import java.util.concurrent.atomic.AtomicReference;
public class KubernetesPeonLifecycleTest extends EasyMockSupport public class KubernetesPeonLifecycleTest extends EasyMockSupport
{ {
private static final String ID = "id"; private static final String ID = "id";
private static final String IP = "ip";
private static final TaskStatus SUCCESS = TaskStatus.success(ID); private static final TaskStatus SUCCESS = TaskStatus.success(ID);
private static final InputStream LOG_INPUT_STREAM = new ByteArrayInputStream("logs for task".getBytes(StandardCharsets.UTF_8));
@Mock KubernetesPeonClient kubernetesClient; @Mock KubernetesPeonClient kubernetesClient;
@Mock TaskLogs taskLogs; @Mock TaskLogs taskLogs;
@ -276,7 +280,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(null, PeonPhase.FAILED)); )).andReturn(new JobResponse(null, PeonPhase.FAILED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent()); EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
@ -325,7 +329,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of(
IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)
)); ));
@ -334,8 +338,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall(); EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@ -375,7 +377,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)).times(2);
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8))
); );
@ -383,14 +385,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expectLastCall(); EasyMock.expectLastCall();
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@ -435,7 +433,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent()); EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
@ -483,7 +481,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
@ -491,9 +489,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
replayAll(); replayAll();
@ -533,7 +528,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.anyLong(), EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS) EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8))
); );
@ -543,8 +538,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@ -576,15 +569,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
)).andThrow(new RuntimeException()); )).andThrow(new RuntimeException());
// We should still try to push logs // We should still try to push logs
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall(); EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once(); EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

View File

@ -30,6 +30,7 @@ import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.java.util.metrics.StubServiceEmitter;
@ -525,7 +526,7 @@ public class KubernetesPeonClientTest
void test_getPeonPodWithRetries_withoutPod_raisesKubernetesResourceNotFoundException() void test_getPeonPodWithRetries_withoutPod_raisesKubernetesResourceNotFoundException()
{ {
Assertions.assertThrows( Assertions.assertThrows(
KubernetesResourceNotFoundException.class, DruidException.class,
() -> instance.getPeonPodWithRetries(clientApi.getClient(), new K8sTaskId(ID).getK8sJobName(), 1, 1), () -> instance.getPeonPodWithRetries(clientApi.getClient(), new K8sTaskId(ID).getK8sJobName(), 1, 1),
StringUtils.format("K8s pod with label: job-name=%s not found", ID) StringUtils.format("K8s pod with label: job-name=%s not found", ID)
); );