diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 302a568a230..447a8632bb9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -245,8 +245,10 @@ public class KubernetesPeonLifecycle podStatus.getPodIP(), DruidK8sConstants.PORT, DruidK8sConstants.TLS_PORT, - Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")) + Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")), + pod.getMetadata() != null ? pod.getMetadata().getName() : "" ); + log.info("K8s task %s is running at location %s", taskId.getOriginalTaskId(), taskLocation); } return taskLocation; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 1ec726f3fa9..980a425a855 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -815,6 +815,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport Assert.assertEquals("ip", location.getHost()); Assert.assertEquals(8100, location.getPort()); Assert.assertEquals(-1, location.getTlsPort()); + Assert.assertEquals(ID, location.getK8sPodName()); verifyAll(); } @@ -850,6 +851,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport Assert.assertEquals("ip", location.getHost()); Assert.assertEquals(8100, location.getPort()); Assert.assertEquals(-1, location.getTlsPort()); + Assert.assertEquals(ID, location.getK8sPodName()); verifyAll(); } @@ -886,6 +888,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport Assert.assertEquals("ip", location.getHost()); Assert.assertEquals(-1, location.getPort()); Assert.assertEquals(8091, location.getTlsPort()); + Assert.assertEquals(ID, location.getK8sPodName()); verifyAll(); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 171883103f1..3bf3a75e30b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1431,7 +1431,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testBeginPublishAndQueueNextTasks() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig(); @@ -1526,7 +1526,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testDiscoverExistingPublishingTask() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig(); @@ -1646,7 +1646,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig(); @@ -1757,8 +1757,8 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testDiscoverExistingPublishingAndReadingTask() throws Exception { - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); @@ -1876,8 +1876,8 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testReportWhenMultipleActiveTasks() throws Exception { - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); supervisor = getTestableSupervisorForIdleBehaviour(1, 2, true, "PT10S", null, null, false, null); @@ -2034,8 +2034,8 @@ public class KafkaSupervisorTest extends EasyMockSupport addSomeEvents(100); - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost", 234, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost", 234, -1); Task id1 = createKafkaIndexTask( "id1", @@ -2339,8 +2339,8 @@ public class KafkaSupervisorTest extends EasyMockSupport addSomeEvents(100); - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost", 234, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost", 234, -1); Task id1 = createKafkaIndexTask( "id1", @@ -2548,7 +2548,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testKillUnresponsiveTasksWhilePausing() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); @@ -2634,7 +2634,7 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); @@ -2749,8 +2749,8 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testStopGracefully() throws Exception { - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); @@ -3017,8 +3017,8 @@ public class KafkaSupervisorTest extends EasyMockSupport @Test public void testResetRunningTasks() throws Exception { - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); @@ -3277,8 +3277,8 @@ public class KafkaSupervisorTest extends EasyMockSupport tuningConfig ); - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); Collection workItems = new ArrayList<>(); workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); @@ -3472,8 +3472,8 @@ public class KafkaSupervisorTest extends EasyMockSupport { // graceful shutdown is expected to be called on running tasks since state is suspended - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true, kafkaHost); @@ -4221,10 +4221,10 @@ public class KafkaSupervisorTest extends EasyMockSupport List tasks = ImmutableList.of(readingTask, publishingTask, pausedTask, failsToResumePausedTask, waitingTask, pendingTask); Collection taskRunnerWorkItems = ImmutableList.of( - new TestTaskRunnerWorkItem(readingTask, null, new TaskLocation("testHost", 1001, -1)), - new TestTaskRunnerWorkItem(publishingTask, null, new TaskLocation("testHost", 1002, -1)), - new TestTaskRunnerWorkItem(pausedTask, null, new TaskLocation("testHost", 1003, -1)), - new TestTaskRunnerWorkItem(failsToResumePausedTask, null, new TaskLocation("testHost", 1004, -1)) + new TestTaskRunnerWorkItem(readingTask, null, TaskLocation.create("testHost", 1001, -1)), + new TestTaskRunnerWorkItem(publishingTask, null, TaskLocation.create("testHost", 1002, -1)), + new TestTaskRunnerWorkItem(pausedTask, null, TaskLocation.create("testHost", 1003, -1)), + new TestTaskRunnerWorkItem(failsToResumePausedTask, null, TaskLocation.create("testHost", 1004, -1)) ); DateTime startTime = DateTimes.nowUtc(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 5a3295e0bfc..e4890614085 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -1475,7 +1475,7 @@ public class KinesisSupervisorTest extends EasyMockSupport @Test public void testBeginPublishAndQueueNextTasks() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); @@ -1604,7 +1604,7 @@ public class KinesisSupervisorTest extends EasyMockSupport @Test public void testDiscoverExistingPublishingTask() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); final Map timeLag = ImmutableMap.of(SHARD_ID1, 0L, SHARD_ID0, 20000000L); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); @@ -1766,7 +1766,7 @@ public class KinesisSupervisorTest extends EasyMockSupport @Test public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); final Map timeLag = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); @@ -1916,8 +1916,8 @@ public class KinesisSupervisorTest extends EasyMockSupport @Test public void testDiscoverExistingPublishingAndReadingTask() throws Exception { - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); final Map timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L); @@ -2180,7 +2180,7 @@ public class KinesisSupervisorTest extends EasyMockSupport @Test public void testKillUnresponsiveTasksWhilePausing() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); supervisorRecordSupplier.assign(EasyMock.anyObject()); @@ -2288,7 +2288,7 @@ public class KinesisSupervisorTest extends EasyMockSupport @Test public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception { - final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final TaskLocation location = TaskLocation.create("testHost", 1234, -1); supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null); supervisorRecordSupplier.assign(EasyMock.anyObject()); @@ -2434,8 +2434,8 @@ public class KinesisSupervisorTest extends EasyMockSupport @Test public void testStopGracefully() throws Exception { - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); @@ -2855,8 +2855,8 @@ public class KinesisSupervisorTest extends EasyMockSupport @Test public void testResetRunningTasks() throws Exception { - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null); @@ -3105,9 +3105,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); final Collection workItems = new ArrayList(); - workItems.add(new TestTaskRunnerWorkItem(id1, null, new TaskLocation(id1.getId(), 8100, 8100))); - workItems.add(new TestTaskRunnerWorkItem(id2, null, new TaskLocation(id2.getId(), 8100, 8100))); - workItems.add(new TestTaskRunnerWorkItem(id3, null, new TaskLocation(id3.getId(), 8100, 8100))); + workItems.add(new TestTaskRunnerWorkItem(id1, null, TaskLocation.create(id1.getId(), 8100, 8100))); + workItems.add(new TestTaskRunnerWorkItem(id2, null, TaskLocation.create(id2.getId(), 8100, 8100))); + workItems.add(new TestTaskRunnerWorkItem(id3, null, TaskLocation.create(id3.getId(), 8100, 8100))); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); @@ -3239,8 +3239,8 @@ public class KinesisSupervisorTest extends EasyMockSupport null ); - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); Collection workItems = new ArrayList<>(); workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); @@ -3496,8 +3496,8 @@ public class KinesisSupervisorTest extends EasyMockSupport { // graceful shutdown is expected to be called on running tasks since state is suspended - final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); - final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index fa4d3c9f409..d7f04d82777 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -381,7 +381,7 @@ public class SqlStatementResourceTest extends MSQTestBase TaskState.RUNNING, null, null, - new TaskLocation("test", 0, 0), + TaskLocation.create("test", 0, 0), null, null )))); @@ -403,7 +403,7 @@ public class SqlStatementResourceTest extends MSQTestBase TaskState.SUCCESS, null, 100L, - new TaskLocation("test", 0, 0), + TaskLocation.create("test", 0, 0), null, null )))); @@ -527,7 +527,7 @@ public class SqlStatementResourceTest extends MSQTestBase TaskState.RUNNING, null, null, - new TaskLocation("test", 0, 0), + TaskLocation.create("test", 0, 0), null, null )))); @@ -549,7 +549,7 @@ public class SqlStatementResourceTest extends MSQTestBase TaskState.SUCCESS, null, 100L, - new TaskLocation("test", 0, 0), + TaskLocation.create("test", 0, 0), null, null )))); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java index 529bafd15f3..820b2e893cf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java @@ -46,7 +46,7 @@ public class TaskRunnerUtilsTest public void testMakeTaskLocationURL() { final URL url = TaskRunnerUtils.makeTaskLocationURL( - new TaskLocation("1.2.3.4", 8090, 8290), + TaskLocation.create("1.2.3.4", 8090, 8290), "/druid/worker/v1/task/%s/log", "foo bar&" ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 0b9c77ef661..3f8c1a98705 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -98,7 +98,7 @@ import java.util.concurrent.Executor; public class OverlordTest { - private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy", 1000, -1); + private static final TaskLocation TASK_LOCATION = TaskLocation.create("dummy", 1000, -1); private TestingServer server; private Timing timing; diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java b/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java index fe8accd33d3..21e2006211f 100644 --- a/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java +++ b/processing/src/main/java/org/apache/druid/indexer/TaskLocation.java @@ -20,6 +20,7 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.IAE; @@ -30,21 +31,29 @@ import java.util.Objects; public class TaskLocation { - private static final TaskLocation UNKNOWN = new TaskLocation(null, -1, -1); + private static final TaskLocation UNKNOWN = new TaskLocation(null, -1, -1, null); @Nullable private final String host; private final int port; private final int tlsPort; + @Nullable + private final String k8sPodName; + public static TaskLocation create(String host, int port, int tlsPort) { - return new TaskLocation(host, port, tlsPort); + return new TaskLocation(host, port, tlsPort, null); } public static TaskLocation create(String host, int port, int tlsPort, boolean isTls) { - return isTls ? new TaskLocation(host, -1, tlsPort) : new TaskLocation(host, port, -1); + return create(host, port, tlsPort, isTls, null); + } + + public static TaskLocation create(String host, int port, int tlsPort, boolean isTls, @Nullable String k8sPodName) + { + return isTls ? new TaskLocation(host, -1, tlsPort, k8sPodName) : new TaskLocation(host, port, -1, k8sPodName); } public static TaskLocation unknown() @@ -56,12 +65,14 @@ public class TaskLocation public TaskLocation( @JsonProperty("host") @Nullable String host, @JsonProperty("port") int port, - @JsonProperty("tlsPort") int tlsPort + @JsonProperty("tlsPort") int tlsPort, + @JsonProperty("k8sPodName") @Nullable String k8sPodName ) { this.host = host; this.port = port; this.tlsPort = tlsPort; + this.k8sPodName = k8sPodName; } @Nullable @@ -83,6 +94,14 @@ public class TaskLocation return tlsPort; } + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty + public String getK8sPodName() + { + return k8sPodName; + } + public URL makeURL(final String encodedPathAndQueryString) throws MalformedURLException { final String scheme; @@ -111,6 +130,7 @@ public class TaskLocation "host='" + host + '\'' + ", port=" + port + ", tlsPort=" + tlsPort + + ", k8sPodName=" + k8sPodName + '}'; } @@ -124,12 +144,12 @@ public class TaskLocation return false; } TaskLocation that = (TaskLocation) o; - return port == that.port && tlsPort == that.tlsPort && Objects.equals(host, that.host); + return port == that.port && tlsPort == that.tlsPort && Objects.equals(host, that.host) && Objects.equals(k8sPodName, that.k8sPodName); } @Override public int hashCode() { - return Objects.hash(host, port, tlsPort); + return Objects.hash(host, port, tlsPort, k8sPodName); } } diff --git a/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java b/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java index 1822915fea3..03a751c5dd1 100644 --- a/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java +++ b/processing/src/test/java/org/apache/druid/indexer/TaskLocationTest.java @@ -33,13 +33,13 @@ public class TaskLocationTest @SuppressWarnings("HttpUrlsUsage") public void testMakeURL() throws MalformedURLException { - Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc", 80, 0).makeURL("/foo")); - Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc", 80, -1).makeURL("/foo")); - Assert.assertEquals(new URL("https://abc:443/foo"), new TaskLocation("abc", 80, 443).makeURL("/foo")); + Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc", 80, 0, null).makeURL("/foo")); + Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc", 80, -1, null).makeURL("/foo")); + Assert.assertEquals(new URL("https://abc:443/foo"), new TaskLocation("abc", 80, 443, null).makeURL("/foo")); Assert.assertThrows( "URL that does not start with '/'", IllegalArgumentException.class, - () -> new TaskLocation("abc", 80, 443).makeURL("foo") + () -> new TaskLocation("abc", 80, 443, null).makeURL("foo") ); } @@ -54,6 +54,22 @@ public class TaskLocationTest Assert.assertEquals(2, tls.getTlsPort()); } + @Test + public void testDefaultK8sJobName() + { + TaskLocation noK8sJobName = TaskLocation.create("foo", 1, 2, false); + Assert.assertNull(noK8sJobName.getK8sPodName()); + noK8sJobName = TaskLocation.create("foo", 1, 2); + Assert.assertNull(noK8sJobName.getK8sPodName()); + } + + @Test + public void testK8sJobNameSet() + { + TaskLocation k8sJobName = TaskLocation.create("foo", 1, 2, false, "job-name"); + Assert.assertEquals("job-name", k8sJobName.getK8sPodName()); + } + @Test public void testEqualsAndHashCode() { diff --git a/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java b/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java index 939d9f04d37..d0cf6b3d2cb 100644 --- a/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java +++ b/processing/src/test/java/org/apache/druid/indexer/TaskStatusTest.java @@ -59,7 +59,7 @@ public class TaskStatusTest ); Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation, TaskStatus.class)); - TaskStatus success = TaskStatus.success("forkTaskID", new TaskLocation("localhost", 0, 1)); + TaskStatus success = TaskStatus.success("forkTaskID", TaskLocation.create("localhost", 0, 1)); Assert.assertEquals(success.getLocation().getHost(), "localhost"); Assert.assertEquals(success.getLocation().getPort(), 0); Assert.assertEquals(success.getLocation().getTlsPort(), 1);