Add pod name to TaskLocation for easier observability and debugging. (#14758)

* Add pod name to location

* Add log

* fix style

* Update extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java

Co-authored-by: Suneet Saldanha <suneet@apache.org>

* Fix unit tests

---------

Co-authored-by: Suneet Saldanha <suneet@apache.org>
This commit is contained in:
George Shiqi Wu 2023-08-07 15:33:35 -04:00 committed by GitHub
parent 7d7813372a
commit 14940dc3ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 102 additions and 61 deletions

View File

@ -245,8 +245,10 @@ public class KubernetesPeonLifecycle
podStatus.getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"))
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")),
pod.getMetadata() != null ? pod.getMetadata().getName() : ""
);
log.info("K8s task %s is running at location %s", taskId.getOriginalTaskId(), taskLocation);
}
return taskLocation;

View File

@ -815,6 +815,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(8100, location.getPort());
Assert.assertEquals(-1, location.getTlsPort());
Assert.assertEquals(ID, location.getK8sPodName());
verifyAll();
}
@ -850,6 +851,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(8100, location.getPort());
Assert.assertEquals(-1, location.getTlsPort());
Assert.assertEquals(ID, location.getK8sPodName());
verifyAll();
}
@ -886,6 +888,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(-1, location.getPort());
Assert.assertEquals(8091, location.getTlsPort());
Assert.assertEquals(ID, location.getK8sPodName());
verifyAll();
}

View File

@ -1431,7 +1431,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testBeginPublishAndQueueNextTasks() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
@ -1526,7 +1526,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingTask() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
@ -1646,7 +1646,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
@ -1757,8 +1757,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingAndReadingTask() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@ -1876,8 +1876,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testReportWhenMultipleActiveTasks() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
supervisor = getTestableSupervisorForIdleBehaviour(1, 2, true, "PT10S", null, null, false, null);
@ -2034,8 +2034,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
addSomeEvents(100);
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost", 234, -1);
Task id1 = createKafkaIndexTask(
"id1",
@ -2339,8 +2339,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
addSomeEvents(100);
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost", 234, -1);
Task id1 = createKafkaIndexTask(
"id1",
@ -2548,7 +2548,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhilePausing() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
addSomeEvents(100);
@ -2634,7 +2634,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
addSomeEvents(100);
@ -2749,8 +2749,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testStopGracefully() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
@ -3017,8 +3017,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testResetRunningTasks() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
@ -3277,8 +3277,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
tuningConfig
);
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
@ -3472,8 +3472,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
// graceful shutdown is expected to be called on running tasks since state is suspended
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true, kafkaHost);
@ -4221,10 +4221,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
List<Task> tasks = ImmutableList.of(readingTask, publishingTask, pausedTask, failsToResumePausedTask, waitingTask, pendingTask);
Collection taskRunnerWorkItems = ImmutableList.of(
new TestTaskRunnerWorkItem(readingTask, null, new TaskLocation("testHost", 1001, -1)),
new TestTaskRunnerWorkItem(publishingTask, null, new TaskLocation("testHost", 1002, -1)),
new TestTaskRunnerWorkItem(pausedTask, null, new TaskLocation("testHost", 1003, -1)),
new TestTaskRunnerWorkItem(failsToResumePausedTask, null, new TaskLocation("testHost", 1004, -1))
new TestTaskRunnerWorkItem(readingTask, null, TaskLocation.create("testHost", 1001, -1)),
new TestTaskRunnerWorkItem(publishingTask, null, TaskLocation.create("testHost", 1002, -1)),
new TestTaskRunnerWorkItem(pausedTask, null, TaskLocation.create("testHost", 1003, -1)),
new TestTaskRunnerWorkItem(failsToResumePausedTask, null, TaskLocation.create("testHost", 1004, -1))
);
DateTime startTime = DateTimes.nowUtc();

View File

