Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons (#11446)

* Add error message; add unit tests for ForkingTaskRunner

* add tests

* fix comment

* unused import

* add exit code in error message

* fix test
This commit is contained in:
Jihoon Son 2021-07-20 21:34:53 -07:00 committed by GitHub
parent 84c957f541
commit 0453e461f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 569 additions and 52 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@ -343,14 +344,11 @@ public class ForkingTaskRunner
jsonMapper.writeValue(taskFile, task);
}
LOGGER.info("Running command: %s", getMaskedCommand(startupLoggingConfig.getMaskProperties(), command));
taskWorkItem.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
taskLocation.getHost(),
taskLocation.getPort(),
taskLocation.getTlsPort()
LOGGER.info(
"Running command: %s",
getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
);
taskWorkItem.processHolder = runTaskProcess(command, logFile, taskLocation);
processHolder = taskWorkItem.processHolder;
processHolder.registerWithCloser(closer);
@ -364,38 +362,23 @@ public class ForkingTaskRunner
);
LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
boolean runFailed = true;
final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
// This will block for a while. So we append the thread information with more details
final String priorThreadName = Thread.currentThread().getName();
Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
try (final OutputStream toLogfile = logSink.openStream()) {
ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
final int statusCode = processHolder.process.waitFor();
LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
if (statusCode == 0) {
runFailed = false;
}
}
finally {
Thread.currentThread().setName(priorThreadName);
// Upload task logs
taskLogPusher.pushTaskLog(task.getId(), logFile);
if (reportsFile.exists()) {
taskLogPusher.pushTaskReports(task.getId(), reportsFile);
}
}
TaskStatus status;
if (!runFailed) {
final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
final TaskStatus status;
if (exitCode == 0) {
LOGGER.info("Process exited successfully for task: %s", task.getId());
// Process exited successfully
status = jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId());
// Process exited unsuccessfully
status = TaskStatus.failure(task.getId());
status = TaskStatus.failure(
task.getId(),
StringUtils.format(
"Task execution process exited unsuccessfully with code[%s]. "
+ "See middleManager logs for more details.",
exitCode
)
);
}
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
@ -417,7 +400,7 @@ public class ForkingTaskRunner
synchronized (tasks) {
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
taskWorkItem.processHolder.process.destroy();
taskWorkItem.processHolder.shutdown();
}
if (!stopping) {
saveRunningTasks();
@ -458,6 +441,42 @@ public class ForkingTaskRunner
}
}
/**
 * Starts the child process for a task and wraps it in a {@link ProcessHolder}.
 *
 * The process is started with its error stream redirected into its standard output. The method is
 * package-private and annotated for testing so that tests can override it and supply a fake holder.
 *
 * @param command      command line of the process to start
 * @param logFile      log file passed through to the returned holder
 * @param taskLocation supplies the host, port and TLS port passed through to the returned holder
 * @return a holder wrapping the started process
 * @throws IOException if the process cannot be started
 */
@VisibleForTesting
ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
{
  final ProcessBuilder processBuilder = new ProcessBuilder(ImmutableList.copyOf(command));
  processBuilder.redirectErrorStream(true);
  final Process taskProcess = processBuilder.start();

  return new ProcessHolder(
      taskProcess,
      logFile,
      taskLocation.getHost(),
      taskLocation.getPort(),
      taskLocation.getTlsPort()
  );
}
/**
 * Copies the output of the task process into the log file, waits for the process to exit and
 * returns its exit code.
 *
 * While blocked, the current thread's name is suffixed with the task id. On the way out, whether
 * normally or exceptionally, the thread name is restored, the task log is pushed, and the reports
 * file is pushed if it exists.
 *
 * @param task          task whose process is being awaited
 * @param processHolder holder of the running task process
 * @param logFile       file the process output is appended to and which is pushed afterwards
 * @param reportsFile   task reports file, pushed only when present
 * @return exit code of the task process
 * @throws IOException          if copying the process output or pushing the files fails
 * @throws InterruptedException if interrupted while waiting for the process to exit
 */
