mirror of https://github.com/apache/druid.git
Get task location should be stored on the lifecycle object (#14649)
* Fix issue with long data source names * Use the regular library * Save location and tls enabled * Null out before running * add another comment
This commit is contained in:
parent
28914bbab8
commit
f742bb7376
|
@ -83,6 +83,8 @@ public class KubernetesPeonLifecycle
|
|||
@MonotonicNonNull
|
||||
private LogWatch logWatch;
|
||||
|
||||
private TaskLocation taskLocation;
|
||||
|
||||
protected KubernetesPeonLifecycle(
|
||||
Task task,
|
||||
KubernetesPeonClient kubernetesClient,
|
||||
|
@ -116,6 +118,8 @@ public class KubernetesPeonLifecycle
|
|||
State.PENDING
|
||||
);
|
||||
|
||||
// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
|
||||
taskLocation = null;
|
||||
kubernetesClient.launchPeonJobAndWaitForStart(
|
||||
job,
|
||||
launchTimeout,
|
||||
|
@ -226,27 +230,31 @@ public class KubernetesPeonLifecycle
|
|||
return TaskLocation.unknown();
|
||||
}
|
||||
|
||||
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
|
||||
if (!maybePod.isPresent()) {
|
||||
return TaskLocation.unknown();
|
||||
/* It's okay to cache this because podIP only changes on pod restart, and we have to set restartPolicy to Never
|
||||
since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher
|
||||
if we decide we need to change this later.
|
||||
**/
|
||||
if (taskLocation == null) {
|
||||
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
|
||||
if (!maybePod.isPresent()) {
|
||||
return TaskLocation.unknown();
|
||||
}
|
||||
|
||||
Pod pod = maybePod.get();
|
||||
PodStatus podStatus = pod.getStatus();
|
||||
|
||||
if (podStatus == null || podStatus.getPodIP() == null) {
|
||||
return TaskLocation.unknown();
|
||||
}
|
||||
taskLocation = TaskLocation.create(
|
||||
podStatus.getPodIP(),
|
||||
DruidK8sConstants.PORT,
|
||||
DruidK8sConstants.TLS_PORT,
|
||||
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"))
|
||||
);
|
||||
}
|
||||
|
||||
Pod pod = maybePod.get();
|
||||
PodStatus podStatus = pod.getStatus();
|
||||
|
||||
if (podStatus == null || podStatus.getPodIP() == null) {
|
||||
return TaskLocation.unknown();
|
||||
}
|
||||
|
||||
return TaskLocation.create(
|
||||
podStatus.getPodIP(),
|
||||
DruidK8sConstants.PORT,
|
||||
DruidK8sConstants.TLS_PORT,
|
||||
Boolean.parseBoolean(pod.getMetadata()
|
||||
.getAnnotations()
|
||||
.getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")
|
||||
)
|
||||
);
|
||||
return taskLocation;
|
||||
}
|
||||
|
||||
private TaskStatus getTaskStatus(long duration)
|
||||
|
|
|
@ -636,6 +636,35 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
|
|||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getTaskLocation_saveTaskLocation()
|
||||
throws NoSuchFieldException, IllegalAccessException
|
||||
{
|
||||
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
|
||||
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
|
||||
|
||||
Pod pod = new PodBuilder()
|
||||
.withNewMetadata()
|
||||
.withName(ID)
|
||||
.endMetadata()
|
||||
.withNewStatus()
|
||||
.withPodIP("ip")
|
||||
.endStatus()
|
||||
.build();
|
||||
|
||||
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)).once();
|
||||
|
||||
replayAll();
|
||||
|
||||
TaskLocation location = peonLifecycle.getTaskLocation();
|
||||
peonLifecycle.getTaskLocation();
|
||||
Assert.assertEquals("ip", location.getHost());
|
||||
Assert.assertEquals(8100, location.getPort());
|
||||
Assert.assertEquals(-1, location.getTlsPort());
|
||||
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
|
||||
throws NoSuchFieldException, IllegalAccessException
|
||||
|
|
Loading…
Reference in New Issue