@ -1475,7 +1475,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testBeginPublishAndQueueNextTasks() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
@ -1604,7 +1604,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingTask() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 0L, SHARD_ID0, 20000000L);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@ -1766,7 +1766,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@ -1916,8 +1916,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testDiscoverExistingPublishingAndReadingTask() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L);
@ -2180,7 +2180,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhilePausing() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
supervisorRecordSupplier.assign(EasyMock.anyObject());
@ -2288,7 +2288,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final TaskLocation location = TaskLocation.create("testHost", 1234, -1);
supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
supervisorRecordSupplier.assign(EasyMock.anyObject());
@ -2434,8 +2434,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testStopGracefully() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
@ -2855,8 +2855,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testResetRunningTasks() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
@ -3105,9 +3105,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
final Collection workItems = new ArrayList();
workItems.add(new TestTaskRunnerWorkItem(id1, null, new TaskLocation(id1.getId(), 8100, 8100)));
workItems.add(new TestTaskRunnerWorkItem(id2, null, new TaskLocation(id2.getId(), 8100, 8100)));
workItems.add(new TestTaskRunnerWorkItem(id3, null, new TaskLocation(id3.getId(), 8100, 8100)));
workItems.add(new TestTaskRunnerWorkItem(id1, null, TaskLocation.create(id1.getId(), 8100, 8100)));
workItems.add(new TestTaskRunnerWorkItem(id2, null, TaskLocation.create(id2.getId(), 8100, 8100)));
workItems.add(new TestTaskRunnerWorkItem(id3, null, TaskLocation.create(id3.getId(), 8100, 8100)));
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems);
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
@ -3239,8 +3239,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null
);
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
@ -3496,8 +3496,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
// graceful shutdown is expected to be called on running tasks since state is suspended
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true);

View File

@ -381,7 +381,7 @@ public class SqlStatementResourceTest extends MSQTestBase
TaskState.RUNNING,
null,
null,
new TaskLocation("test", 0, 0),
TaskLocation.create("test", 0, 0),
null,
null
))));
@ -403,7 +403,7 @@ public class SqlStatementResourceTest extends MSQTestBase
TaskState.SUCCESS,
null,
100L,
new TaskLocation("test", 0, 0),
TaskLocation.create("test", 0, 0),
null,
null
))));
@ -527,7 +527,7 @@ public class SqlStatementResourceTest extends MSQTestBase
TaskState.RUNNING,
null,
null,
new TaskLocation("test", 0, 0),
TaskLocation.create("test", 0, 0),
null,
null
))));
@ -549,7 +549,7 @@ public class SqlStatementResourceTest extends MSQTestBase
TaskState.SUCCESS,
null,
100L,
new TaskLocation("test", 0, 0),
TaskLocation.create("test", 0, 0),
null,
null
))));

View File

@ -46,7 +46,7 @@ public class TaskRunnerUtilsTest
public void testMakeTaskLocationURL()
{
final URL url = TaskRunnerUtils.makeTaskLocationURL(
new TaskLocation("1.2.3.4", 8090, 8290),
TaskLocation.create("1.2.3.4", 8090, 8290),
"/druid/worker/v1/task/%s/log",
"foo bar&"
);

View File

@ -98,7 +98,7 @@ import java.util.concurrent.Executor;
public class OverlordTest
{
private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy", 1000, -1);
private static final TaskLocation TASK_LOCATION = TaskLocation.create("dummy", 1000, -1);
private TestingServer server;
private Timing timing;

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
@ -30,21 +31,29 @@ import java.util.Objects;
public class TaskLocation
{
private static final TaskLocation UNKNOWN = new TaskLocation(null, -1, -1);
private static final TaskLocation UNKNOWN = new TaskLocation(null, -1, -1, null);
@Nullable
private final String host;
private final int port;
private final int tlsPort;
@Nullable
private final String k8sPodName;
public static TaskLocation create(String host, int port, int tlsPort)
{
return new TaskLocation(host, port, tlsPort);
return new TaskLocation(host, port, tlsPort, null);
}
public static TaskLocation create(String host, int port, int tlsPort, boolean isTls)
{
return isTls ? new TaskLocation(host, -1, tlsPort) : new TaskLocation(host, port, -1);
return create(host, port, tlsPort, isTls, null);
}
/**
 * Creates a location that keeps exactly one of the two ports, selected by {@code isTls}.
 *
 * @param host       host of the location
 * @param port       plaintext port; kept only when {@code isTls} is false, otherwise replaced by -1
 * @param tlsPort    TLS port; kept only when {@code isTls} is true, otherwise replaced by -1
 * @param isTls      whether the TLS port is the one to keep
 * @param k8sPodName pod name to attach to the location, or null when there is none
 * @return a new location carrying the given host, the selected port and the pod name
 */
public static TaskLocation create(String host, int port, int tlsPort, boolean isTls, @Nullable String k8sPodName)
{
  if (isTls) {
    // TLS location: the plaintext port is blanked out.
    return new TaskLocation(host, -1, tlsPort, k8sPodName);
  }
  // Plaintext location: the TLS port is blanked out.
  return new TaskLocation(host, port, -1, k8sPodName);
}
public static TaskLocation unknown()
@ -56,12 +65,14 @@ public class TaskLocation
public TaskLocation(
@JsonProperty("host") @Nullable String host,
@JsonProperty("port") int port,
@JsonProperty("tlsPort") int tlsPort
@JsonProperty("tlsPort") int tlsPort,
@JsonProperty("k8sPodName") @Nullable String k8sPodName
)
{
this.host = host;
this.port = port;
this.tlsPort = tlsPort;
this.k8sPodName = k8sPodName;
}
@Nullable
@ -83,6 +94,14 @@ public class TaskLocation
return tlsPort;
}
/**
 * Returns the pod name this location was constructed with, or null when none was supplied.
 * A null value is left out of the serialized JSON because of the {@code NON_NULL} inclusion rule.
 */
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getK8sPodName()
{
  return this.k8sPodName;
}
public URL makeURL(final String encodedPathAndQueryString) throws MalformedURLException
{
final String scheme;
@ -111,6 +130,7 @@ public class TaskLocation
"host='" + host + '\'' +
", port=" + port +
", tlsPort=" + tlsPort +
", k8sPodName=" + k8sPodName +
'}';
}
@ -124,12 +144,12 @@ public class TaskLocation
return false;
}
TaskLocation that = (TaskLocation) o;
return port == that.port && tlsPort == that.tlsPort && Objects.equals(host, that.host);
return port == that.port && tlsPort == that.tlsPort && Objects.equals(host, that.host) && Objects.equals(k8sPodName, that.k8sPodName);
}
@Override
public int hashCode()
{
return Objects.hash(host, port, tlsPort);
return Objects.hash(host, port, tlsPort, k8sPodName);
}
}