@VisibleForTesting
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
    throws IOException, InterruptedException
{
  final ByteSink taskLogSink = Files.asByteSink(logFile, FileWriteMode.APPEND);

  // This will block for a while. So we append the thread information with more details
  final Thread runnerThread = Thread.currentThread();
  final String originalThreadName = runnerThread.getName();
  runnerThread.setName(StringUtils.format("%s-[%s]", originalThreadName, task.getId()));

  try (final OutputStream taskLogStream = taskLogSink.openStream()) {
    ByteStreams.copy(processHolder.process.getInputStream(), taskLogStream);
    return processHolder.process.waitFor();
  }
  finally {
    runnerThread.setName(originalThreadName);

    // Upload task logs
    taskLogPusher.pushTaskLog(task.getId(), logFile);
    final boolean hasReports = reportsFile.exists();
    if (hasReports) {
      taskLogPusher.pushTaskReports(task.getId(), reportsFile);
    }
  }
}
@Override
@LifecycleStop
public void stop()
@ -726,7 +745,8 @@ public class ForkingTaskRunner
}
}
private static class ProcessHolder
@VisibleForTesting
static class ProcessHolder
{
private final Process process;
private final File logFile;
@ -743,11 +763,18 @@ public class ForkingTaskRunner
this.tlsPort = tlsPort;
}
private void registerWithCloser(Closer closer)
/**
 * Registers the wrapped process's input stream and output stream with the given closer, in that
 * order. Package-private and annotated for testing so that tests can stub it on a mocked holder.
 *
 * @param closer closer that the two process streams are registered with
 */
@VisibleForTesting
void registerWithCloser(Closer closer)
{
closer.register(process.getInputStream());
closer.register(process.getOutputStream());
}
/**
 * Shuts down the wrapped process by calling {@link Process#destroy()} on it. Package-private and
 * annotated for testing so that tests can stub it on a mocked holder.
 */
@VisibleForTesting
void shutdown()
{
process.destroy();
}
}
}

View File

@ -208,10 +208,31 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
.emit();
log.warn(e, "Graceful shutdown of task[%s] aborted with exception.", task.getId());
error = true;
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
// Creating a new status to only feed listeners seems quite strange.
// This is currently OK because we have no listeners yet registered in peon.
// However, we should fix this in the near future by always retrieving task status
// from one single source of truth that is also propagated to the overlord.
// See https://github.com/apache/druid/issues/11445.
TaskRunnerUtils.notifyStatusChanged(
listeners,
task.getId(),
TaskStatus.failure(
task.getId(),
"Failed to stop gracefully with exception. See task logs for more details."
)
);
}
} else {
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
// Creating a new status to only feed listeners seems quite strange.
// This is currently OK because we have no listeners yet registered in peon.
// However, we should fix this in the near future by always retrieving task status
// from one single source of truth that is also propagated to the overlord.
// See https://github.com/apache/druid/issues/11445.
TaskRunnerUtils.notifyStatusChanged(
listeners,
task.getId(),
TaskStatus.failure(task.getId(), "Canceled as task execution process stopped")
);
}
elapsed = System.currentTimeMillis() - start;
@ -417,7 +438,6 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
{
return task.getDataSource();
}
}
private class SingleTaskBackgroundRunnerCallable implements Callable<TaskStatus>

View File

@ -197,7 +197,7 @@ public class ThreadingTaskRunner
Thread.currentThread()
.setName(StringUtils.format("[%s]-%s", task.getId(), priorThreadName));
TaskStatus taskStatus = null;
TaskStatus taskStatus;
final TaskToolbox toolbox = toolboxFactory.build(task);
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
TaskRunnerUtils.notifyStatusChanged(
@ -212,12 +212,13 @@ public class ThreadingTaskRunner
}
catch (Throwable t) {
LOGGER.error(t, "Exception caught while running the task.");
taskStatus = TaskStatus.failure(
task.getId(),
"Failed with an exception. See indexer logs for more details."
);
}
finally {
taskWorkItem.setState(RunnerTaskState.NONE);
if (taskStatus == null) {
taskStatus = TaskStatus.failure(task.getId());
}
Thread.currentThread().setName(priorThreadName);
if (reportsFile.exists()) {
taskLogPusher.pushTaskReports(task.getId(), reportsFile);

View File

@ -251,7 +251,15 @@ public class WorkerTaskManager
@Override
public void onFailure(Throwable t)
{
submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
submitNoticeToExec(
new StatusNotice(
task,
TaskStatus.failure(
task.getId(),
"Failed to run task with an exception. See middleManager or indexer logs for more details."
)
)
);
}
}
);

