diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
index 6bef27ee214..00d072c0089 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -47,11 +47,21 @@ public class TaskStatus
     return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, null);
   }
 
+  /**
+   * The succeeded task status should not have error messages.
+   * Use {@link #success(String)} instead.
+   */
+  @Deprecated
   public static TaskStatus success(String taskId, String errorMsg)
   {
     return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg, null);
   }
 
+  /**
+   * All failed task status must have a non-null error message.
+   * Use {@link #failure(String, String)} instead.
+   */
+  @Deprecated
   public static TaskStatus failure(String taskId)
   {
     return new TaskStatus(taskId, TaskState.FAILED, -1, null, null);
@@ -62,6 +72,11 @@ public class TaskStatus
     return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
   }
 
+  /**
+   * This method is deprecated because it does not handle the error message of failed task status properly.
+   * Use {@link #success(String)} or {@link #failure(String, String)} instead.
+   */
+  @Deprecated
   public static TaskStatus fromCode(String taskId, TaskState code)
   {
     return new TaskStatus(taskId, code, -1, null, null);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 6fd3f7fa328..de0713bc506 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -52,6 +52,7 @@ import org.apache.druid.curator.CuratorUtils;
 import org.apache.druid.curator.cache.PathChildrenCacheFactory;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
@@ -108,6 +109,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes.
@@ -540,7 +542,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     } else if ((completeTask = completeTasks.get(task.getId())) != null) {
       return completeTask.getResult();
     } else {
-      return addPendingTask(task).getResult();
+      RemoteTaskRunnerWorkItem workItem = addPendingTask(task);
+      runPendingTasks();
+      return workItem.getResult();
     }
   }
 
@@ -681,7 +685,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }
 
   /**
-   * Adds a task to the pending queue
+   * Adds a task to the pending queue.
+   * {@link #runPendingTasks()} should be called to run the pending task.
    */
   @VisibleForTesting
   RemoteTaskRunnerWorkItem addPendingTask(final Task task)
@@ -696,7 +701,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     );
     pendingTaskPayloads.put(task.getId(), task);
     pendingTasks.put(task.getId(), taskRunnerWorkItem);
-    runPendingTasks();
     return taskRunnerWorkItem;
   }
 
@@ -705,7 +709,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
    * are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
    * This method should be run each time there is new worker capacity or if new tasks are assigned.
    */
-  private void runPendingTasks()
+  @VisibleForTesting
+  void runPendingTasks()
   {
     runPendingTasksExec.submit(
         (Callable) () -> {
@@ -716,30 +721,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
             sortByInsertionTime(copy);
 
             for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
-              String taskId = taskRunnerWorkItem.getTaskId();
-              if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
-                try {
-                  //this can still be null due to race from explicit task shutdown request
-                  //or if another thread steals and completes this task right after this thread makes copy
-                  //of pending tasks. See https://github.com/apache/druid/issues/2842 .
-                  Task task = pendingTaskPayloads.get(taskId);
-                  if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
-                    pendingTaskPayloads.remove(taskId);
-                  }
-                }
-                catch (Exception e) {
-                  log.makeAlert(e, "Exception while trying to assign task")
-                     .addData("taskId", taskRunnerWorkItem.getTaskId())
-                     .emit();
-                  RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
-                  if (workItem != null) {
-                    taskComplete(workItem, null, TaskStatus.failure(taskId));
-                  }
-                }
-                finally {
-                  tryAssignTasks.remove(taskId);
-                }
-              }
+              runPendingTask(taskRunnerWorkItem);
             }
           }
           catch (Exception e) {
@@ -751,6 +733,45 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     );
   }
 
+  /**
+   * Runs one pending task. This method must only be called from within this class, except in unit tests.
+   */
+  @VisibleForTesting
+  void runPendingTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem)
+  {
+    String taskId = taskRunnerWorkItem.getTaskId();
+    if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
+      try {
+        //this can still be null due to race from explicit task shutdown request
+        //or if another thread steals and completes this task right after this thread makes copy
+        //of pending tasks. See https://github.com/apache/druid/issues/2842 .
+        Task task = pendingTaskPayloads.get(taskId);
+        if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
+          pendingTaskPayloads.remove(taskId);
+        }
+      }
+      catch (Exception e) {
+        log.makeAlert(e, "Exception while trying to assign task")
+           .addData("taskId", taskRunnerWorkItem.getTaskId())
+           .emit();
+        RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
+        if (workItem != null) {
+          taskComplete(
+              workItem,
+              null,
+              TaskStatus.failure(
+                  taskId,
+                  StringUtils.format("Failed to assign this task. See overlord logs for more details.")
+              )
+          );
+        }
+      }
+      finally {
+        tryAssignTasks.remove(taskId);
+      }
+    }
+  }
+
   @VisibleForTesting
   static void sortByInsertionTime(List tasks)
   {
@@ -930,7 +951,18 @@
             elapsed,
             config.getTaskAssignmentTimeout()
         ).emit();
-        taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
+        taskComplete(
+            taskRunnerWorkItem,
+            theZkWorker,
+            TaskStatus.failure(
+                task.getId(),
+                StringUtils.format(
+                    "The worker that this task is assigned did not start it in timeout[%s]. "
+                    + "See overlord logs for more details.",
+                    config.getTaskAssignmentTimeout()
+                )
+            )
+        );
         break;
       }
     }
@@ -1066,9 +1098,13 @@
           taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
           taskRunnerWorkItem = runningTasks.remove(taskId);
           if (taskRunnerWorkItem != null) {
-            log.info("Task[%s] just disappeared!", taskId);
-            taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
-            TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
+            log.warn("Task[%s] just disappeared!", taskId);
+            final TaskStatus taskStatus = TaskStatus.failure(
+                taskId,
+                "The worker that this task was assigned disappeared. See overlord logs for more details."
+            );
+            taskRunnerWorkItem.setResult(taskStatus);
+            TaskRunnerUtils.notifyStatusChanged(listeners, taskId, taskStatus);
           } else {
             log.info("Task[%s] went bye bye.", taskId);
           }
@@ -1189,8 +1225,12 @@
       log.info("Failing task[%s]", assignedTask);
       RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
       if (taskRunnerWorkItem != null) {
-        taskRunnerWorkItem.setResult(TaskStatus.failure(assignedTask));
-        TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, TaskStatus.failure(assignedTask));
+        final TaskStatus taskStatus = TaskStatus.failure(
+            assignedTask,
+            StringUtils.format("Canceled for worker cleanup. See overlord logs for more details.")
+        );
+        taskRunnerWorkItem.setResult(taskStatus);
+        TaskRunnerUtils.notifyStatusChanged(listeners, assignedTask, taskStatus);
       } else {
         log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
       }
@@ -1235,7 +1275,7 @@
 
   private void taskComplete(
       RemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      ZkWorker zkWorker,
+      @Nullable ZkWorker zkWorker,
       TaskStatus taskStatus
   )
   {
@@ -1255,41 +1295,76 @@
     }
 
     // Move from running -> complete
-    completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
-    runningTasks.remove(taskStatus.getId());
+    // If the task was running and this is the first complete event,
+    // previousComplete should be null and removedRunning should not.
+    final RemoteTaskRunnerWorkItem previousComplete = completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
+    final RemoteTaskRunnerWorkItem removedRunning = runningTasks.remove(taskStatus.getId());
 
-    // Update success/failure counters
-    if (zkWorker != null) {
-      if (taskStatus.isSuccess()) {
-        zkWorker.resetContinuouslyFailedTasksCount();
-        if (blackListedWorkers.remove(zkWorker)) {
-          zkWorker.setBlacklistedUntil(null);
-          log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
+    if (previousComplete != null && removedRunning != null) {
+      log.warn(
+          "This is not the first complete event for task[%s], but it was still known as running. "
+          + "Ignoring the previously known running status.",
+          taskStatus.getId()
+      );
+    }
+
+    if (previousComplete != null) {
+      // This is not the first complete event for the same task.
+      try {
+        // getResult().get() must return immediately.
+        TaskState lastKnownState = previousComplete.getResult().get(1, TimeUnit.MILLISECONDS).getStatusCode();
+        if (taskStatus.getStatusCode() != lastKnownState) {
+          log.warn(
+              "The state of the new task complete event is different from its last known state. "
+              + "New state[%s], last known state[%s]",
+              taskStatus.getStatusCode(),
+              lastKnownState
+          );
         }
-      } else if (taskStatus.isFailure()) {
-        zkWorker.incrementContinuouslyFailedTasksCount();
       }
+      catch (InterruptedException e) {
+        log.warn(e, "Interrupted while getting the last known task status.");
+        Thread.currentThread().interrupt();
+      }
+      catch (ExecutionException | TimeoutException e) {
+        // This case should not really happen.
+        log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
+      }
+    } else {
+      // This is the first complete event for this task.
+      // Update success/failure counters
+      if (zkWorker != null) {
+        if (taskStatus.isSuccess()) {
+          zkWorker.resetContinuouslyFailedTasksCount();
+          if (blackListedWorkers.remove(zkWorker)) {
+            zkWorker.setBlacklistedUntil(null);
+            log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
+          }
+        } else if (taskStatus.isFailure()) {
+          zkWorker.incrementContinuouslyFailedTasksCount();
+        }
 
-      // Blacklist node if there are too many failures.
-      synchronized (blackListedWorkers) {
-        if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
-            blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
-          zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
-          if (blackListedWorkers.add(zkWorker)) {
-            log.info(
-                "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
-                zkWorker.getWorker(),
-                zkWorker.getBlacklistedUntil(),
-                zkWorker.getContinuouslyFailedTasksCount()
-            );
-          }
-        }
-      }
-    }
+        // Blacklist node if there are too many failures.
+        synchronized (blackListedWorkers) {
+          if (zkWorker.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
+              blackListedWorkers.size() <= zkWorkers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
+            zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
+            if (blackListedWorkers.add(zkWorker)) {
+              log.info(
+                  "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
+                  zkWorker.getWorker(),
+                  zkWorker.getBlacklistedUntil(),
+                  zkWorker.getContinuouslyFailedTasksCount()
+              );
+            }
+          }
+        }
+      }
 
-    // Notify interested parties
-    taskRunnerWorkItem.setResult(taskStatus);
-    TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+      // Notify interested parties
+      taskRunnerWorkItem.setResult(taskStatus);
+      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+    }
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index d341663fc91..d9e582ced71 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -288,7 +288,9 @@ public class TaskQueue
       }
       catch (Exception e) {
         log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-        notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
+        final String errorMessage = "Failed while waiting for the task to be ready to run. "
+                                    + "See overlord logs for more details.";
+        notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage);
         continue;
       }
       if (taskIsReady) {
@@ -412,7 +414,7 @@ public class TaskQueue
     Preconditions.checkNotNull(taskId, "taskId");
     for (final Task task : tasks) {
       if (task.getId().equals(taskId)) {
-        notifyStatus(task, TaskStatus.failure(taskId), reasonFormat, args);
+        notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
         break;
       }
     }
@@ -556,7 +558,9 @@ public class TaskQueue
                .addData("type", task.getType())
                .addData("dataSource", task.getDataSource())
                .emit();
-          handleStatus(TaskStatus.failure(task.getId()));
+          handleStatus(
+              TaskStatus.failure(task.getId(), "Failed to run this task. See overlord logs for more details.")
+          );
         }
 
         private void handleStatus(final TaskStatus status)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 623458fc1cc..59085a27afe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -47,6 +47,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.WorkerNodeService;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@@ -68,6 +69,7 @@ import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -424,7 +426,18 @@
                 config.getTaskAssignmentTimeout()
             ).emit();
             // taskComplete(..) must be called outside of statusLock, see comments on method.
-            taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
+            taskComplete(
+                workItem,
+                workerHolder,
+                TaskStatus.failure(
+                    taskId,
+                    StringUtils.format(
+                        "The worker that this task is assigned did not start it in timeout[%s]. "
+                        + "See overlord and middleManager/indexer logs for more details.",
+                        config.getTaskAssignmentTimeout()
+                    )
+                )
+            );
           }
 
           return true;
@@ -456,13 +469,36 @@
       workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
     }
 
-    // Notify interested parties
-    taskRunnerWorkItem.setResult(taskStatus);
-    TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+    if (taskRunnerWorkItem.getResult().isDone()) {
+      // This is not the first complete event.
+      try {
+        TaskState lastKnownState = taskRunnerWorkItem.getResult().get().getStatusCode();
+        if (taskStatus.getStatusCode() != lastKnownState) {
+          log.warn(
+              "The state of the new task complete event is different from its last known state. "
+              + "New state[%s], last known state[%s]",
+              taskStatus.getStatusCode(),
+              lastKnownState
+          );
+        }
+      }
+      catch (InterruptedException e) {
+        log.warn(e, "Interrupted while getting the last known task status.");
+        Thread.currentThread().interrupt();
+      }
+      catch (ExecutionException e) {
+        // This case should not really happen.
+        log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
+      }
+    } else {
+      // Notify interested parties
+      taskRunnerWorkItem.setResult(taskStatus);
+      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
 
-    // Update success/failure counters, Blacklist node if there are too many failures.
-    if (workerHolder != null) {
-      blacklistWorkerIfNeeded(taskStatus, workerHolder);
+      // Update success/failure counters, Blacklist node if there are too many failures.
+      if (workerHolder != null) {
+        blacklistWorkerIfNeeded(taskStatus, workerHolder);
+      }
     }
 
     synchronized (statusLock) {
@@ -647,14 +683,26 @@
 
       for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
         if (!taskItem.getResult().isDone()) {
-          log.info(
+          log.warn(
              "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].",
              workerHostAndPort,
              taskItem.getTaskId(),
              config.getTaskCleanupTimeout()
           );
           // taskComplete(..) must be called outside of statusLock, see comments on method.
-          taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
+          taskComplete(
+              taskItem,
+              null,
+              TaskStatus.failure(
+                  taskItem.getTaskId(),
+                  StringUtils.format(
+                      "The worker that this task was assigned disappeared and "
+                      + "did not report cleanup within timeout[%s]. "
+                      + "See overlord and middleManager/indexer logs for more details.",
+                      config.getTaskCleanupTimeout()
+                  )
+              )
+          );
         }
       }
     }
@@ -1179,7 +1227,15 @@
         if (taskItem.getTask() == null) {
           log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit();
           // taskComplete(..) must be called outside of statusLock, see comments on method.
-          taskComplete(taskItem, null, TaskStatus.failure(taskId));
+          taskComplete(
+              taskItem,
+              null,
+              TaskStatus.failure(
+                  taskId,
+                  "No payload found for this task. "
+                  + "See overlord logs and middleManager/indexer logs for more details."
+              )
+          );
           continue;
         }
 
@@ -1205,7 +1261,11 @@
            .emit();
 
         // taskComplete(..) must be called outside of statusLock, see comments on method.
-        taskComplete(taskItem, null, TaskStatus.failure(taskId));
+        taskComplete(
+            taskItem,
+            null,
+            TaskStatus.failure(taskId, "Failed to assign this task. See overlord logs for more details.")
+        );
       }
       finally {
         synchronized (statusLock) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index 630b0f03220..ff9aab1bfe4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -390,14 +390,20 @@
               announcement.getStatus(),
               worker.getHost()
           );
-          delta.add(TaskAnnouncement.create(
-              announcement.getTaskId(),
-              announcement.getTaskType(),
-              announcement.getTaskResource(),
-              TaskStatus.failure(announcement.getTaskId()),
-              announcement.getTaskLocation(),
-              announcement.getTaskDataSource()
-          ));
+          delta.add(
+              TaskAnnouncement.create(
+                  announcement.getTaskId(),
+                  announcement.getTaskType(),
+                  announcement.getTaskResource(),
+                  TaskStatus.failure(
+                      announcement.getTaskId(),
+                      "This task disappeared on the worker where it was assigned. "
+                      + "See overlord logs for more details."
+                  ),
+                  announcement.getTaskLocation(),
+                  announcement.getTaskDataSource()
+              )
+          );
         }
       }
 
@@ -427,14 +433,20 @@
               announcement.getStatus(),
               worker.getHost()
           );
-          delta.add(TaskAnnouncement.create(
-              announcement.getTaskId(),
-              announcement.getTaskType(),
-              announcement.getTaskResource(),
-              TaskStatus.failure(announcement.getTaskId()),
-              announcement.getTaskLocation(),
-              announcement.getTaskDataSource()
-          ));
+          delta.add(
+              TaskAnnouncement.create(
+                  announcement.getTaskId(),
+                  announcement.getTaskType(),
+                  announcement.getTaskResource(),
+                  TaskStatus.failure(
+                      announcement.getTaskId(),
+                      "This task disappeared on the worker where it was assigned. "
+                      + "See overlord logs for more details."
+                  ),
+                  announcement.getTaskLocation(),
+                  announcement.getTaskDataSource()
+              )
+          );
         }
       } else if (change instanceof WorkerHistoryItem.Metadata) {
         isWorkerDisabled = ((WorkerHistoryItem.Metadata) change).isDisabled();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index e7bf0226ba1..589c6e1f62f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.testing.DeadlockDetectingTimeout;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -54,6 +55,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -88,6 +90,7 @@ public class RemoteTaskRunnerTest
     cf = rtrTestUtils.getCuratorFramework();
 
     task = TestTasks.unending("task id with spaces");
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
   }
 
   @After
@@ -131,6 +134,7 @@
   {
     doSetup();
     remoteTaskRunner.addPendingTask(task);
+    remoteTaskRunner.runPendingTasks();
     Assert.assertFalse(workerRunningTask(task.getId()));
 
     ListenableFuture result = remoteTaskRunner.run(task);
@@ -352,6 +356,8 @@
 
     TaskStatus status = future.get();
     Assert.assertEquals(status.getStatusCode(), TaskState.FAILED);
+    Assert.assertNotNull(status.getErrorMsg());
+    Assert.assertTrue(status.getErrorMsg().contains("The worker that this task was assigned disappeared"));
   }
 
   @Test
@@ -446,6 +452,8 @@
 
     TaskStatus status = future.get();
     Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertNotNull(status.getErrorMsg());
+    Assert.assertTrue(status.getErrorMsg().contains("Canceled for worker cleanup"));
     RemoteTaskRunnerConfig config = remoteTaskRunner.getRemoteTaskRunnerConfig();
     Assert.assertTrue(
         TestUtils.conditionValid(
@@ -517,6 +525,38 @@
     Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
   }
 
+  @Test
+  public void testRunPendingTaskFailToAssignTask() throws Exception
+  {
+    doSetup();
+    RemoteTaskRunnerWorkItem originalItem = remoteTaskRunner.addPendingTask(task);
+    // modify taskId to make the task assignment fail
+    RemoteTaskRunnerWorkItem wankyItem = Mockito.mock(RemoteTaskRunnerWorkItem.class);
+    Mockito.when(wankyItem.getTaskId()).thenReturn(originalItem.getTaskId()).thenReturn("wrongId");
+    remoteTaskRunner.runPendingTask(wankyItem);
+    TaskStatus taskStatus = originalItem.getResult().get(0, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
+    Assert.assertEquals(
+        "Failed to assign this task. See overlord logs for more details.",
+        taskStatus.getErrorMsg()
+    );
+  }
+
+  @Test
+  public void testRunPendingTaskTimeoutToAssign() throws Exception
+  {
+    makeWorker();
+    makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
+    RemoteTaskRunnerWorkItem workItem = remoteTaskRunner.addPendingTask(task);
+    remoteTaskRunner.runPendingTask(workItem);
+    TaskStatus taskStatus = workItem.getResult().get(0, TimeUnit.MILLISECONDS);
+    Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
+    Assert.assertNotNull(taskStatus.getErrorMsg());
+    Assert.assertTrue(
+        taskStatus.getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout")
+    );
+  }
+
   private void doSetup() throws Exception
   {
     makeWorker();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 2dbd9d1d38a..0b3d8728a61 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
@@ -41,6 +42,7 @@ import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.metadata.EntryExistsException;
@@ -146,8 +148,14 @@
     Assert.assertEquals(task.interval, locksForTask.get(0).getInterval());
 
     // Verify that locks are removed on calling shutdown
-    taskQueue.shutdown(task.getId(), "Shutdown Task");
+    taskQueue.shutdown(task.getId(), "Shutdown Task test");
     Assert.assertTrue(getLockbox().findLocksForTask(task).isEmpty());
+
+    Optional statusOptional = getTaskStorage().getStatus(task.getId());
+    Assert.assertTrue(statusOptional.isPresent());
+    Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
+    Assert.assertNotNull(statusOptional.get().getErrorMsg());
+    Assert.assertEquals("Shutdown Task test", statusOptional.get().getErrorMsg());
   }
 
   @Test
@@ -305,6 +313,42 @@
     Assert.assertFalse(queuedTask.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
   }
 
+  @Test
+  public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException
+  {
+    final TaskActionClientFactory actionClientFactory = createActionClientFactory();
+    final TaskQueue taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, null, null, null),
+        new DefaultTaskConfig(),
+        getTaskStorage(),
+        new SimpleTaskRunner(actionClientFactory),
+        actionClientFactory,
+        getLockbox(),
+        new NoopServiceEmitter()
+    );
+    taskQueue.setActive(true);
+    final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"))
+    {
+      @Override
+      public boolean isReady(TaskActionClient taskActionClient)
+      {
+        throw new RuntimeException("isReady failure test");
+      }
+    };
+    taskQueue.add(task);
+    taskQueue.manageInternal();
+
+    Optional statusOptional = getTaskStorage().getStatus(task.getId());
+    Assert.assertTrue(statusOptional.isPresent());
+    Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode());
+    Assert.assertNotNull(statusOptional.get().getErrorMsg());
+    Assert.assertTrue(
+        StringUtils.format("Actual message is: %s", statusOptional.get().getErrorMsg()),
+        statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run")
+    );
+  }
+
   private static class TestTask extends AbstractBatchIndexTask
   {
     private final Interval interval;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 2ca1dff0f28..aa3f14e070e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -49,16 +49,20 @@ import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
 import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.joda.time.Period;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -79,6 +83,12 @@
  */
 public class HttpRemoteTaskRunnerTest
 {
+  @Before
+  public void setup()
+  {
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+  }
+
   /*
   Simulates startup of Overlord and Workers being discovered with no previously known tasks. Fresh tasks are
   given and expected to be completed.
@@ -733,6 +743,18 @@
 
     Assert.assertTrue(future1.get().isFailure());
     Assert.assertTrue(future2.get().isFailure());
+    Assert.assertNotNull(future1.get().getErrorMsg());
+    Assert.assertNotNull(future2.get().getErrorMsg());
+    Assert.assertTrue(
+        future1.get().getErrorMsg().startsWith(
+            "The worker that this task was assigned disappeared and did not report cleanup within timeout"
+        )
+    );
+    Assert.assertTrue(
+        future2.get().getErrorMsg().startsWith(
+            "The worker that this task was assigned disappeared and did not report cleanup within timeout"
+        )
+    );
 
     AtomicInteger ticks = new AtomicInteger();
     Set actualShutdowns = new ConcurrentHashSet<>();
@@ -1254,6 +1276,230 @@
     );
   }
 
+  @Test
+  public void testTimeoutInAssigningTasks() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 1;
+          }
+
+          @Override
+          public Period getTaskAssignmentTimeout()
+          {
+            return new Period("PT1S");
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List knownAnnouncements
+      )
+      {
+        return new WorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of()
+        )
+        {
+          @Override
+          public void start()
+          {
+            disabled.set(false);
+          }
+
+          @Override
+          public void stop()
+          {
+          }
+
+          @Override
+          public boolean isInitialized()
+          {
+            return true;
+          }
+
+          @Override
+          public void waitForInitialization()
+          {
+          }
+
+          @Override
+          public boolean assignTask(Task task)
+          {
+            // Always returns true
+            return true;
+          }
+
+          @Override
+          public void shutdownTask(String taskId)
+          {
+          }
+        };
+      }
+    };
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
+
+    Future future = taskRunner.run(NoopTask.create("task-id", 0));
+    Assert.assertTrue(future.get().isFailure());
+    Assert.assertNotNull(future.get().getErrorMsg());
+    Assert.assertTrue(
+        future.get().getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout")
+    );
+  }
+
+  @Test
+  public void testExceptionThrownInAssigningTasks() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 1;
+          }
+
+          @Override
+          public Period getTaskAssignmentTimeout()
+          {
+            return new Period("PT1S");
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List knownAnnouncements
+      )
+      {
+        return new WorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of()
+        )
+        {
+          @Override
+          public void start()
+          {
+            disabled.set(false);
+          }
+
+          @Override
+          public void stop()
+          {
+          }
+
+          @Override
+          public boolean isInitialized()
+          {
+            return true;
+          }
+
+          @Override
+          public void waitForInitialization()
+          {
+          }
+
+          @Override
+          public boolean assignTask(Task task)
+          {
+            throw new RuntimeException("Assign failure test");
+          }
+
+          @Override
+          public void shutdownTask(String taskId)
+          {
+          }
+        };
+      }
+    };
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
+
+    Future future = taskRunner.run(NoopTask.create("task-id", 0));
+    Assert.assertTrue(future.get().isFailure());
+    Assert.assertNotNull(future.get().getErrorMsg());
+    Assert.assertTrue(
+        StringUtils.format("Actual message is: %s", future.get().getErrorMsg()),
+        future.get().getErrorMsg().startsWith("Failed to assign this task")
+    );
+  }
+
   private HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
       TaskStorage taskStorage,
       List listenerNotificationsAccumulator
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
index 3319d812571..5f4a9b843a6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
@@ -115,6 +115,12 @@
 
     Assert.assertEquals(task0.getId(), updates.get(3).getTaskId());
     Assert.assertTrue(updates.get(3).getTaskStatus().isFailure());
+    Assert.assertNotNull(updates.get(3).getTaskStatus().getErrorMsg());
+    Assert.assertTrue(
+        updates.get(3).getTaskStatus().getErrorMsg().startsWith(
+            "This task disappeared on the worker where it was assigned"
+        )
+    );
 
     updates.clear();
 
@@ -138,6 +144,12 @@
 
     Assert.assertEquals(task2.getId(), updates.get(0).getTaskId());
     Assert.assertTrue(updates.get(0).getTaskStatus().isFailure());
+    Assert.assertNotNull(updates.get(0).getTaskStatus().getErrorMsg());
+    Assert.assertTrue(
+        updates.get(0).getTaskStatus().getErrorMsg().startsWith(
+            "This task disappeared on the worker where it was assigned"
+        )
+    );
 
     Assert.assertEquals(task3.getId(), updates.get(1).getTaskId());
     Assert.assertTrue(updates.get(1).getTaskStatus().isRunnable());
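
Note on the API change illustrated by this patch: TaskStatus.failure(String) and TaskStatus.fromCode(String, TaskState) are deprecated in favor of TaskStatus.failure(String, String), and TaskStatus.success(String, String) is deprecated in favor of TaskStatus.success(String). The following is a minimal caller-side sketch of the intended usage, not part of the diff itself; it assumes only classes already referenced above (TaskStatus, StringUtils), and "example-task-id" is a placeholder.

    import org.apache.druid.indexer.TaskStatus;
    import org.apache.druid.java.util.common.StringUtils;

    public class TaskStatusUsageExample
    {
      public static void main(String[] args)
      {
        // A succeeded status carries no error message after this change.
        TaskStatus ok = TaskStatus.success("example-task-id");

        // A failed status must always carry a non-null, human-readable error message.
        TaskStatus failed = TaskStatus.failure(
            "example-task-id",
            StringUtils.format("Failed to assign this task. See overlord logs for more details.")
        );

        System.out.println(ok.getStatusCode() + " / " + failed.getErrorMsg());
      }
    }

Every failure call site touched in the runners above follows this pattern, passing an explicit reason such as worker disappearance, assignment timeout, or worker cleanup.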