diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java index ee7ae1b3292..660b39094ad 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java @@ -21,7 +21,10 @@ package org.apache.druid.k8s.overlord; import com.google.inject.Binder; import com.google.inject.Key; +import com.google.inject.Provides; import com.google.inject.multibindings.MapBinder; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.Binders; import org.apache.druid.guice.IndexingServiceModuleHelper; @@ -34,14 +37,21 @@ import org.apache.druid.indexing.common.tasklogs.FileTaskLogs; import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.tasklogs.NoopTaskLogs; import org.apache.druid.tasklogs.TaskLogKiller; import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.tasklogs.TaskLogs; + @LoadScope(roles = NodeRole.OVERLORD_JSON_NAME) public class KubernetesOverlordModule implements DruidModule { + + private static final Logger log = new Logger(KubernetesOverlordModule.class); + @Override public void configure(Binder binder) { @@ -66,6 +76,41 @@ public class KubernetesOverlordModule implements DruidModule configureTaskLogs(binder); } + @Provides + @LazySingleton + public DruidKubernetesClient makeKubernetesClient(KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, Lifecycle lifecycle) + { + DruidKubernetesClient client; + if (kubernetesTaskRunnerConfig.isDisableClientProxy()) { + Config config = new ConfigBuilder().build(); + config.setHttpsProxy(null); + config.setHttpProxy(null); + client = new DruidKubernetesClient(config); + } else { + client = new DruidKubernetesClient(); + } + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() + { + + } + + @Override + public void stop() + { + log.info("Stopping overlord Kubernetes client"); + client.getClient().close(); + } + } + ); + + return client; + } + private void configureTaskLogs(Binder binder) { PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class)); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 9825469f678..f90f1204855 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.dsl.LogWatch; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.druid.indexer.TaskLocation; @@ -37,6 +38,7 @@ import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.tasklogs.TaskLogs; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import java.io.IOException; import java.io.InputStream; @@ -78,6 +80,9 @@ public class KubernetesPeonLifecycle private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; + @MonotonicNonNull + private LogWatch logWatch; + protected KubernetesPeonLifecycle( Task task, KubernetesPeonClient kubernetesClient, @@ -151,16 +156,15 @@ public class KubernetesPeonLifecycle TimeUnit.MILLISECONDS ); - saveLogs(); - return getTaskStatus(jobResponse.getJobDuration()); } finally { try { + saveLogs(); shutdown(); } catch (Exception e) { - log.warn(e, "Task [%s] shutdown failed", taskId); + log.warn(e, "Task [%s] cleanup failed", taskId); } state.set(State.STOPPED); @@ -265,14 +269,31 @@ public class KubernetesPeonLifecycle return taskStatus.withDuration(duration); } - private void saveLogs() + protected void startWatchingLogs() + { + if (logWatch != null) { + log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId()); + return; + } + try { + Optional maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId); + if (maybeLogWatch.isPresent()) { + logWatch = maybeLogWatch.get(); + } + } + catch (Exception e) { + log.error(e, "Error watching logs from task: %s", taskId); + } + } + + protected void saveLogs() { try { Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log"); try { - Optional maybeLogStream = streamLogs(); - if (maybeLogStream.isPresent()) { - FileUtils.copyInputStreamToFile(maybeLogStream.get(), file.toFile()); + startWatchingLogs(); + if (logWatch != null) { + FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile()); } else { log.debug("Log stream not found for %s", taskId.getOriginalTaskId()); } @@ -282,6 +303,9 @@ public class KubernetesPeonLifecycle log.error(e, "Failed to stream logs for task [%s]", taskId.getOriginalTaskId()); } finally { + if (logWatch != null) { + logWatch.close(); + } Files.deleteIfExists(file); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index 8dbfe68a49a..5b63872cab7 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -21,8 +21,6 @@ package org.apache.druid.k8s.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Self; @@ -55,6 +53,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory T executeRequest(KubernetesExecutor executor) throws KubernetesResourceNotFoundException { - try (KubernetesClient client = getClient()) { - return executor.executeRequest(client); - } + return executor.executeRequest(kubernetesClient); } + /** This client automatically gets closed by the druid lifecycle, it should not be closed when used as it is + * meant to be reused. + * @return re-useable KubernetesClient + */ @Override public KubernetesClient getClient() { - return new KubernetesClientBuilder().withConfig(config).build(); + return this.kubernetesClient; } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index 0b80515a148..1ed8eae128e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -120,26 +120,46 @@ public class KubernetesPeonClient } } - public Optional getPeonLogs(K8sTaskId taskId) + public Optional getPeonLogWatcher(K8sTaskId taskId) { KubernetesClient k8sClient = clientApi.getClient(); try { LogWatch logWatch = k8sClient.batch() + .v1() + .jobs() + .inNamespace(namespace) + .withName(taskId.getK8sTaskId()) + .inContainer("main") + .watchLog(); + if (logWatch == null) { + return Optional.absent(); + } + return Optional.of(logWatch); + } + catch (Exception e) { + log.error(e, "Error watching logs from task: %s", taskId); + return Optional.absent(); + } + } + + public Optional getPeonLogs(K8sTaskId taskId) + { + KubernetesClient k8sClient = clientApi.getClient(); + try { + InputStream logStream = k8sClient.batch() .v1() .jobs() .inNamespace(namespace) .withName(taskId.getK8sTaskId()) .inContainer("main") - .watchLog(); - if (logWatch == null) { - k8sClient.close(); + .getLogInputStream(); + if (logStream == null) { return Optional.absent(); } - return Optional.of(new LogWatchInputStream(k8sClient, logWatch)); + return Optional.of(logStream); } catch (Exception e) { log.error(e, "Error streaming logs from task: %s", taskId); - k8sClient.close(); return Optional.absent(); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java deleted file mode 100644 index 3665e940f34..00000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/LogWatchInputStream.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.k8s.overlord.common; - -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.LogWatch; - -import java.io.IOException; -import java.io.InputStream; - -/** - * This wraps the InputStream for k8s client - * When you call close on the stream, it will also close the open - * http connections and the client - */ -public class LogWatchInputStream extends InputStream -{ - - private final KubernetesClient client; - private final LogWatch logWatch; - - public LogWatchInputStream(KubernetesClient client, LogWatch logWatch) - { - this.client = client; - this.logWatch = logWatch; - } - - @Override - public int read() throws IOException - { - return logWatch.getOutput().read(); - } - - @Override - public void close() - { - logWatch.close(); - client.close(); - } -} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 09ad1e10051..253e49c5205 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import io.fabric8.kubernetes.client.dsl.LogWatch; import org.apache.commons.io.IOUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -61,6 +62,8 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport @Mock KubernetesPeonClient kubernetesClient; @Mock TaskLogs taskLogs; + @Mock LogWatch logWatch; + private ObjectMapper mapper; private Task task; private K8sTaskId k8sTaskId; @@ -71,6 +74,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport mapper = new TestUtils().getTestObjectMapper(); task = NoopTask.create(ID, 0); k8sTaskId = new K8sTaskId(task); + EasyMock.expect(logWatch.getOutput()).andReturn(IOUtils.toInputStream("", StandardCharsets.UTF_8)).anyTimes(); } @Test @@ -184,12 +188,12 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(null, PeonPhase.FAILED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of( - IOUtils.toInputStream("", StandardCharsets.UTF_8) - )); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); + logWatch.close(); + EasyMock.expectLastCall(); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); replayAll(); @@ -225,14 +229,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of( - IOUtils.toInputStream("", StandardCharsets.UTF_8) - )); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) )); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); + logWatch.close(); + EasyMock.expectLastCall(); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -266,14 +270,18 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn( - Optional.of(IOUtils.toInputStream("", StandardCharsets.UTF_8)) - ); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); + taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); + EasyMock.expectLastCall(); + logWatch.close(); + EasyMock.expectLastCall(); + logWatch.close(); + EasyMock.expectLastCall(); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -313,12 +321,12 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn( - Optional.of(IOUtils.toInputStream("", StandardCharsets.UTF_8)) - ); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); + logWatch.close(); + EasyMock.expectLastCall(); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -354,12 +362,12 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn( - Optional.of(IOUtils.toInputStream("", StandardCharsets.UTF_8)) - ); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); + logWatch.close(); + EasyMock.expectLastCall(); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -395,14 +403,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn( - Optional.of(IOUtils.toInputStream("", StandardCharsets.UTF_8)) - ); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall().andThrow(new IOException()); + logWatch.close(); + EasyMock.expectLastCall(); EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -417,6 +425,38 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); } + + @Test + public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_throwsException() throws IOException + { + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper); + + EasyMock.expect(kubernetesClient.waitForPeonJobCompletion( + EasyMock.eq(k8sTaskId), + EasyMock.anyLong(), + EasyMock.eq(TimeUnit.MILLISECONDS) + )).andThrow(new RuntimeException()); + + // We should still try to push logs + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); + EasyMock.expectLastCall(); + logWatch.close(); + EasyMock.expectLastCall(); + + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); + + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); + + replayAll(); + + Assert.assertThrows(RuntimeException.class, () -> peonLifecycle.join(0L)); + + verifyAll(); + + Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); + } + @Test public void test_shutdown_withNotStartedTaskState() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java index c79ad753e35..9d211cffbaf 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java @@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter; import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter; import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter; @@ -48,6 +49,8 @@ public class KubernetesTaskRunnerFactoryTest private TaskConfig taskConfig; private Properties properties; + private DruidKubernetesClient druidKubernetesClient; + @Before public void setup() { @@ -68,6 +71,7 @@ public class KubernetesTaskRunnerFactoryTest ); taskConfig = new TaskConfigBuilder().setBaseDir("/tmp").build(); properties = new Properties(); + druidKubernetesClient = new DruidKubernetesClient(); } @Test @@ -81,7 +85,8 @@ public class KubernetesTaskRunnerFactoryTest taskLogs, druidNode, taskConfig, - properties + properties, + druidKubernetesClient ); KubernetesTaskRunner expectedRunner = factory.build(); @@ -101,7 +106,8 @@ public class KubernetesTaskRunnerFactoryTest taskLogs, druidNode, taskConfig, - properties + properties, + druidKubernetesClient ); KubernetesTaskRunner runner = factory.build(); @@ -126,7 +132,8 @@ public class KubernetesTaskRunnerFactoryTest taskLogs, druidNode, taskConfig, - properties + properties, + druidKubernetesClient ); KubernetesTaskRunner runner = factory.build(); @@ -149,7 +156,8 @@ public class KubernetesTaskRunnerFactoryTest taskLogs, druidNode, taskConfig, - props + props, + druidKubernetesClient ); KubernetesTaskRunner runner = factory.build(); @@ -177,7 +185,8 @@ public class KubernetesTaskRunnerFactoryTest taskLogs, druidNode, taskConfig, - props + props, + druidKubernetesClient ); Assert.assertThrows( @@ -206,7 +215,8 @@ public class KubernetesTaskRunnerFactoryTest taskLogs, druidNode, taskConfig, - props + props, + druidKubernetesClient ); KubernetesTaskRunner runner = factory.build(); @@ -229,7 +239,8 @@ public class KubernetesTaskRunnerFactoryTest taskLogs, druidNode, taskConfig, - props + props, + druidKubernetesClient ); KubernetesTaskRunner runner = factory.build(); @@ -255,7 +266,8 @@ public class KubernetesTaskRunnerFactoryTest taskLogs, druidNode, taskConfig, - props + props, + druidKubernetesClient ); KubernetesTaskRunner runner = factory.build(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 1cca2fd89cb..b272230b079 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -23,15 +23,24 @@ import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; -public class KubernetesWorkItemTest +@RunWith(EasyMockRunner.class) +public class KubernetesWorkItemTest extends EasyMockSupport { private KubernetesWorkItem workItem; private Task task; + @Mock + KubernetesPeonLifecycle kubernetesPeonLifecycle; + @Before public void setup() { @@ -70,21 +79,16 @@ public class KubernetesWorkItemTest @Test public void test_shutdown_withKubernetesPeonLifecycle() { - KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( - task, - null, - null, - null - ) { - @Override - protected synchronized void shutdown() - { - } - }; + kubernetesPeonLifecycle.shutdown(); + EasyMock.expectLastCall(); + kubernetesPeonLifecycle.startWatchingLogs(); + EasyMock.expectLastCall(); - workItem.setKubernetesPeonLifecycle(peonLifecycle); + replayAll(); + workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); workItem.shutdown(); + verifyAll(); Assert.assertTrue(workItem.isShutdownRequested()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index 208773a3aaa..a8be9049824 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; +import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import org.apache.druid.java.util.common.StringUtils; @@ -494,4 +495,78 @@ public class KubernetesPeonClientTest StringUtils.format("K8s pod with label: job-name=%s not found", ID) ); } + + @Test + void test_getPeonLogsWatcher_withJob_returnsWatchLogInOptional() + { + server.expect().get() + .withPath("/apis/batch/v1/namespaces/namespace/jobs/id") + .andReturn(HttpURLConnection.HTTP_OK, new JobBuilder() + .withNewMetadata() + .withName(JOB_NAME) + .withUid("uid") + .endMetadata() + .withNewSpec() + .withNewTemplate() + .withNewSpec() + .addNewContainer() + .withName("main") + .endContainer() + .endSpec() + .endTemplate() + .endSpec() + .build() + ).once(); + + server.expect().get() + .withPath("/api/v1/namespaces/namespace/pods?labelSelector=controller-uid%3Duid") + .andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder() + .addNewItem() + .withNewMetadata() + .withName(POD_NAME) + .addNewOwnerReference() + .withUid("uid") + .withController(true) + .endOwnerReference() + .endMetadata() + .withNewSpec() + .addNewContainer() + .withName("main") + .endContainer() + .endSpec() + .endItem() + .build() + ).once(); + + server.expect().get() + .withPath("/api/v1/namespaces/namespace/pods/id/log?pretty=false&container=main") + .andReturn(HttpURLConnection.HTTP_OK, "data") + .once(); + + Optional maybeLogWatch = instance.getPeonLogWatcher(new K8sTaskId(ID)); + Assertions.assertTrue(maybeLogWatch.isPresent()); + } + + + @Test + void test_getPeonLogsWatcher_withoutJob_returnsEmptyOptional() + { + Optional maybeLogWatch = instance.getPeonLogWatcher(new K8sTaskId(ID)); + Assertions.assertFalse(maybeLogWatch.isPresent()); + } + + @Test + void test_getPeonLogWatcher_withJobWithoutPod_returnsEmptyOptional() + { + Job job = new JobBuilder() + .withNewMetadata() + .withName(JOB_NAME) + .endMetadata() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create(); + + Optional maybeLogWatch = instance.getPeonLogWatcher(new K8sTaskId(ID)); + Assertions.assertFalse(maybeLogWatch.isPresent()); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java deleted file mode 100644 index ffb60fda99f..00000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/LogWatchInputStreamTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.k8s.overlord.common; - -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.LogWatch; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.io.InputStream; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -class LogWatchInputStreamTest -{ - - @Test - void testFlow() throws IOException - { - LogWatch logWatch = mock(LogWatch.class); - InputStream inputStream = mock(InputStream.class); - when(inputStream.read()).thenReturn(1); - when(logWatch.getOutput()).thenReturn(inputStream); - KubernetesClient client = mock(KubernetesClient.class); - LogWatchInputStream stream = new LogWatchInputStream(client, logWatch); - int result = stream.read(); - Assertions.assertEquals(1, result); - verify(inputStream, times(1)).read(); - stream.close(); - verify(logWatch, times(1)).close(); - verify(client, times(1)).close(); - } -}