diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index fadad48e12b..b0d483e5278 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -83,6 +83,8 @@ public class KubernetesPeonLifecycle @MonotonicNonNull private LogWatch logWatch; + private TaskLocation taskLocation; + protected KubernetesPeonLifecycle( Task task, KubernetesPeonClient kubernetesClient, @@ -116,6 +118,8 @@ public class KubernetesPeonLifecycle State.PENDING ); + // In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation. + taskLocation = null; kubernetesClient.launchPeonJobAndWaitForStart( job, launchTimeout, @@ -226,27 +230,31 @@ public class KubernetesPeonLifecycle return TaskLocation.unknown(); } - Optional maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName()); - if (!maybePod.isPresent()) { - return TaskLocation.unknown(); + /* It's okay to cache this because podIP only changes on pod restart, and we have to set restartPolicy to Never + since Druid doesn't support retrying tasks from an external system (K8s). We can explore adding a fabric8 watcher + if we decide we need to change this later. 
+ **/ + if (taskLocation == null) { + Optional maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName()); + if (!maybePod.isPresent()) { + return TaskLocation.unknown(); + } + + Pod pod = maybePod.get(); + PodStatus podStatus = pod.getStatus(); + + if (podStatus == null || podStatus.getPodIP() == null) { + return TaskLocation.unknown(); + } + taskLocation = TaskLocation.create( + podStatus.getPodIP(), + DruidK8sConstants.PORT, + DruidK8sConstants.TLS_PORT, + Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")) + ); } - Pod pod = maybePod.get(); - PodStatus podStatus = pod.getStatus(); - - if (podStatus == null || podStatus.getPodIP() == null) { - return TaskLocation.unknown(); - } - - return TaskLocation.create( - podStatus.getPodIP(), - DruidK8sConstants.PORT, - DruidK8sConstants.TLS_PORT, - Boolean.parseBoolean(pod.getMetadata() - .getAnnotations() - .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false") - ) - ); + return taskLocation; } private TaskStatus getTaskStatus(long duration) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 9887c497601..d9160d31c94 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -636,6 +636,35 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport verifyAll(); } + @Test + public void test_getTaskLocation_saveTaskLocation() + throws NoSuchFieldException, IllegalAccessException + { + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper); 
+ setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .withNewStatus() + .withPodIP("ip") + .endStatus() + .build(); + + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)).once(); + + replayAll(); + + TaskLocation location = peonLifecycle.getTaskLocation(); + peonLifecycle.getTaskLocation(); + Assert.assertEquals("ip", location.getHost()); + Assert.assertEquals(8100, location.getPort()); + Assert.assertEquals(-1, location.getTlsPort()); + + verifyAll(); + } + @Test public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation() throws NoSuchFieldException, IllegalAccessException