Keep track of task location for completed tasks (#8286)

* Keep track of task location for completed tasks

* Add TaskLifecycleTest location checks
This commit is contained in:
Jonathan Wei 2019-08-15 16:57:02 -05:00 committed by GitHub
parent 8924d285dc
commit ef7b9606f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 165 additions and 16 deletions

View File

@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
/**
* Represents the status of a task from the perspective of the coordinator. The task may be ongoing
* ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
@ -37,32 +39,32 @@ public class TaskStatus
public static TaskStatus running(String taskId)
{
return new TaskStatus(taskId, TaskState.RUNNING, -1, null);
return new TaskStatus(taskId, TaskState.RUNNING, -1, null, null);
}
public static TaskStatus success(String taskId)
{
return new TaskStatus(taskId, TaskState.SUCCESS, -1, null);
return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, null);
}
public static TaskStatus success(String taskId, String errorMsg)
{
return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg);
return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg, null);
}
public static TaskStatus failure(String taskId)
{
return new TaskStatus(taskId, TaskState.FAILED, -1, null);
return new TaskStatus(taskId, TaskState.FAILED, -1, null, null);
}
public static TaskStatus failure(String taskId, String errorMsg)
{
return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg);
return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
}
public static TaskStatus fromCode(String taskId, TaskState code)
{
return new TaskStatus(taskId, code, -1, null);
return new TaskStatus(taskId, code, -1, null, null);
}
// The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage.
@ -80,19 +82,22 @@ public class TaskStatus
private final TaskState status;
private final long duration;
private final String errorMsg;
private final TaskLocation location;
@JsonCreator
protected TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") TaskState status,
@JsonProperty("duration") long duration,
@JsonProperty("errorMsg") String errorMsg
@JsonProperty("errorMsg") String errorMsg,
@Nullable @JsonProperty("location") TaskLocation location
)
{
this.id = id;
this.status = status;
this.duration = duration;
this.errorMsg = truncateErrorMsg(errorMsg);
this.location = location == null ? TaskLocation.unknown() : location;
// Check class invariants.
Preconditions.checkNotNull(id, "id");
@ -123,6 +128,12 @@ public class TaskStatus
return errorMsg;
}
@JsonProperty("location")
public TaskLocation getLocation()
{
return location;
}
/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
* isSuccess, or isFailure will be true at any one time.
@ -172,7 +183,21 @@ public class TaskStatus
public TaskStatus withDuration(long _duration)
{
return new TaskStatus(id, status, _duration, errorMsg);
return new TaskStatus(id, status, _duration, errorMsg, location);
}
public TaskStatus withLocation(TaskLocation location)
{
if (location == null) {
location = TaskLocation.unknown();
}
return new TaskStatus(
id,
status,
duration,
errorMsg,
location
);
}
@Override

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class TaskStatusTest
{
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = new ObjectMapper();
final TaskStatus status = new TaskStatus(
"testId",
TaskState.RUNNING,
1000L,
"an error message",
TaskLocation.create("testHost", 1010, -1)
);
final String json = mapper.writeValueAsString(status);
Assert.assertEquals(status, mapper.readValue(json, TaskStatus.class));
final String jsonNoLocation = "{\n"
+ "\"id\": \"testId\",\n"
+ "\"status\": \"SUCCESS\",\n"
+ "\"duration\": 3000,\n"
+ "\"errorMsg\": \"hello\"\n"
+ "}";
final TaskStatus statusNoLocation = new TaskStatus(
"testId",
TaskState.SUCCESS,
3000L,
"hello",
null
);
Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation, TaskStatus.class));
}
}

View File

@ -474,6 +474,22 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return null;
}
@Override
public TaskLocation getTaskLocation(String taskId)
{
if (pendingTasks.containsKey(taskId)) {
return pendingTasks.get(taskId).getLocation();
}
if (runningTasks.containsKey(taskId)) {
return runningTasks.get(taskId).getLocation();
}
if (completeTasks.containsKey(taskId)) {
return completeTasks.get(taskId).getLocation();
}
return TaskLocation.unknown();
}
@Override
public Optional<ScalingStats> getScalingStats()
{

View File

@ -301,6 +301,12 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
return runningItem == null ? Collections.emptyList() : Collections.singletonList(runningItem);
}
@Override
public TaskLocation getTaskLocation(String taskId)
{
return location;
}
@Override
public Optional<ScalingStats> getScalingStats()
{

View File

@ -30,6 +30,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@ -421,6 +422,8 @@ public class TaskQueue
{
giant.lock();
TaskLocation taskLocation = TaskLocation.unknown();
try {
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(taskStatus, "status");
@ -433,6 +436,7 @@ public class TaskQueue
);
// Inform taskRunner that this task can be shut down
try {
taskLocation = taskRunner.getTaskLocation(task.getId());
taskRunner.shutdown(task.getId(), reasonFormat, args);
}
catch (Exception e) {
@ -461,7 +465,7 @@ public class TaskQueue
if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
} else {
taskStorage.setStatus(taskStatus);
taskStorage.setStatus(taskStatus.withLocation(taskLocation));
log.info("Task done: %s", task);
managementMayBeNecessary.signalAll();
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
@ -109,6 +110,11 @@ public interface TaskRunner
return null;
}
default TaskLocation getTaskLocation(String taskId)
{
return TaskLocation.unknown();
}
/**
* Some runners are able to scale up and down their capacity in a dynamic manner. This returns stats on those activities
*

View File

@ -1140,6 +1140,17 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
@Override
public TaskLocation getTaskLocation(String taskId)
{
final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId);
if (workItem == null) {
return TaskLocation.unknown();
} else {
return workItem.getLocation();
}
}
public List<String> getBlacklistedWorkers()
{
return blackListedWorkers.values().stream().map(

View File

@ -293,7 +293,7 @@ public class OverlordResource
taskInfo.getStatus().getStatusCode(),
RunnerTaskState.WAITING,
taskInfo.getStatus().getDuration(),
TaskLocation.unknown(),
taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
)
@ -598,7 +598,7 @@ public class OverlordResource
taskInfo.getStatus().getStatusCode(),
RunnerTaskState.NONE,
taskInfo.getStatus().getDuration(),
TaskLocation.unknown(),
taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
);

View File

@ -42,6 +42,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
@ -233,7 +234,12 @@ public class TaskLifecycleTest
private TaskConfig taskConfig;
private DataSegmentPusher dataSegmentPusher;
private AppenderatorsManager appenderatorsManager;
private DruidNode druidNode = new DruidNode("dummy", "dummy", false, 10000, null, true, false);
private TaskLocation taskLocation = TaskLocation.create(
druidNode.getHost(),
druidNode.getPlaintextPort(),
druidNode.getTlsPort()
);
private int pushedSegments;
private int announcedSinks;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
@ -644,7 +650,7 @@ public class TaskLifecycleTest
tb,
taskConfig,
emitter,
new DruidNode("dummy", "dummy", false, 10000, null, true, false),
druidNode,
new ServerConfig()
);
}
@ -731,6 +737,7 @@ public class TaskLifecycleTest
final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, mergedStatus.getStatusCode());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
@ -808,6 +815,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(indexTask);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@ -875,6 +883,7 @@ public class TaskLifecycleTest
final Task killTask = new KillTask(null, "test_kill_task", Intervals.of("2011-04-01/P4D"), null);
final TaskStatus status = runTask(killTask);
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
@ -897,6 +906,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(rtishTask);
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@ -911,6 +921,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(noopTask);
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@ -925,6 +936,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(neverReadyTask);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@ -978,7 +990,7 @@ public class TaskLifecycleTest
};
final TaskStatus status = runTask(task);
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("segments published", 1, mdc.getPublished().size());
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
@ -1019,6 +1031,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(task);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("segments published", 0, mdc.getPublished().size());
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
@ -1058,6 +1071,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(task);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("segments published", 0, mdc.getPublished().size());
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
@ -1091,7 +1105,9 @@ public class TaskLifecycleTest
Thread.sleep(10);
}
Assert.assertTrue("Task should be in Success state", tsqa.getStatus(taskId).get().isSuccess());
TaskStatus status = tsqa.getStatus(taskId).get();
Assert.assertTrue("Task should be in Success state", status.isSuccess());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals(1, announcedSinks);
Assert.assertEquals(1, pushedSegments);
@ -1161,7 +1177,9 @@ public class TaskLifecycleTest
Thread.sleep(10);
}
Assert.assertTrue("Task should be in Failure state", tsqa.getStatus(taskId).get().isFailure());
TaskStatus status = tsqa.getStatus(taskId).get();
Assert.assertTrue("Task should be in Failure state", status.isFailure());
Assert.assertEquals(taskLocation, status.getLocation());
EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate);
}
@ -1235,6 +1253,7 @@ public class TaskLifecycleTest
final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());