diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 14fe54778a2..33efd848d0a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -23,6 +23,7 @@ import com.google.api.client.util.Lists; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -60,7 +61,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -309,23 +309,25 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner @Override public List>> restore() { - List>> restoredTasks = new ArrayList<>(); - for (Job job : client.getPeonJobs()) { - try { - Task task = adapter.toTask(job); - restoredTasks.add(Pair.of(task, joinAsync(task))); - } - catch (IOException e) { - log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); - } - } - return restoredTasks; + return ImmutableList.of(); } @Override @LifecycleStart public void start() { + log.info("Starting K8sTaskRunner..."); + // Load tasks from previously running jobs and wait for their statuses to be updated asynchronously. + for (Job job : client.getPeonJobs()) { + try { + joinAsync(adapter.toTask(job)); + } + catch (IOException e) { + log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); + } + } + log.info("Loaded %,d tasks from previous run", tasks.size()); + cleanupExecutor.scheduleAtFixedRate( () -> client.deleteCompletedPeonJobsOlderThan( @@ -339,7 +341,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay()); } - @Override @LifecycleStop public void stop() diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 11e7a75af69..e6b1b8006af 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -31,7 +31,6 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.java.util.http.client.HttpClient; @@ -56,7 +55,6 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -101,6 +99,89 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport ); } + @Test + public void test_start_withExistingJobs() throws IOException + { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ) + { + @Override + protected ListenableFuture joinAsync(Task task) + { + return tasks.computeIfAbsent( + task.getId(), + k -> new KubernetesWorkItem( + task, + Futures.immediateFuture(TaskStatus.success(task.getId())) + ) + ).getResult(); + } + }; + + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); + EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + + replayAll(); + + runner.start(); + + verifyAll(); + + Assert.assertNotNull(runner.tasks); + Assert.assertEquals(1, runner.tasks.size()); + } + + @Test + public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOException + { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ) + { + @Override + protected ListenableFuture joinAsync(Task task) + { + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) + .getResult(); + } + }; + + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); + EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException()); + + replayAll(); + + runner.start(); + + verifyAll(); + + Assert.assertNotNull(runner.tasks); + Assert.assertEquals(0, runner.tasks.size()); + } + @Test public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional() { @@ -263,80 +344,6 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport runner.shutdown(task.getId(), ""); } - @Test - public void test_restore_withExistingJobs() throws IOException - { - KubernetesTaskRunner runner = new KubernetesTaskRunner( - taskAdapter, - config, - peonClient, - httpClient, - new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter - ) { - @Override - protected ListenableFuture joinAsync(Task task) - { - return new KubernetesWorkItem(task, null).getResult(); - } - }; - - Job job = new JobBuilder() - .withNewMetadata() - .withName(ID) - .endMetadata() - .build(); - - EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); - - replayAll(); - - List>> tasks = runner.restore(); - - verifyAll(); - - Assert.assertNotNull(tasks); - Assert.assertEquals(1, tasks.size()); - } - - @Test - public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws IOException - { - KubernetesTaskRunner runner = new KubernetesTaskRunner( - taskAdapter, - config, - peonClient, - httpClient, - new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter - ) { - @Override - protected ListenableFuture joinAsync(Task task) - { - return new KubernetesWorkItem(task, null).getResult(); - } - }; - - Job job = new JobBuilder() - .withNewMetadata() - .withName(ID) - .endMetadata() - .build(); - - EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException()); - - replayAll(); - - List>> tasks = runner.restore(); - - verifyAll(); - - Assert.assertNotNull(tasks); - Assert.assertEquals(0, tasks.size()); - } - @Test public void test_getTotalTaskSlotCount() {