View File

@ -127,7 +127,10 @@ public class WorkerTaskMonitor extends WorkerTaskManager
if (completedAnnouncement != null) {
completionStatus = completedAnnouncement.getTaskStatus();
} else if (!runningTasks.containsKey(announcement.getTaskStatus().getId())) {
completionStatus = TaskStatus.failure(announcement.getTaskStatus().getId());
completionStatus = TaskStatus.failure(
announcement.getTaskStatus().getId(),
"Canceled as unknown task. See middleManager or indexer logs for more details."
);
}
if (completionStatus != null) {

View File

@ -19,18 +19,35 @@
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.assertj.core.util.Lists;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ForkingTaskRunnerTest
{
@ -145,12 +162,192 @@ public class ForkingTaskRunnerTest
"-Dsome.somepassword = secret=value",
"-Dsome.some=notasecret",
"-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
),
"java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random " +
"-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
),
"java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random "
+ "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
);
StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig);
Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs));
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
new ForkingTaskRunnerConfig(),
null,
new WorkerConfig(),
null,
null,
null,
null,
startupLoggingConfig
);
Assert.assertEquals(
originalAndExpectedCommand.rhs,
forkingTaskRunner.getMaskedCommand(
startupLoggingConfig.getMaskProperties(),
originalAndExpectedCommand.lhs
)
);
}
/**
 * A task process that exits with code 1 must produce a FAILED status whose error message
 * mentions the exit code.
 */
@Test
public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
{
  final TaskConfig taskConfig = new TaskConfig(
      null,
      null,
      null,
      null,
      ImmutableList.of(),
      false,
      new Period("PT0S"),
      new Period("PT10S"),
      ImmutableList.of(),
      false,
      false
  );
  final DruidNode middleManagerNode = new DruidNode("middleManager", "host", false, 8091, null, true, false);

  final ForkingTaskRunner runnerUnderTest = new ForkingTaskRunner(
      new ForkingTaskRunnerConfig(),
      taskConfig,
      new WorkerConfig(),
      new Properties(),
      new NoopTaskLogs(),
      new DefaultObjectMapper(),
      middleManagerNode,
      new StartupLoggingConfig()
  )
  {
    @Override
    ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
    {
      // No real process is forked; hand back a mock holder instead.
      final ProcessHolder fakeHolder = Mockito.mock(ProcessHolder.class);
      Mockito.doNothing().when(fakeHolder).registerWithCloser(ArgumentMatchers.any());
      Mockito.doNothing().when(fakeHolder).shutdown();
      return fakeHolder;
    }

    @Override
    int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
    {
      // Emulate task process failure
      return 1;
    }
  };

  final TaskStatus failedStatus = runnerUnderTest.run(NoopTask.create()).get();
  final String expectedErrorMsg =
      "Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details.";

  Assert.assertEquals(TaskState.FAILED, failedStatus.getStatusCode());
  Assert.assertEquals(expectedErrorMsg, failedStatus.getErrorMsg());
}
/**
 * When the task process exits with code 0 and the status file holds a success status, the runner
 * must report SUCCESS with no error message.
 */
