diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java index 38dc47d651e..55199ac9d0d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java @@ -28,6 +28,8 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunner; +import java.util.Objects; + public class UpdateStatusAction implements TaskAction { @JsonIgnore @@ -82,4 +84,23 @@ public class UpdateStatusAction implements TaskAction "status=" + status + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UpdateStatusAction that = (UpdateStatusAction) o; + return Objects.equals(status, that.status); + } + + @Override + public int hashCode() + { + return Objects.hash(status); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index e76873e9789..72bff3784b8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -166,7 +166,11 @@ public abstract class AbstractTask implements Task if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) { return TaskStatus.failure(getId(), errorMessage); } - return runTask(taskToolbox); + TaskStatus taskStatus = runTask(taskToolbox); + if (taskStatus.isFailure()) { + failure = true; + } + return taskStatus; } catch (Exception e) { failure = true; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index fa375a9c8f8..9210006a9ae 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -20,8 +20,10 @@ package org.apache.druid.indexing.common.task; import org.apache.commons.io.FileUtils; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.UpdateStatusAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.server.DruidNode; import org.apache.druid.tasklogs.TaskLogPusher; @@ -73,7 +75,8 @@ public class AbstractTaskTest when(toolbox.getTaskActionClient()).thenReturn(taskActionClient); - AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) { + AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) + { @Nullable @Override public String setup(TaskToolbox toolbox) throws Exception @@ -116,7 +119,8 @@ public class AbstractTaskTest when(toolbox.getTaskActionClient()).thenReturn(taskActionClient); - AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) { + AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) + { @Nullable @Override public String setup(TaskToolbox toolbox) throws Exception @@ -136,6 +140,41 @@ public class AbstractTaskTest verify(pusher, never()).pushTaskReports(eq("myID"), any()); } + @Test + public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Exception + { + TaskToolbox toolbox = mock(TaskToolbox.class); + when(toolbox.getAttemptId()).thenReturn("1"); + + DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true); + when(toolbox.getTaskExecutorNode()).thenReturn(node); + + TaskLogPusher pusher = mock(TaskLogPusher.class); + when(toolbox.getTaskLogPusher()).thenReturn(pusher); + + TaskConfig config = mock(TaskConfig.class); + when(config.isEncapsulatedTask()).thenReturn(true); + File folder = temporaryFolder.newFolder(); + when(config.getTaskDir(eq("myID"))).thenReturn(folder); + when(toolbox.getConfig()).thenReturn(config); + + TaskActionClient taskActionClient = mock(TaskActionClient.class); + when(taskActionClient.submit(any())).thenReturn(TaskConfig.class); + when(toolbox.getTaskActionClient()).thenReturn(taskActionClient); + + AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) + { + @Override + public TaskStatus runTask(TaskToolbox toolbox) + { + return TaskStatus.failure("myId", "failed"); + } + }; + task.run(toolbox); + UpdateStatusAction action = new UpdateStatusAction("failure"); + verify(taskActionClient).submit(eq(action)); + } + @Test public void testBatchIOConfigAppend() {