From 5764183d4e3c3d8267e98eced05858d5524816c9 Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Fri, 8 Nov 2024 08:13:35 -0800 Subject: [PATCH] k8s-based-ingestion: Wait for task lifecycles to enter RUNNING state before returning from KubernetesTaskRunner.start (#17446) * Add a wait on start() for task lifecycle to go into running * handle exceptions * Fix logging messages * Don't pass in the settable future as a arg * add some unit tests --- .../extensions-contrib/k8s-jobs.md | 1 + .../k8s/overlord/KubernetesPeonLifecycle.java | 41 +++++- .../k8s/overlord/KubernetesTaskRunner.java | 79 +++++++++--- .../overlord/KubernetesTaskRunnerConfig.java | 28 +++- .../k8s/overlord/KubernetesWorkItem.java | 28 ++-- .../overlord/KubernetesPeonLifecycleTest.java | 51 +++++++- .../overlord/KubernetesTaskRunnerTest.java | 122 +++++++++++++----- .../k8s/overlord/KubernetesWorkItemTest.java | 93 +++++-------- 8 files changed, 311 insertions(+), 132 deletions(-) diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 913e40b9373..11663642bdd 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -610,6 +610,7 @@ Druid selects the pod template `podSpecWithHighMemRequests.yaml`. |`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No| |`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s |`P2D`|No| |`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No| +|`druid.indexer.runner.taskJoinTimeout`| `Duration` | Timeout for gathering metadata about existing tasks on startup |`PT1M`|No| |`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time. 
|`PT1H`|No| |`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No| |`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No| diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 59e1d0f88b3..4dffaf5d0fc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -22,6 +22,8 @@ package org.apache.druid.k8s.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -90,6 +92,8 @@ public class KubernetesPeonLifecycle private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; private final TaskStateListener stateListener; + private final SettableFuture taskStartedSuccessfullyFuture; + @MonotonicNonNull private LogWatch logWatch; @@ -109,6 +113,7 @@ public class KubernetesPeonLifecycle this.taskLogs = taskLogs; this.mapper = mapper; this.stateListener = stateListener; + this.taskStartedSuccessfullyFuture = SettableFuture.create(); } /** @@ -137,11 +142,13 @@ public class KubernetesPeonLifecycle launchTimeout, TimeUnit.MILLISECONDS ); - return join(timeout); } catch (Exception e) { log.info("Failed to run task: %s", taskId.getOriginalTaskId()); + if (!taskStartedSuccessfullyFuture.isDone()) { + 
taskStartedSuccessfullyFuture.set(false); + } throw e; } finally { @@ -179,7 +186,7 @@ public class KubernetesPeonLifecycle { try { updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); - + taskStartedSuccessfullyFuture.set(true); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, timeout, @@ -188,6 +195,12 @@ public class KubernetesPeonLifecycle return getTaskStatus(jobResponse.getJobDuration()); } + catch (Exception e) { + if (!taskStartedSuccessfullyFuture.isDone()) { + taskStartedSuccessfullyFuture.set(false); + } + throw e; + } finally { try { saveLogs(); @@ -195,7 +208,6 @@ public class KubernetesPeonLifecycle catch (Exception e) { log.warn(e, "Log processing failed for task [%s]", taskId); } - stopTask(); } } @@ -246,7 +258,10 @@ public class KubernetesPeonLifecycle protected TaskLocation getTaskLocation() { if (State.PENDING.equals(state.get()) || State.NOT_STARTED.equals(state.get())) { - log.debug("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId()); + /* This should not actually ever happen because KubernetesTaskRunner.start() should not return until all running tasks + have already gone into State.RUNNING, so getTaskLocation should not be called. + */ + log.warn("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId()); return TaskLocation.unknown(); } @@ -257,6 +272,10 @@ public class KubernetesPeonLifecycle if (taskLocation == null) { Optional maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName()); if (!maybePod.isPresent()) { + /* Arguably we should throw an exception here but leaving it as a warn log to prevent unexpected errors. + If there is strange behavior during overlord restarts the operator should look for this warn log. 
+ */ + log.warn("Could not get task location from k8s for task [%s].", taskId); return TaskLocation.unknown(); } @@ -264,6 +283,7 @@ public class KubernetesPeonLifecycle PodStatus podStatus = pod.getStatus(); if (podStatus == null || podStatus.getPodIP() == null) { + log.warn("Could not get task location from k8s for task [%s].", taskId); return TaskLocation.unknown(); } taskLocation = TaskLocation.create( @@ -376,4 +396,17 @@ public class KubernetesPeonLifecycle ); stateListener.stateChanged(state.get(), taskId.getOriginalTaskId()); } + + /** + * Retrieves the current {@link ListenableFuture} representing whether the task started successfully + * + *

This future can be used to track whether the task started successfully, with a boolean result + * indicating success (true) or failure (false) when the task starts. + * + * @return a {@link ListenableFuture} representing whether the task started successfully. + */ + protected ListenableFuture getTaskStartedSuccessfullyFuture() + { + return taskStartedSuccessfullyFuture; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index c324b49e13a..deb1f0b3d9c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -55,12 +56,14 @@ import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -146,16 +149,28 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner 
public ListenableFuture run(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem( + task, + exec.submit(() -> runTask(task)), + peonLifecycleFactory.build( + task, + this::emitTaskStateMetrics + ) + )).getResult(); } } - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem( + task, + exec.submit(() -> joinTask(task)), + peonLifecycleFactory.build( + task, + this::emitTaskStateMetrics + ) + )); } } @@ -173,10 +188,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner protected TaskStatus doTask(Task task, boolean run) { try { - KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( - task, - this::emitTaskStateMetrics - ); + KubernetesPeonLifecycle peonLifecycle; synchronized (tasks) { KubernetesWorkItem workItem = tasks.get(task.getId()); @@ -185,7 +197,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner throw new ISE("Task [%s] has been shut down", task.getId()); } - workItem.setKubernetesPeonLifecycle(peonLifecycle); + peonLifecycle = workItem.getPeonLifeycle(); } TaskStatus taskStatus; @@ -321,16 +333,53 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner public void start() { log.info("Starting K8sTaskRunner..."); - // Load tasks from previously running jobs and wait for their statuses to be updated asynchronously. - for (Job job : client.getPeonJobs()) { + // Load tasks from previously running jobs and wait for their statuses to start running. 
+ final List> taskStatusActiveList = new ArrayList<>(); + final List peonJobs = client.getPeonJobs(); + + log.info("Locating [%,d] active tasks.", peonJobs.size()); + for (Job job : peonJobs) { try { - joinAsync(adapter.toTask(job)); + KubernetesWorkItem kubernetesWorkItem = joinAsync(adapter.toTask(job)); + taskStatusActiveList.add(kubernetesWorkItem.getPeonLifeycle().getTaskStartedSuccessfullyFuture()); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } - log.info("Loaded %,d tasks from previous run", tasks.size()); + + try { + final DateTime nowUtc = DateTimes.nowUtc(); + final long timeoutMs = nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis(); + if (timeoutMs > 0) { + FutureUtils.coalesce(taskStatusActiveList).get(timeoutMs, TimeUnit.MILLISECONDS); + } + log.info("Located [%,d] active tasks.", taskStatusActiveList.size()); + } + catch (Exception e) { + final long numInitialized = + tasks.values() + .stream() + .filter(item -> { + if (item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()) { + try { + return item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().get(); + } + catch (InterruptedException | ExecutionException ex) { + return false; + } + } else { + return false; + } + }).count(); + log.warn( + e, + "Located [%,d] out of [%,d] active tasks (timeout = %s). 
Locating others asynchronously.", + numInitialized, + taskStatusActiveList.size(), + config.getTaskJoinTimeout() + ); + } cleanupExecutor.scheduleAtFixedRate( () -> @@ -342,7 +391,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner config.getTaskCleanupInterval().toStandardDuration().getMillis(), TimeUnit.MILLISECONDS ); - log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay()); + log.info("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay()); } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 60efa3c4856..106378f57aa 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -85,6 +85,12 @@ public class KubernetesTaskRunnerConfig // interval for k8s job cleanup to run private Period taskCleanupInterval = new Period("PT10m"); + @JsonProperty + @NotNull + // how long to wait to join peon k8s jobs on startup + private Period taskJoinTimeout = new Period("PT1M"); + + @JsonProperty @NotNull // how long to wait for the peon k8s job to launch @@ -140,7 +146,8 @@ public class KubernetesTaskRunnerConfig int cpuCoreInMicro, Map labels, Map annotations, - Integer capacity + Integer capacity, + Period taskJoinTimeout ) { this.namespace = namespace; @@ -181,6 +188,10 @@ public class KubernetesTaskRunnerConfig k8sjobLaunchTimeout, this.k8sjobLaunchTimeout ); + this.taskJoinTimeout = ObjectUtils.defaultIfNull( + taskJoinTimeout, + this.taskJoinTimeout + ); this.peonMonitors = ObjectUtils.defaultIfNull( peonMonitors, this.peonMonitors @@ -247,6 
+258,11 @@ public class KubernetesTaskRunnerConfig { return maxTaskDuration; } + public Period getTaskJoinTimeout() + { + return taskJoinTimeout; + } + public Period getTaskCleanupDelay() { @@ -317,6 +333,7 @@ public class KubernetesTaskRunnerConfig private Map labels; private Map annotations; private Integer capacity; + private Period taskJoinTimeout; public Builder() { @@ -425,6 +442,12 @@ public class KubernetesTaskRunnerConfig return this; } + public Builder withTaskJoinTimeout(Period taskJoinTimeout) + { + this.taskJoinTimeout = taskJoinTimeout; + return this; + } + public KubernetesTaskRunnerConfig build() { return new KubernetesTaskRunnerConfig( @@ -444,7 +467,8 @@ public class KubernetesTaskRunnerConfig this.cpuCoreInMicro, this.labels, this.annotations, - this.capacity + this.capacity, + this.taskJoinTimeout ); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 94d4bbb67f6..5eb55b097b4 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -20,7 +20,6 @@ package org.apache.druid.k8s.overlord; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; @@ -34,27 +33,19 @@ import java.io.InputStream; public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; - private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; + private final KubernetesPeonLifecycle kubernetesPeonLifecycle; - public KubernetesWorkItem(Task task, ListenableFuture 
statusFuture) + public KubernetesWorkItem(Task task, ListenableFuture statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle) { super(task.getId(), statusFuture); this.task = task; - } - - protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle) - { - Preconditions.checkState(this.kubernetesPeonLifecycle == null); this.kubernetesPeonLifecycle = kubernetesPeonLifecycle; } protected synchronized void shutdown() { - - if (this.kubernetesPeonLifecycle != null) { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); - } + this.kubernetesPeonLifecycle.startWatchingLogs(); + this.kubernetesPeonLifecycle.shutdown(); } protected boolean isPending() @@ -88,18 +79,12 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem protected Optional streamTaskLogs() { - if (kubernetesPeonLifecycle == null) { - return Optional.absent(); - } return kubernetesPeonLifecycle.streamLogs(); } @Override public TaskLocation getLocation() { - if (kubernetesPeonLifecycle == null) { - return TaskLocation.unknown(); - } return kubernetesPeonLifecycle.getTaskLocation(); } @@ -119,4 +104,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem { return task; } + + protected KubernetesPeonLifecycle getPeonLifeycle() + { + return this.kubernetesPeonLifecycle; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 13ad4fda209..85ea7072b85 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -52,6 +52,7 @@ import java.io.IOException; import 
java.io.InputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -264,6 +265,53 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); } + @Test + public void test_run_whenExceptionRaised_setsStartStatusFutureToFalse() throws ExecutionException, InterruptedException + { + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( + task, + kubernetesClient, + taskLogs, + mapper, + stateListener + ) + { + @Override + protected synchronized TaskStatus join(long timeout) + { + throw new IllegalStateException(); + } + }; + + Job job = new JobBuilder().withNewMetadata().withName(ID).endMetadata().build(); + + EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart( + EasyMock.eq(job), + EasyMock.eq(task), + EasyMock.anyLong(), + EasyMock.eq(TimeUnit.MILLISECONDS) + )).andReturn(null); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); + stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID); + EasyMock.expectLastCall().once(); + stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); + EasyMock.expectLastCall().once(); + + replayAll(); + + Assert.assertThrows( + Exception.class, + () -> peonLifecycle.run(job, 0L, 0L, false) + ); + + verifyAll(); + + Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); + Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); + Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().get()); + + } + @Test public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException { @@ -313,6 +361,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport stateListener ); + 
Assert.assertFalse(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); Job job = new JobBuilder() .withNewMetadata() .withName(ID) @@ -347,7 +396,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport TaskStatus taskStatus = peonLifecycle.join(0L); verifyAll(); - + Assert.assertTrue(peonLifecycle.getTaskStartedSuccessfullyFuture().isDone()); Assert.assertEquals(SUCCESS.withDuration(58000), taskStatus); Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 67a5278c6a3..a6c01ee306a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import org.apache.commons.io.IOUtils; @@ -103,6 +104,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_start_withExistingJobs() throws IOException { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(true); KubernetesTaskRunner runner = new KubernetesTaskRunner( taskAdapter, config, @@ -113,15 +116,16 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem 
joinAsync(Task task) { return tasks.computeIfAbsent( task.getId(), k -> new KubernetesWorkItem( task, - Futures.immediateFuture(TaskStatus.success(task.getId())) + Futures.immediateFuture(TaskStatus.success(task.getId())), + kubernetesPeonLifecycle ) - ).getResult(); + ); } }; @@ -133,6 +137,67 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn( + settableFuture + ); + + replayAll(); + + runner.start(); + + verifyAll(); + + Assert.assertNotNull(runner.tasks); + Assert.assertEquals(1, runner.tasks.size()); + } + + @Test + public void test_start_withExistingJobs_oneJobFails() throws IOException + { + SettableFuture settableFuture = SettableFuture.create(); + settableFuture.set(true); + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ) + { + @Override + protected KubernetesWorkItem joinAsync(Task task) + { + return tasks.computeIfAbsent( + task.getId(), + k -> new KubernetesWorkItem( + task, + Futures.immediateFuture(TaskStatus.success(task.getId())), + kubernetesPeonLifecycle + ) + ); + } + }; + + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + Job job2 = new JobBuilder() + .withNewMetadata() + .withName("id2") + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job, job2)); + EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(taskAdapter.toTask(job2)).andThrow(new IOException("deserialization exception")); + + EasyMock.expect(kubernetesPeonLifecycle.getTaskStartedSuccessfullyFuture()).andReturn( + settableFuture + ); replayAll(); @@ -157,10 +222,9 @@ public class 
KubernetesTaskRunnerTest extends EasyMockSupport ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null, kubernetesPeonLifecycle)); } }; @@ -193,7 +257,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_streamTaskLog_withExistingTask() throws IOException { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected Optional streamTaskLogs() @@ -241,7 +305,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_run_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -286,8 +350,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport replayAll(); - ListenableFuture future = runner.joinAsync(task); - Assert.assertEquals(taskStatus, future.get()); + KubernetesWorkItem workItem = runner.joinAsync(task); + Assert.assertEquals(taskStatus, workItem.getResult().get()); verifyAll(); } @@ -295,7 +359,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_join_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -310,9 +374,9 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport replayAll(); - ListenableFuture 
future = runner.joinAsync(task); + KubernetesWorkItem workItem = runner.joinAsync(task); - Exception e = Assert.assertThrows(ExecutionException.class, future::get); + Exception e = Assert.assertThrows(ExecutionException.class, () -> workItem.getResult().get()); Assert.assertTrue(e.getCause() instanceof RuntimeException); verifyAll(); @@ -331,7 +395,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_shutdown_withExistingTask_removesTaskFromMap() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture, kubernetesPeonLifecycle) { @Override protected synchronized void shutdown() { @@ -348,7 +412,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_shutdown_withExistingTask_futureIncomplete_removesTaskFromMap() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture, kubernetesPeonLifecycle) { @Override protected synchronized void shutdown() { @@ -385,7 +449,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_getKnownTasks() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); @@ -399,7 +463,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport public void test_getRunningTasks() { Task pendingTask = K8sTestUtils.createTask("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -409,7 +473,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport runner.tasks.put(pendingTask.getId(), 
pendingWorkItem); Task runningTask = K8sTestUtils.createTask("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -428,7 +492,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport public void test_getPendingTasks() { Task pendingTask = K8sTestUtils.createTask("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -438,7 +502,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = K8sTestUtils.createTask("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -462,7 +526,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_getRunnerTaskState_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -477,7 +541,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_streamTaskReports_withExistingTask() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -512,7 +576,7 @@ public class 
KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -529,7 +593,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -593,7 +657,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -618,7 +682,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_metricsReported_whenTaskStateChange() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -640,7 +704,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_getTaskLocation_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() { @@ -657,7 +721,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Test public void test_getTaskLocation_throws() { - 
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override public TaskLocation getLocation() @@ -689,7 +753,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport public void test_getUsedCapacity() { Assert.assertEquals(0, runner.getUsedCapacity()); - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); Assert.assertEquals(1, runner.getUsedCapacity()); runner.tasks.remove(task.getId()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 7d17193b171..fe2b576bccb 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -45,36 +45,6 @@ public class KubernetesWorkItemTest extends EasyMockSupport public void setup() { task = NoopTask.create(); - workItem = new KubernetesWorkItem(task, null); - } - - @Test - public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() - { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( - task, - null, - null, - null, - null - )); - - Assert.assertThrows( - IllegalStateException.class, - () -> workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( - task, - null, - null, - null, - null - )) - ); - } - - @Test - public void test_shutdown_withoutKubernetesPeonLifecycle() - { - workItem.shutdown(); } @Test @@ -86,7 +56,11 @@ public class KubernetesWorkItemTest extends EasyMockSupport EasyMock.expectLastCall(); 
replayAll(); - workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); + workItem = new KubernetesWorkItem( + task, + null, + kubernetesPeonLifecycle + ); workItem.shutdown(); verifyAll(); @@ -95,7 +69,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport @Test public void test_isPending_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -108,7 +82,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport @Test public void test_isPending_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -121,7 +95,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport @Test public void test_isRunning_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -134,7 +108,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport @Test public void test_isRunning_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -144,22 +118,17 @@ public class KubernetesWorkItemTest extends EasyMockSupport Assert.assertTrue(workItem.isRunning()); } - @Test - public void test_getRunnerTaskState_withoutKubernetesPeonLifecycle_returnsPending() - { - Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); - } - @Test public void test_getRunnerTaskState_withKubernetesPeonLifecycle_returnsPending() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + 
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); } @@ -181,7 +150,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.PENDING, workItem.getRunnerTaskState()); } @@ -203,7 +172,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.RUNNING, workItem.getRunnerTaskState()); } @@ -225,46 +194,36 @@ public class KubernetesWorkItemTest extends EasyMockSupport } }; - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(RunnerTaskState.NONE, workItem.getRunnerTaskState()); } - @Test - public void test_streamTaskLogs_withoutKubernetesPeonLifecycle() - { - Assert.assertFalse(workItem.streamTaskLogs().isPresent()); - } - @Test public void test_streamTaskLogs_withKubernetesPeonLifecycle() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertFalse(workItem.streamTaskLogs().isPresent()); } - @Test - public void test_getLocation_withoutKubernetesPeonLifecycle() - { - Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation()); - } - @Test public void test_getLocation_withKubernetesPeonLifecycle() { - workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, null, null, null, 
null - )); + ); + workItem = new KubernetesWorkItem(task, null, peonLifecycle); Assert.assertEquals(TaskLocation.unknown(), workItem.getLocation()); } @@ -272,18 +231,28 @@ public class KubernetesWorkItemTest extends EasyMockSupport @Test public void test_getTaskType() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task.getType(), workItem.getTaskType()); } @Test public void test_getDataSource() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task.getDataSource(), workItem.getDataSource()); } @Test public void test_getTask() { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); Assert.assertEquals(task, workItem.getTask()); } + + @Test + public void test_peonLifeycle() + { + workItem = new KubernetesWorkItem(task, null, kubernetesPeonLifecycle); + Assert.assertEquals(kubernetesPeonLifecycle, workItem.getPeonLifeycle()); + } }