@Test
public void testTaskStatusWhenTaskProcessSucceedsTaskSucceeds() throws ExecutionException, InterruptedException
{
  final ObjectMapper mapper = new DefaultObjectMapper();
  final Task noopTask = NoopTask.create();
  final TaskConfig taskConfig = new TaskConfig(
      null,
      null,
      null,
      null,
      ImmutableList.of(),
      false,
      new Period("PT0S"),
      new Period("PT10S"),
      ImmutableList.of(),
      false,
      false
  );
  final DruidNode middleManagerNode = new DruidNode("middleManager", "host", false, 8091, null, true, false);

  final ForkingTaskRunner runnerUnderTest = new ForkingTaskRunner(
      new ForkingTaskRunnerConfig(),
      taskConfig,
      new WorkerConfig(),
      new Properties(),
      new NoopTaskLogs(),
      mapper,
      middleManagerNode,
      new StartupLoggingConfig()
  )
  {
    @Override
    ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
    {
      final ProcessHolder fakeHolder = Mockito.mock(ProcessHolder.class);
      Mockito.doNothing().when(fakeHolder).registerWithCloser(ArgumentMatchers.any());
      Mockito.doNothing().when(fakeHolder).shutdown();

      // Write a success status into the first command argument that names the status file.
      for (String arg : command) {
        if (!arg.endsWith("status.json")) {
          continue;
        }
        mapper.writeValue(new File(arg), TaskStatus.success(noopTask.getId()));
        break;
      }

      return fakeHolder;
    }

    @Override
    int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
    {
      // Emulate a clean exit of the task process
      return 0;
    }
  };

  final TaskStatus reportedStatus = runnerUnderTest.run(noopTask).get();

  Assert.assertEquals(TaskState.SUCCESS, reportedStatus.getStatusCode());
  Assert.assertNull(reportedStatus.getErrorMsg());
}
/**
 * When the task process exits with code 0 but the status file holds a failure status, the runner
 * must report FAILED and carry over the error message from the status file.
 */
@Test
public void testTaskStatusWhenTaskProcessSucceedsTaskFails() throws ExecutionException, InterruptedException
{
  final ObjectMapper mapper = new DefaultObjectMapper();
  final Task noopTask = NoopTask.create();
  final TaskConfig taskConfig = new TaskConfig(
      null,
      null,
      null,
      null,
      ImmutableList.of(),
      false,
      new Period("PT0S"),
      new Period("PT10S"),
      ImmutableList.of(),
      false,
      false
  );
  final DruidNode middleManagerNode = new DruidNode("middleManager", "host", false, 8091, null, true, false);

  final ForkingTaskRunner runnerUnderTest = new ForkingTaskRunner(
      new ForkingTaskRunnerConfig(),
      taskConfig,
      new WorkerConfig(),
      new Properties(),
      new NoopTaskLogs(),
      mapper,
      middleManagerNode,
      new StartupLoggingConfig()
  )
  {
    @Override
    ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
    {
      final ProcessHolder fakeHolder = Mockito.mock(ProcessHolder.class);
      Mockito.doNothing().when(fakeHolder).registerWithCloser(ArgumentMatchers.any());
      Mockito.doNothing().when(fakeHolder).shutdown();

      // Write a failure status into the first command argument that names the status file.
      for (String arg : command) {
        if (!arg.endsWith("status.json")) {
          continue;
        }
        mapper.writeValue(new File(arg), TaskStatus.failure(noopTask.getId(), "task failure test"));
        break;
      }

      return fakeHolder;
    }

    @Override
    int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
    {
      // Emulate a clean exit of the task process
      return 0;
    }
  };

  final TaskStatus reportedStatus = runnerUnderTest.run(noopTask).get();

  Assert.assertEquals(TaskState.FAILED, reportedStatus.getStatusCode());
  Assert.assertEquals("task failure test", reportedStatus.getErrorMsg());
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@ -35,6 +36,8 @@ import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunner;
@ -64,9 +67,11 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
public class SingleTaskBackgroundRunnerTest
{
@ -94,6 +99,7 @@ public class SingleTaskBackgroundRunnerTest
false
);
final ServiceEmitter emitter = new NoopServiceEmitter();
EmittingLogger.registerEmitter(emitter);
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
taskConfig,
null,
@ -199,6 +205,111 @@ public class SingleTaskBackgroundRunnerTest
Assert.assertTrue(holder.get());
}
/**
 * Runs a restorable task whose {@code run} throws, stops the runner, and checks that the listener
 * was given a FAILED status with the "failed to stop gracefully" error message.
 */
