When a task fails and doesn't throw an exception, report it correctly… (#13668)

* When a task fails and doesn't throw an exception, report it correctly in mm-less druid

* Removing unthrown exception from test
This commit is contained in:
Churro 2023-02-02 09:04:18 -08:00 committed by GitHub
parent 6cb842e76e
commit f022a9f246
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 3 deletions

View File

@ -28,6 +28,8 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import java.util.Objects;
public class UpdateStatusAction implements TaskAction<Void>
{
@JsonIgnore
@ -82,4 +84,23 @@ public class UpdateStatusAction implements TaskAction<Void>
"status=" + status +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UpdateStatusAction that = (UpdateStatusAction) o;
return Objects.equals(status, that.status);
}
@Override
public int hashCode()
{
return Objects.hash(status);
}
}

View File

@ -166,7 +166,11 @@ public abstract class AbstractTask implements Task
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
return TaskStatus.failure(getId(), errorMessage);
}
return runTask(taskToolbox);
TaskStatus taskStatus = runTask(taskToolbox);
if (taskStatus.isFailure()) {
failure = true;
}
return taskStatus;
}
catch (Exception e) {
failure = true;

View File

@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.task;
import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.UpdateStatusAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogPusher;
@ -73,7 +75,8 @@ public class AbstractTaskTest
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) {
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
{
@Nullable
@Override
public String setup(TaskToolbox toolbox) throws Exception
@ -116,7 +119,8 @@ public class AbstractTaskTest
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) {
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
{
@Nullable
@Override
public String setup(TaskToolbox toolbox) throws Exception
@ -136,6 +140,41 @@ public class AbstractTaskTest
verify(pusher, never()).pushTaskReports(eq("myID"), any());
}
@Test
public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Exception
{
TaskToolbox toolbox = mock(TaskToolbox.class);
when(toolbox.getAttemptId()).thenReturn("1");
DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true);
when(toolbox.getTaskExecutorNode()).thenReturn(node);
TaskLogPusher pusher = mock(TaskLogPusher.class);
when(toolbox.getTaskLogPusher()).thenReturn(pusher);
TaskConfig config = mock(TaskConfig.class);
when(config.isEncapsulatedTask()).thenReturn(true);
File folder = temporaryFolder.newFolder();
when(config.getTaskDir(eq("myID"))).thenReturn(folder);
when(toolbox.getConfig()).thenReturn(config);
TaskActionClient taskActionClient = mock(TaskActionClient.class);
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
{
@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
return TaskStatus.failure("myId", "failed");
}
};
task.run(toolbox);
UpdateStatusAction action = new UpdateStatusAction("failure");
verify(taskActionClient).submit(eq(action));
}
@Test
public void testBatchIOConfigAppend()
{