Add the error message in taskStatus for task failures in overlord (#11419)

* add error messages in taskStatus for task failures in overlord

* unused imports

* add helper messages pointing to the logs to look up

* fix tests

* fix counting the same task failures more than once

* same fix for HttpRemoteTaskRunner
Jihoon Son 2021-07-15 13:14:28 -07:00 committed by GitHub
parent a366753ba5
commit 8729b40893
9 changed files with 601 additions and 93 deletions
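
The net effect of the changes listed above is that task failures produced by the overlord's runners and queue now carry a non-null error message pointing at the logs to check. A minimal sketch of how a caller might read it, assuming a configured TaskRunner and Task are already in hand; only the TaskStatus and TaskState APIs are from the files changed below, the rest is illustrative.

import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;

class TaskOutcomeExample
{
  // Illustrative helper, not part of this commit.
  static void reportOutcome(TaskRunner runner, Task task) throws Exception
  {
    TaskStatus status = runner.run(task).get();
    if (status.getStatusCode() == TaskState.FAILED) {
      // Before this change getErrorMsg() was usually null for runner-side failures;
      // now it explains the failure and names the logs to look up.
      System.err.println("Task[" + status.getId() + "] failed: " + status.getErrorMsg());
    }
  }
}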

TaskStatus.java

@ -47,11 +47,21 @@ public class TaskStatus
return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, null);
}
/**
* The succeeded task status should not have error messages.
* Use {@link #success(String)} instead.
*/
@Deprecated
public static TaskStatus success(String taskId, String errorMsg)
{
return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg, null);
}
/**
* All failed task status must have a non-null error message.
* Use {@link #failure(String, String)} instead.
*/
@Deprecated
public static TaskStatus failure(String taskId)
{
return new TaskStatus(taskId, TaskState.FAILED, -1, null, null);
@ -62,6 +72,11 @@ public class TaskStatus
return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
}
/**
* This method is deprecated because it does not handle the error message of failed task status properly.
* Use {@link #success(String)} or {@link #failure(String, String)} instead.
*/
@Deprecated
public static TaskStatus fromCode(String taskId, TaskState code)
{
return new TaskStatus(taskId, code, -1, null, null);
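
Taken together, the hunks above keep the old factories for compatibility but deprecate them; new call sites are expected to use the variants that carry a message. A short sketch of the intended usage, with a made-up task id:

// Preferred after this change: failures always explain themselves.
TaskStatus failed = TaskStatus.failure(
    "index_wiki_2021-07-15",
    "Failed to assign this task. See overlord logs for more details."
);

// Still compiles but deprecated: yields a FAILED status whose getErrorMsg() is null,
// which is exactly what this patch is phasing out.
TaskStatus legacy = TaskStatus.failure("index_wiki_2021-07-15");

// Success statuses never carry an error message.
TaskStatus ok = TaskStatus.success("index_wiki_2021-07-15");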

RemoteTaskRunner.java

@ -52,6 +52,7 @@ import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
@ -108,6 +109,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes.
@ -540,7 +542,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
} else if ((completeTask = completeTasks.get(task.getId())) != null) {
return completeTask.getResult();
} else {
return addPendingTask(task).getResult();
RemoteTaskRunnerWorkItem workItem = addPendingTask(task);
runPendingTasks();
return workItem.getResult();
}
}
@ -681,7 +685,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
/**
* Adds a task to the pending queue
* Adds a task to the pending queue.
* {@link #runPendingTasks()} should be called to run the pending task.
*/
@VisibleForTesting
RemoteTaskRunnerWorkItem addPendingTask(final Task task)
@ -696,7 +701,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
pendingTaskPayloads.put(task.getId(), task);
pendingTasks.put(task.getId(), taskRunnerWorkItem);
runPendingTasks();
return taskRunnerWorkItem;
}
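
In other words, addPendingTask() now only enqueues, and scheduling has become a separate, explicit step, as the updated run() above shows. A sketch of the new contract for callers, assuming remoteTaskRunner and task are in scope:

RemoteTaskRunnerWorkItem workItem = remoteTaskRunner.addPendingTask(task); // queue only, no assignment yet
remoteTaskRunner.runPendingTasks();                                        // kick off assignment on the pending-tasks executor
TaskStatus status = workItem.getResult().get();                            // resolves once the task completes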
@ -705,7 +709,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
* are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
* This method should be run each time there is new worker capacity or if new tasks are assigned.
*/
private void runPendingTasks()
@VisibleForTesting
void runPendingTasks()
{
runPendingTasksExec.submit(
(Callable<Void>) () -> {
@ -716,30 +721,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
sortByInsertionTime(copy);
for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
String taskId = taskRunnerWorkItem.getTaskId();
if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
try {
//this can still be null due to race from explicit task shutdown request
//or if another thread steals and completes this task right after this thread makes copy
//of pending tasks. See https://github.com/apache/druid/issues/2842 .
Task task = pendingTaskPayloads.get(taskId);
if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
pendingTaskPayloads.remove(taskId);
}
}
catch (Exception e) {
log.makeAlert(e, "Exception while trying to assign task")
.addData("taskId", taskRunnerWorkItem.getTaskId())
.emit();
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
if (workItem != null) {
taskComplete(workItem, null, TaskStatus.failure(taskId));
}
}
finally {
tryAssignTasks.remove(taskId);
}
}
runPendingTask(taskRunnerWorkItem);
}
}
catch (Exception e) {
@ -751,6 +733,45 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
}
/**
* Run one pending task. This method should only be called from within this class, except by unit tests.
*/
@VisibleForTesting
void runPendingTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem)
{
String taskId = taskRunnerWorkItem.getTaskId();
if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
try {
//this can still be null due to race from explicit task shutdown request
//or if another thread steals and completes this task right after this thread makes copy
//of pending tasks. See https://github.com/apache/druid/issues/2842 .
Task task = pendingTaskPayloads.get(taskId);
if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
pendingTaskPayloads.remove(taskId);
}
}
catch (Exception e) {
log.makeAlert(e, "Exception while trying to assign task")
.addData("taskId", taskRunnerWorkItem.getTaskId())
.emit();
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
if (workItem != null) {
taskComplete(
workItem,
null,
TaskStatus.failure(
taskId,
StringUtils.format("Failed to assign this task. See overlord logs for more details.")
)
);
}
}
finally {
tryAssignTasks.remove(taskId);
}
}
}
@VisibleForTesting
static void sortByInsertionTime(List<RemoteTaskRunnerWorkItem> tasks)
{
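
The putIfAbsent()/remove() pair in runPendingTask() is a per-task mutual-exclusion guard: only one thread at a time may attempt assignment of a given task id, and the guard is always released in the finally block. A self-contained sketch of the same idiom in plain Java, simplified; the real method also consults pendingTaskPayloads and emits an alert and a failed TaskStatus when assignment throws:

import java.util.concurrent.ConcurrentHashMap;

class AssignGuard
{
  private final ConcurrentHashMap<String, String> inFlight = new ConcurrentHashMap<>();

  // Returns true if this call actually performed the work for taskId.
  boolean tryOnce(String taskId, Runnable work)
  {
    if (inFlight.putIfAbsent(taskId, taskId) != null) {
      return false; // another thread is already assigning this task
    }
    try {
      work.run();
      return true;
    }
    finally {
      inFlight.remove(taskId); // always release the guard, even when work throws
    }
  }
}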
@ -930,7 +951,18 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
elapsed,
config.getTaskAssignmentTimeout()
).emit();
taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
taskComplete(
taskRunnerWorkItem,
theZkWorker,
TaskStatus.failure(
task.getId(),
StringUtils.format(
"The worker that this task is assigned did not start it in timeout[%s]. "
+ "See overlord logs for more details.",
config.getTaskAssignmentTimeout()
)
)
);
break;
}
}
@ -1066,9 +1098,13 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
log.warn("Task[%s] just disappeared!", taskId);
final TaskStatus taskStatus = TaskStatus.failure(
taskId,
"The worker that this task was assigned disappeared. See overlord logs for more details."
);
taskRunnerWorkItem.setResult(taskStatus);
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, taskStatus);
} else {
log.info("Task[%s] went bye bye.", taskId);
}
@ -1189,8 +1225,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
log.info("Failing task[%s]", assignedTask);
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
final TaskStatus taskStatus = TaskStatus.failure(
assignedTask,
StringUtils.format("Canceled for worker cleanup. See overlord logs for more details.")
);
taskRunnerWorkItem.setResult(taskStatus);
TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, taskStatus);
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
@ -1235,7 +1275,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private void taskComplete(
RemoteTaskRunnerWorkItem taskRunnerWorkItem,
ZkWorker zkWorker,
@Nullable ZkWorker zkWorker,
TaskStatus taskStatus
)
{
@ -1255,41 +1295,76 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
// Move from running -> complete
completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
runningTasks.remove(taskStatus.getId());
// If the task was running and this is the first complete event,
// previousComplete should be null and removedRunning should be non-null.
final RemoteTaskRunnerWorkItem previousComplete = completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
final RemoteTaskRunnerWorkItem removedRunning = runningTasks.remove(taskStatus.getId());
// Update success/failure counters
if (zkWorker != null) {
if (taskStatus.isSuccess()) {
zkWorker.resetContinuouslyFailedTasksCount();
if (blackListedWorkers.remove(zkWorker)) {
zkWorker.setBlacklistedUntil(null);
log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
if (previousComplete != null && removedRunning != null) {
log.warn(
"This is not the first complete event for task[%s], but it was still known as running. "
+ "Ignoring the previously known running status.",
taskStatus.getId()
);
}
if (previousComplete != null) {
// This is not the first complete event for the same task.
try {
// getResult().get() must return immediately.
TaskState lastKnownState = previousComplete.getResult().get(1, TimeUnit.MILLISECONDS).getStatusCode();
if (taskStatus.getStatusCode() != lastKnownState) {
log.warn(
"The state of the new task complete event is different from its last known state. "
+ "New state[%s], last known state[%s]",
taskStatus.getStatusCode(),
lastKnownState
);
}
} else if (taskStatus.isFailure()) {
zkWorker.incrementContinuouslyFailedTasksCount();
}
catch (InterruptedException e) {
log.warn(e, "Interrupted while getting the last known task status.");
Thread.currentThread().interrupt();
}
catch (ExecutionException | TimeoutException e) {
// This case should not really happen.
log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
}
} else {
// This is the first complete event for this task.
// Update success/failure counters
if (zkWorker != null) {
if (taskStatus.isSuccess()) {
zkWorker.resetContinuouslyFailedTasksCount();
if (blackListedWorkers.remove(zkWorker)) {
zkWorker.setBlacklistedUntil(null);
log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
}
} else if (taskStatus.isFailure()) {
zkWorker.incrementContinuouslyFailedTasksCount();
}
// Blacklist node if there are too many failures.
synchronized (blackListedWorkers) {
if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
if (blackListedWorkers.add(zkWorker)) {
log.info(
"Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
zkWorker.getWorker(),
zkWorker.getBlacklistedUntil(),
zkWorker.getContinuouslyFailedTasksCount()
);
// Blacklist node if there are too many failures.
synchronized (blackListedWorkers) {
if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
if (blackListedWorkers.add(zkWorker)) {
log.info(
"Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
zkWorker.getWorker(),
zkWorker.getBlacklistedUntil(),
zkWorker.getContinuouslyFailedTasksCount()
);
}
}
}
}
}
// Notify interested parties
taskRunnerWorkItem.setResult(taskStatus);
TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
// Notify interested parties
taskRunnerWorkItem.setResult(taskStatus);
TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
}
}
@Override
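
The heart of the duplicate-counting fix above is that ConcurrentMap.put() returns the previous mapping, so a non-null previousComplete identifies a completion event that was already processed and must not move the per-worker failure counters or blacklist state again. A stripped-down sketch of that idiom in plain Java, not the actual Druid types:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionTracker
{
  private final ConcurrentHashMap<String, String> complete = new ConcurrentHashMap<>();
  private final AtomicInteger consecutiveFailures = new AtomicInteger();

  // Returns true only for the first completion event of a task; only that event
  // may move the failure counter, so duplicate events no longer double-count.
  boolean recordCompletion(String taskId, boolean failed)
  {
    boolean first = complete.put(taskId, taskId) == null;
    if (first && failed) {
      consecutiveFailures.incrementAndGet();
    }
    return first;
  }
}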

TaskQueue.java

@ -288,7 +288,9 @@ public class TaskQueue
}
catch (Exception e) {
log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
final String errorMessage = "Failed while waiting for the task to be ready to run. "
+ "See overlord logs for more details.";
notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
continue;
}
if (taskIsReady) {
@ -412,7 +414,7 @@ public class TaskQueue
Preconditions.checkNotNull(taskId, "taskId");
for (final Task task : tasks) {
if (task.getId().equals(taskId)) {
notifyStatus(task, TaskStatus.failure(taskId), reasonFormat, args);
notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
break;
}
}
@ -556,7 +558,9 @@ public class TaskQueue
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.emit();
handleStatus(TaskStatus.failure(task.getId()));
handleStatus(
TaskStatus.failure(task.getId(), "Failed to run this task. See overlord logs for more details.")
);
}
private void handleStatus(final TaskStatus status)
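
The practical consequence of the TaskQueue changes is that a shutdown reason or an isReady() failure now ends up as the stored errorMsg instead of being dropped. A brief sketch of the observable behavior, assuming a TaskQueue and TaskStorage wired up as in the TaskQueueTest changes further down:

taskQueue.shutdown(taskId, "Shutdown requested by %s", "operator"); // reasonFormat plus args
TaskStatus stored = taskStorage.getStatus(taskId).get();            // Optional<TaskStatus> from storage
// stored.getStatusCode() == TaskState.FAILED
// stored.getErrorMsg()   == "Shutdown requested by operator"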

HttpRemoteTaskRunner.java

@ -47,6 +47,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@ -68,6 +69,7 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@ -424,7 +426,18 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
config.getTaskAssignmentTimeout()
).emit();
// taskComplete(..) must be called outside of statusLock, see comments on method.
taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
taskComplete(
workItem,
workerHolder,
TaskStatus.failure(
taskId,
StringUtils.format(
"The worker that this task is assigned did not start it in timeout[%s]. "
+ "See overlord and middleManager/indexer logs for more details.",
config.getTaskAssignmentTimeout()
)
)
);
}
return true;
@ -456,13 +469,36 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
}
// Notify interested parties
taskRunnerWorkItem.setResult(taskStatus);
TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
if (taskRunnerWorkItem.getResult().isDone()) {
// This is not the first complete event.
try {
TaskState lastKnownState = taskRunnerWorkItem.getResult().get().getStatusCode();
if (taskStatus.getStatusCode() != lastKnownState) {
log.warn(
"The state of the new task complete event is different from its last known state. "
+ "New state[%s], last known state[%s]",
taskStatus.getStatusCode(),
lastKnownState
);
}
}
catch (InterruptedException e) {
log.warn(e, "Interrupted while getting the last known task status.");
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
// This case should not really happen.
log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
}
} else {
// Notify interested parties
taskRunnerWorkItem.setResult(taskStatus);
TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
// Update success/failure counters, Blacklist node if there are too many failures.
if (workerHolder != null) {
blacklistWorkerIfNeeded(taskStatus, workerHolder);
// Update success/failure counters, Blacklist node if there are too many failures.
if (workerHolder != null) {
blacklistWorkerIfNeeded(taskStatus, workerHolder);
}
}
synchronized (statusLock) {
@ -647,14 +683,26 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
if (!taskItem.getResult().isDone()) {
log.info(
log.warn(
"Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
taskItem.getTaskId(),
workerHostAndPort,
config.getTaskCleanupTimeout()
);
// taskComplete(..) must be called outside of statusLock, see comments on method.
taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
taskComplete(
taskItem,
null,
TaskStatus.failure(
taskItem.getTaskId(),
StringUtils.format(
"The worker that this task was assigned disappeared and "
+ "did not report cleanup within timeout[%s]. "
+ "See overlord and middleManager/indexer logs for more details.",
config.getTaskCleanupTimeout()
)
)
);
}
}
}
@ -1179,7 +1227,15 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
if (taskItem.getTask() == null) {
log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit();
// taskComplete(..) must be called outside of statusLock, see comments on method.
taskComplete(taskItem, null, TaskStatus.failure(taskId));
taskComplete(
taskItem,
null,
TaskStatus.failure(
taskId,
"No payload found for this task. "
+ "See overlord logs and middleManager/indexer logs for more details."
)
);
continue;
}
@ -1205,7 +1261,11 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
.emit();
// taskComplete(..) must be called outside of statusLock, see comments on method.
taskComplete(taskItem, null, TaskStatus.failure(taskId));
taskComplete(
taskItem,
null,
TaskStatus.failure(taskId, "Failed to assign this task. See overlord logs for more details.")
);
}
finally {
synchronized (statusLock) {
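
The HttpRemoteTaskRunner counterpart of the fix leans on the work item's result future: once it is done, the task was already completed and a second event must not notify listeners or touch the blacklist counters again. A simplified, self-contained illustration of that guard; the real code also compares the new state against the last known one and warns when they differ:

import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexer.TaskStatus;

class FirstCompletionWins
{
  private final SettableFuture<TaskStatus> result = SettableFuture.create();

  void onComplete(TaskStatus status)
  {
    if (result.isDone()) {
      return; // duplicate completion event: listeners and failure counters stay untouched
    }
    result.set(status); // first event wins; notify listeners and update counters exactly once
  }
}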

WorkerHolder.java

@ -390,14 +390,20 @@ public class WorkerHolder
announcement.getStatus(),
worker.getHost()
);
delta.add(TaskAnnouncement.create(
announcement.getTaskId(),
announcement.getTaskType(),
announcement.getTaskResource(),
TaskStatus.failure(announcement.getTaskId()),
announcement.getTaskLocation(),
announcement.getTaskDataSource()
));
delta.add(
TaskAnnouncement.create(
announcement.getTaskId(),
announcement.getTaskType(),
announcement.getTaskResource(),
TaskStatus.failure(
announcement.getTaskId(),
"This task disappeared on the worker where it was assigned. "
+ "See overlord logs for more details."
),
announcement.getTaskLocation(),
announcement.getTaskDataSource()
)
);
}
}
@ -427,14 +433,20 @@ public class WorkerHolder
announcement.getStatus(),
worker.getHost()
);
delta.add(TaskAnnouncement.create(
announcement.getTaskId(),
announcement.getTaskType(),
announcement.getTaskResource(),
TaskStatus.failure(announcement.getTaskId()),
announcement.getTaskLocation(),
announcement.getTaskDataSource()
));
delta.add(
TaskAnnouncement.create(
announcement.getTaskId(),
announcement.getTaskType(),
announcement.getTaskResource(),
TaskStatus.failure(
announcement.getTaskId(),
"This task disappeared on the worker where it was assigned. "
+ "See overlord logs for more details."
),
announcement.getTaskLocation(),
announcement.getTaskDataSource()
)
);
}
} else if (change instanceof WorkerHistoryItem.Metadata) {
isWorkerDisabled = ((WorkerHistoryItem.Metadata) change).isDisabled();

RemoteTaskRunnerTest.java

@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.easymock.Capture;
import org.easymock.EasyMock;
@ -54,6 +55,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collection;
@ -88,6 +90,7 @@ public class RemoteTaskRunnerTest
cf = rtrTestUtils.getCuratorFramework();
task = TestTasks.unending("task id with spaces");
EmittingLogger.registerEmitter(new NoopServiceEmitter());
}
@After
@ -131,6 +134,7 @@ public class RemoteTaskRunnerTest
{
doSetup();
remoteTaskRunner.addPendingTask(task);
remoteTaskRunner.runPendingTasks();
Assert.assertFalse(workerRunningTask(task.getId()));
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
@ -352,6 +356,8 @@ public class RemoteTaskRunnerTest
TaskStatus status = future.get();
Assert.assertEquals(status.getStatusCode(), TaskState.FAILED);
Assert.assertNotNull(status.getErrorMsg());
Assert.assertTrue(status.getErrorMsg().contains("The worker that this task was assigned disappeared"));
}
@Test
@ -446,6 +452,8 @@ public class RemoteTaskRunnerTest
TaskStatus status = future.get();
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
Assert.assertNotNull(status.getErrorMsg());
Assert.assertTrue(status.getErrorMsg().contains("Canceled for worker cleanup"));
RemoteTaskRunnerConfig config = remoteTaskRunner.getRemoteTaskRunnerConfig();
Assert.assertTrue(
TestUtils.conditionValid(
@ -517,6 +525,38 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
}
@Test
public void testRunPendingTaskFailToAssignTask() throws Exception
{
doSetup();
RemoteTaskRunnerWorkItem originalItem = remoteTaskRunner.addPendingTask(task);
// modify taskId to make task assignment fail
RemoteTaskRunnerWorkItem wankyItem = Mockito.mock(RemoteTaskRunnerWorkItem.class);
Mockito.when(wankyItem.getTaskId()).thenReturn(originalItem.getTaskId()).thenReturn("wrongId");
remoteTaskRunner.runPendingTask(wankyItem);
TaskStatus taskStatus = originalItem.getResult().get(0, TimeUnit.MILLISECONDS);
Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
Assert.assertEquals(
"Failed to assign this task. See overlord logs for more details.",
taskStatus.getErrorMsg()
);
}
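
A note on the stubbing trick above: Mockito's consecutive thenReturn() makes the first getTaskId() call return the real id, which is what the assignment guard and the payload lookup use, while every later call returns "wrongId". The ids therefore stop matching inside the assignment path, the resulting exception is caught by runPendingTask, and the original pending item completes with the new "Failed to assign this task" message.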
@Test
public void testRunPendingTaskTimeoutToAssign() throws Exception
{
makeWorker();
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
RemoteTaskRunnerWorkItem workItem = remoteTaskRunner.addPendingTask(task);
remoteTaskRunner.runPendingTask(workItem);
TaskStatus taskStatus = workItem.getResult().get(0, TimeUnit.MILLISECONDS);
Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
Assert.assertNotNull(taskStatus.getErrorMsg());
Assert.assertTrue(
taskStatus.getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout")
);
}
private void doSetup() throws Exception
{
makeWorker();

TaskQueueTest.java

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@ -41,6 +42,7 @@ import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.metadata.EntryExistsException;
@ -146,8 +148,14 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals(task.interval, locksForTask.get(0).getInterval());
// Verify that locks are removed on calling shutdown
taskQueue.shutdown(task.getId(), "Shutdown Task");
taskQueue.shutdown(task.getId(), "Shutdown Task test");
Assert.assertTrue(getLockbox().findLocksForTask(task).isEmpty());
Optional<TaskStatus> statusOptional = getTaskStorage().getStatus(task.getId());
Assert.assertTrue(statusOptional.isPresent());
Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
Assert.assertNotNull(statusOptional.get().getErrorMsg());
Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg());
}
@Test
@ -305,6 +313,42 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertFalse(queuedTask.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
}
@Test
public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
actionClientFactory,
getLockbox(),
new NoopServiceEmitter()
);
taskQueue.setActive(true);
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"))
{
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
throw new RuntimeException("isReady failure test");
}
};
taskQueue.add(task);
taskQueue.manageInternal();
Optional<TaskStatus> statusOptional = getTaskStorage().getStatus(task.getId());
Assert.assertTrue(statusOptional.isPresent());
Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
Assert.assertNotNull(statusOptional.get().getErrorMsg());
Assert.assertTrue(
StringUtils.format("Actual message is: %s", statusOptional.get().getErrorMsg()),
statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run")
);
}
private static class TestTask extends AbstractBatchIndexTask
{
private final Interval interval;

HttpRemoteTaskRunnerTest.java

@ -49,16 +49,20 @@ import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
@ -79,6 +83,12 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class HttpRemoteTaskRunnerTest
{
@Before
public void setup()
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
}
/*
Simulates startup of Overlord and Workers being discovered with no previously known tasks. Fresh tasks are given
and expected to be completed.
@ -733,6 +743,18 @@ public class HttpRemoteTaskRunnerTest
Assert.assertTrue(future1.get().isFailure());
Assert.assertTrue(future2.get().isFailure());
Assert.assertNotNull(future1.get().getErrorMsg());
Assert.assertNotNull(future2.get().getErrorMsg());
Assert.assertTrue(
future1.get().getErrorMsg().startsWith(
"The worker that this task was assigned disappeared and did not report cleanup within timeout"
)
);
Assert.assertTrue(
future2.get().getErrorMsg().startsWith(
"The worker that this task was assigned disappeared and did not report cleanup within timeout"
)
);
AtomicInteger ticks = new AtomicInteger();
Set<String> actualShutdowns = new ConcurrentHashSet<>();
@ -1254,6 +1276,230 @@ public class HttpRemoteTaskRunnerTest
);
}
@Test
public void testTimeoutInAssigningTasks() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
return 1;
}
@Override
public Period getTaskAssignmentTimeout()
{
return new Period("PT1S");
}
},
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
new NoopProvisioningStrategy<>(),
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
HttpClient httpClient,
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
return new WorkerHolder(
smileMapper,
httpClient,
config,
workersSyncExec,
listener,
worker,
ImmutableList.of()
)
{
@Override
public void start()
{
disabled.set(false);
}
@Override
public void stop()
{
}
@Override
public boolean isInitialized()
{
return true;
}
@Override
public void waitForInitialization()
{
}
@Override
public boolean assignTask(Task task)
{
// Always returns true
return true;
}
@Override
public void shutdownTask(String taskId)
{
}
};
}
};
taskRunner.start();
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", false, 8080, null, true, false),
NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
Future<TaskStatus> future = taskRunner.run(NoopTask.create("task-id", 0));
Assert.assertTrue(future.get().isFailure());
Assert.assertNotNull(future.get().getErrorMsg());
Assert.assertTrue(
future.get().getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout")
);
}
@Test
public void testExceptionThrownInAssigningTasks() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
return 1;
}
@Override
public Period getTaskAssignmentTimeout()
{
return new Period("PT1S");
}
},
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
new NoopProvisioningStrategy<>(),
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
HttpClient httpClient,
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
return new WorkerHolder(
smileMapper,
httpClient,
config,
workersSyncExec,
listener,
worker,
ImmutableList.of()
)
{
@Override
public void start()
{
disabled.set(false);
}
@Override
public void stop()
{
}
@Override
public boolean isInitialized()
{
return true;
}
@Override
public void waitForInitialization()
{
}
@Override
public boolean assignTask(Task task)
{
throw new RuntimeException("Assign failure test");
}
@Override
public void shutdownTask(String taskId)
{
}
};
}
};
taskRunner.start();
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", false, 8080, null, true, false),
NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
);
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
Future<TaskStatus> future = taskRunner.run(NoopTask.create("task-id", 0));
Assert.assertTrue(future.get().isFailure());
Assert.assertNotNull(future.get().getErrorMsg());
Assert.assertTrue(
StringUtils.format("Actual message is: %s", future.get().getErrorMsg()),
future.get().getErrorMsg().startsWith("Failed to assign this task")
);
}
private HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
TaskStorage taskStorage,
List<Object> listenerNotificationsAccumulator

WorkerHolderTest.java

@ -115,6 +115,12 @@ public class WorkerHolderTest
Assert.assertEquals(task0.getId(), updates.get(3).getTaskId());
Assert.assertTrue(updates.get(3).getTaskStatus().isFailure());
Assert.assertNotNull(updates.get(3).getTaskStatus().getErrorMsg());
Assert.assertTrue(
updates.get(3).getTaskStatus().getErrorMsg().startsWith(
"This task disappeared on the worker where it was assigned"
)
);
updates.clear();
@ -138,6 +144,12 @@ public class WorkerHolderTest
Assert.assertEquals(task2.getId(), updates.get(0).getTaskId());
Assert.assertTrue(updates.get(0).getTaskStatus().isFailure());
Assert.assertNotNull(updates.get(0).getTaskStatus().getErrorMsg());
Assert.assertTrue(
updates.get(0).getTaskStatus().getErrorMsg().startsWith(
"This task disappeared on the worker where it was assigned"
)
);
Assert.assertEquals(task3.getId(), updates.get(1).getTaskId());
Assert.assertTrue(updates.get(1).getTaskStatus().isRunnable());