@Test
public void testStopRestorableTaskExceptionAfterStop()
{
  // statusChanged callback can be called by multiple threads.
  AtomicReference<TaskStatus> statusHolder = new AtomicReference<>();
  runner.registerListener(
      new TaskRunnerListener()
      {
        @Override
        public String getListenerId()
        {
          return "testStopRestorableTaskExceptionAfterStop";
        }

        @Override
        public void locationChanged(String taskId, TaskLocation newLocation)
        {
          // do nothing
        }

        @Override
        public void statusChanged(String taskId, TaskStatus status)
        {
          statusHolder.set(status);
        }
      },
      Execs.directExecutor()
  );
  runner.run(
      new RestorableTask(new BooleanHolder())
      {
        @Override
        public TaskStatus run(TaskToolbox toolbox)
        {
          throw new Error("task failure test");
        }
      }
  );
  runner.stop();

  // Read the holder exactly once: the callback may fire from several threads, so two separate
  // reads could observe two different statuses and make the assertions inconsistent.
  final TaskStatus observedStatus = statusHolder.get();
  // Fail with a readable message instead of a NullPointerException if no status was reported.
  Assert.assertNotNull("No task status was reported to the listener", observedStatus);
  Assert.assertEquals(TaskState.FAILED, observedStatus.getStatusCode());
  Assert.assertEquals(
      "Failed to stop gracefully with exception. See task logs for more details.",
      observedStatus.getErrorMsg()
  );
}
/**
 * Runs a non-restorable task, waits until the listener has seen it RUNNING, stops the runner, and
 * checks that the listener was given a FAILED status with the "canceled" error message.
 *
 * @throws InterruptedException if interrupted while waiting for the task to start running
 */
@Test
public void testStopNonRestorableTask() throws InterruptedException
{
  // latch to wait for SingleTaskBackgroundRunnerCallable to be executed before stopping the task
  // We need this latch because TaskRunnerListener is currently racy.
  // See https://github.com/apache/druid/issues/11445 for more details.
  CountDownLatch runLatch = new CountDownLatch(1);
  // statusChanged callback can be called by multiple threads.
  AtomicReference<TaskStatus> statusHolder = new AtomicReference<>();
  runner.registerListener(
      new TaskRunnerListener()
      {
        @Override
        public String getListenerId()
        {
          return "testStopNonRestorableTask";
        }

        @Override
        public void locationChanged(String taskId, TaskLocation newLocation)
        {
          // do nothing
        }

        @Override
        public void statusChanged(String taskId, TaskStatus status)
        {
          // RUNNING only releases the latch; every other status is kept for the assertions below.
          if (status.getStatusCode() == TaskState.RUNNING) {
            runLatch.countDown();
          } else {
            statusHolder.set(status);
          }
        }
      },
      Execs.directExecutor()
  );
  runner.run(
      new NoopTask(
          null,
          null,
          "datasource",
          10000, // 10 sec
          0,
          null,
          null,
          null
      )
  );

  Assert.assertTrue(
      "Task was not reported as RUNNING within 1 second",
      runLatch.await(1, TimeUnit.SECONDS)
  );
  runner.stop();

  // Read the holder exactly once: the callback may fire from several threads, so two separate
  // reads could observe two different statuses and make the assertions inconsistent.
  final TaskStatus observedStatus = statusHolder.get();
  // Fail with a readable message instead of a NullPointerException if no status was reported.
  Assert.assertNotNull("No terminal task status was reported to the listener", observedStatus);
  Assert.assertEquals(TaskState.FAILED, observedStatus.getStatusCode());
  Assert.assertEquals(
      "Canceled as task execution process stopped",
      observedStatus.getErrorMsg()
  );
}
private static class RestorableTask extends AbstractTask
{
private final BooleanHolder gracefullyStopped;

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Tests for the task status reported by {@link ThreadingTaskRunner}.
 */
public class ThreadingTaskRunnerTest
{
  /**
   * A task whose {@code run} throws must complete with a FAILED status carrying the generic
   * "failed with an exception" error message.
   */
  @Test
  public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws ExecutionException, InterruptedException
  {
    final TaskToolboxFactory toolboxFactory = mockTaskToolboxFactory();
    final TaskConfig taskConfig = new TaskConfig(
        null,
        null,
        null,
        null,
        ImmutableList.of(),
        false,
        new Period("PT0S"),
        new Period("PT10S"),
        ImmutableList.of(),
        false,
        false
    );
    final ThreadingTaskRunner runnerUnderTest = new ThreadingTaskRunner(
        toolboxFactory,
        taskConfig,
        new WorkerConfig(),
        new NoopTaskLogs(),
        new DefaultObjectMapper(),
        new TestAppenderatorsManager(),
        new MultipleFileTaskReportFileWriter(),
        new DruidNode("middleManager", "host", false, 8091, null, true, false)
    );

    // Task that is always ready and always fails by throwing from run().
    final AbstractTask throwingTask = new AbstractTask("id", "datasource", null)
    {
      @Override
      public String getType()
      {
        return "test";
      }

      @Override
      public boolean isReady(TaskActionClient taskActionClient)
      {
        return true;
      }

      @Override
      public void stopGracefully(TaskConfig taskConfig)
      {
      }

      @Override
      public TaskStatus run(TaskToolbox toolbox)
      {
        throw new RuntimeException("Task failure test");
      }
    };

    final Future<TaskStatus> pendingStatus = runnerUnderTest.run(throwingTask);
    final TaskStatus finalStatus = pendingStatus.get();

    Assert.assertEquals(TaskState.FAILED, finalStatus.getStatusCode());
    Assert.assertEquals(
        "Failed with an exception. See indexer logs for more details.",
        finalStatus.getErrorMsg()
    );
  }

