diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java index 6b90fef0617..a0ffe85c67c 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java @@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import javax.annotation.Nullable; + /** * Represents the status of a task from the perspective of the coordinator. The task may be ongoing * ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true). @@ -37,32 +39,32 @@ public class TaskStatus public static TaskStatus running(String taskId) { - return new TaskStatus(taskId, TaskState.RUNNING, -1, null); + return new TaskStatus(taskId, TaskState.RUNNING, -1, null, null); } public static TaskStatus success(String taskId) { - return new TaskStatus(taskId, TaskState.SUCCESS, -1, null); + return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, null); } public static TaskStatus success(String taskId, String errorMsg) { - return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg); + return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg, null); } public static TaskStatus failure(String taskId) { - return new TaskStatus(taskId, TaskState.FAILED, -1, null); + return new TaskStatus(taskId, TaskState.FAILED, -1, null, null); } public static TaskStatus failure(String taskId, String errorMsg) { - return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg); + return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null); } public static TaskStatus fromCode(String taskId, TaskState code) { - return new TaskStatus(taskId, code, -1, null); + return new TaskStatus(taskId, code, -1, null, null); } // The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage. @@ -80,19 +82,22 @@ public class TaskStatus private final TaskState status; private final long duration; private final String errorMsg; + private final TaskLocation location; @JsonCreator protected TaskStatus( @JsonProperty("id") String id, @JsonProperty("status") TaskState status, @JsonProperty("duration") long duration, - @JsonProperty("errorMsg") String errorMsg + @JsonProperty("errorMsg") String errorMsg, + @Nullable @JsonProperty("location") TaskLocation location ) { this.id = id; this.status = status; this.duration = duration; this.errorMsg = truncateErrorMsg(errorMsg); + this.location = location == null ? TaskLocation.unknown() : location; // Check class invariants. Preconditions.checkNotNull(id, "id"); @@ -123,6 +128,12 @@ public class TaskStatus return errorMsg; } + @JsonProperty("location") + public TaskLocation getLocation() + { + return location; + } + /** * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable, * isSuccess, or isFailure will be true at any one time. @@ -172,7 +183,21 @@ public class TaskStatus public TaskStatus withDuration(long _duration) { - return new TaskStatus(id, status, _duration, errorMsg); + return new TaskStatus(id, status, _duration, errorMsg, location); + } + + public TaskStatus withLocation(TaskLocation location) + { + if (location == null) { + location = TaskLocation.unknown(); + } + return new TaskStatus( + id, + status, + duration, + errorMsg, + location + ); } @Override diff --git a/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java b/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java new file mode 100644 index 00000000000..92651bca815 --- /dev/null +++ b/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class TaskStatusTest +{ + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new ObjectMapper(); + + final TaskStatus status = new TaskStatus( + "testId", + TaskState.RUNNING, + 1000L, + "an error message", + TaskLocation.create("testHost", 1010, -1) + ); + + final String json = mapper.writeValueAsString(status); + Assert.assertEquals(status, mapper.readValue(json, TaskStatus.class)); + + final String jsonNoLocation = "{\n" + + "\"id\": \"testId\",\n" + + "\"status\": \"SUCCESS\",\n" + + "\"duration\": 3000,\n" + + "\"errorMsg\": \"hello\"\n" + + "}"; + + final TaskStatus statusNoLocation = new TaskStatus( + "testId", + TaskState.SUCCESS, + 3000L, + "hello", + null + ); + Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation, TaskStatus.class)); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index aa9a582edf4..6206b9d0282 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -474,6 +474,22 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer return null; } + @Override + public TaskLocation getTaskLocation(String taskId) + { + if (pendingTasks.containsKey(taskId)) { + return pendingTasks.get(taskId).getLocation(); + } + if (runningTasks.containsKey(taskId)) { + return runningTasks.get(taskId).getLocation(); + } + if (completeTasks.containsKey(taskId)) { + return completeTasks.get(taskId).getLocation(); + } + + return TaskLocation.unknown(); + } + @Override public Optional getScalingStats() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index e814cc36b4b..fab5a4f1391 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -301,6 +301,12 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke return runningItem == null ? Collections.emptyList() : Collections.singletonList(runningItem); } + @Override + public TaskLocation getTaskLocation(String taskId) + { + return location; + } + @Override public Optional getScalingStats() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 332356b1ba7..a9153b80b22 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; @@ -421,6 +422,8 @@ public class TaskQueue { giant.lock(); + TaskLocation taskLocation = TaskLocation.unknown(); + try { Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(taskStatus, "status"); @@ -433,6 +436,7 @@ public class TaskQueue ); // Inform taskRunner that this task can be shut down try { + taskLocation = taskRunner.getTaskLocation(task.getId()); taskRunner.shutdown(task.getId(), reasonFormat, args); } catch (Exception e) { @@ -461,7 +465,7 @@ public class TaskQueue if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); } else { - taskStorage.setStatus(taskStatus); + taskStorage.setStatus(taskStatus.withLocation(taskLocation)); log.info("Task done: %s", task); managementMayBeNecessary.signalAll(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index 81596596daa..e9dab7f4f1e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; @@ -109,6 +110,11 @@ public interface TaskRunner return null; } + default TaskLocation getTaskLocation(String taskId) + { + return TaskLocation.unknown(); + } + /** * Some runners are able to scale up and down their capacity in a dynamic manner. This returns stats on those activities * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index e5fa3758e0d..e96c606b7a9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1140,6 +1140,17 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } } + @Override + public TaskLocation getTaskLocation(String taskId) + { + final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId); + if (workItem == null) { + return TaskLocation.unknown(); + } else { + return workItem.getLocation(); + } + } + public List getBlacklistedWorkers() { return blackListedWorkers.values().stream().map( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index b1539589ec9..2dc68394eb6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -293,7 +293,7 @@ public class OverlordResource taskInfo.getStatus().getStatusCode(), RunnerTaskState.WAITING, taskInfo.getStatus().getDuration(), - TaskLocation.unknown(), + taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(), taskInfo.getDataSource(), taskInfo.getStatus().getErrorMsg() ) @@ -598,7 +598,7 @@ public class OverlordResource taskInfo.getStatus().getStatusCode(), RunnerTaskState.NONE, taskInfo.getStatus().getDuration(), - TaskLocation.unknown(), + taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(), taskInfo.getDataSource(), taskInfo.getStatus().getErrorMsg() ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 475b8ef6f68..1ae89f7a0a3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -42,6 +42,7 @@ import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.SegmentLoaderFactory; @@ -233,7 +234,12 @@ public class TaskLifecycleTest private TaskConfig taskConfig; private DataSegmentPusher dataSegmentPusher; private AppenderatorsManager appenderatorsManager; - + private DruidNode druidNode = new DruidNode("dummy", "dummy", false, 10000, null, true, false); + private TaskLocation taskLocation = TaskLocation.create( + druidNode.getHost(), + druidNode.getPlaintextPort(), + druidNode.getTlsPort() + ); private int pushedSegments; private int announcedSinks; private SegmentHandoffNotifierFactory handoffNotifierFactory; @@ -644,7 +650,7 @@ public class TaskLifecycleTest tb, taskConfig, emitter, - new DruidNode("dummy", "dummy", false, 10000, null, true, false), + druidNode, new ServerConfig() ); } @@ -731,6 +737,7 @@ public class TaskLifecycleTest final List loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId())); Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("merged statusCode", TaskState.SUCCESS, mergedStatus.getStatusCode()); Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments); Assert.assertEquals("num segments published", 2, mdc.getPublished().size()); @@ -808,6 +815,7 @@ public class TaskLifecycleTest final TaskStatus status = runTask(indexTask); Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); } @@ -875,6 +883,7 @@ public class TaskLifecycleTest final Task killTask = new KillTask(null, "test_kill_task", Intervals.of("2011-04-01/P4D"), null); final TaskStatus status = runTask(killTask); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size()); @@ -897,6 +906,7 @@ public class TaskLifecycleTest final TaskStatus status = runTask(rtishTask); Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("num segments published", 2, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); } @@ -911,6 +921,7 @@ public class TaskLifecycleTest final TaskStatus status = runTask(noopTask); Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); } @@ -925,6 +936,7 @@ public class TaskLifecycleTest final TaskStatus status = runTask(neverReadyTask); Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); } @@ -978,7 +990,7 @@ public class TaskLifecycleTest }; final TaskStatus status = runTask(task); - + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals("segments published", 1, mdc.getPublished().size()); Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); @@ -1019,6 +1031,7 @@ public class TaskLifecycleTest final TaskStatus status = runTask(task); Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("segments published", 0, mdc.getPublished().size()); Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } @@ -1058,6 +1071,7 @@ public class TaskLifecycleTest final TaskStatus status = runTask(task); Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("segments published", 0, mdc.getPublished().size()); Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } @@ -1091,7 +1105,9 @@ public class TaskLifecycleTest Thread.sleep(10); } - Assert.assertTrue("Task should be in Success state", tsqa.getStatus(taskId).get().isSuccess()); + TaskStatus status = tsqa.getStatus(taskId).get(); + Assert.assertTrue("Task should be in Success state", status.isSuccess()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals(1, announcedSinks); Assert.assertEquals(1, pushedSegments); @@ -1161,7 +1177,9 @@ public class TaskLifecycleTest Thread.sleep(10); } - Assert.assertTrue("Task should be in Failure state", tsqa.getStatus(taskId).get().isFailure()); + TaskStatus status = tsqa.getStatus(taskId).get(); + Assert.assertTrue("Task should be in Failure state", status.isFailure()); + Assert.assertEquals(taskLocation, status.getLocation()); EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate); } @@ -1235,6 +1253,7 @@ public class TaskLifecycleTest final List loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId())); Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode()); + Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments); Assert.assertEquals("num segments published", 2, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());