View File

@ -33,13 +33,13 @@ public class TaskLocationTest
@SuppressWarnings("HttpUrlsUsage")
public void testMakeURL() throws MalformedURLException
{
Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc", 80, 0).makeURL("/foo"));
Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc", 80, -1).makeURL("/foo"));
Assert.assertEquals(new URL("https://abc:443/foo"), new TaskLocation("abc", 80, 443).makeURL("/foo"));
Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc", 80, 0, null).makeURL("/foo"));
Assert.assertEquals(new URL("http://abc:80/foo"), new TaskLocation("abc", 80, -1, null).makeURL("/foo"));
Assert.assertEquals(new URL("https://abc:443/foo"), new TaskLocation("abc", 80, 443, null).makeURL("/foo"));
Assert.assertThrows(
"URL that does not start with '/'",
IllegalArgumentException.class,
() -> new TaskLocation("abc", 80, 443).makeURL("foo")
() -> new TaskLocation("abc", 80, 443, null).makeURL("foo")
);
}
@ -54,6 +54,22 @@ public class TaskLocationTest
Assert.assertEquals(2, tls.getTlsPort());
}
/**
 * The factory overloads without a pod-name argument must yield a location whose pod name is null.
 * NOTE(review): the method name says "JobName" but the assertions target {@code getK8sPodName()}.
 */
@Test
public void testDefaultK8sJobName()
{
  final TaskLocation viaFourArgFactory = TaskLocation.create("foo", 1, 2, false);
  final TaskLocation viaThreeArgFactory = TaskLocation.create("foo", 1, 2);

  Assert.assertNull(viaFourArgFactory.getK8sPodName());
  Assert.assertNull(viaThreeArgFactory.getK8sPodName());
}
/**
 * A pod name passed to the five-argument factory must be returned unchanged by the getter.
 * NOTE(review): the method name says "JobName" but the assertion targets {@code getK8sPodName()}.
 */
@Test
public void testK8sJobNameSet()
{
  final String expectedPodName = "job-name";
  final TaskLocation withPodName = TaskLocation.create("foo", 1, 2, false, expectedPodName);

  Assert.assertEquals(expectedPodName, withPodName.getK8sPodName());
}
@Test
public void testEqualsAndHashCode()
{

View File

@ -59,7 +59,7 @@ public class TaskStatusTest
);
Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation, TaskStatus.class));
TaskStatus success = TaskStatus.success("forkTaskID", new TaskLocation("localhost", 0, 1));
TaskStatus success = TaskStatus.success("forkTaskID", TaskLocation.create("localhost", 0, 1));
Assert.assertEquals(success.getLocation().getHost(), "localhost");
Assert.assertEquals(success.getLocation().getPort(), 0);
Assert.assertEquals(success.getLocation().getTlsPort(), 1);