  /**
   * Builds a mocked {@link TaskToolboxFactory} whose {@code build} returns a mocked
   * {@link TaskToolbox} for any argument.
   */
  private static TaskToolboxFactory mockTaskToolboxFactory()
  {
    final TaskToolbox mockedToolbox = Mockito.mock(TaskToolbox.class);
    final TaskToolboxFactory mockedFactory = Mockito.mock(TaskToolboxFactory.class);
    Mockito.when(mockedFactory.build(ArgumentMatchers.any())).thenReturn(mockedToolbox);
    return mockedFactory;
  }
}

View File

@ -25,8 +25,10 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.TestUtils;
@ -55,6 +57,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.Map;
/**
*/
@ -263,6 +266,35 @@ public class WorkerTaskManagerTest
Assert.assertNotNull(update4.getTaskAnnouncement().getTaskLocation().getHost());
}
/**
 * Assigns a task whose {@code run} throws an {@link Error} and checks that the worker task manager
 * records it as completed with a FAILED status and the "failed to run task" error message.
 *
 * @throws Exception if starting the manager, assigning the task, or polling for completion fails
 */
@Test(timeout = 30_000L)
public void testTaskStatusWhenTaskRunnerFutureThrowsException() throws Exception
{
  Task task = new NoopTask("id", null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
  {
    @Override
    public TaskStatus run(TaskToolbox toolbox)
    {
      throw new Error("task failure test");
    }
  };
  workerTaskManager.start();
  workerTaskManager.assignTask(task);

  // Poll for completion with a short pause between checks rather than spinning a core at full
  // speed; the @Test timeout still bounds the total wait.
  Map<String, TaskAnnouncement> completeTasks = workerTaskManager.getCompletedTasks();
  while (completeTasks.isEmpty()) {
    Thread.sleep(10);
    completeTasks = workerTaskManager.getCompletedTasks();
  }

  Assert.assertEquals(1, completeTasks.size());
  TaskAnnouncement announcement = completeTasks.get(task.getId());
  Assert.assertNotNull(announcement);
  Assert.assertEquals(TaskState.FAILED, announcement.getStatus());
  Assert.assertEquals(
      "Failed to run task with an exception. See middleManager or indexer logs for more details.",
      announcement.getTaskStatus().getErrorMsg()
  );
}
private NoopTask createNoopTask(String id)
{
return new NoopTask(id, null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));

View File

@ -361,6 +361,10 @@ public class WorkerTaskMonitorTest
Assert.assertEquals(1, announcements.size());
Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId());
Assert.assertEquals(TaskState.FAILED, announcements.get(0).getTaskStatus().getStatusCode());
Assert.assertEquals(
"Canceled as unknown task. See middleManager or indexer logs for more details.",
announcements.get(0).getTaskStatus().getErrorMsg()
);
}
@Test(timeout = 60_000L)