diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 2d083386860..49cae238f28 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CharMatcher; import com.google.common.base.Joiner; import com.google.common.base.Optional; @@ -343,14 +344,11 @@ public class ForkingTaskRunner jsonMapper.writeValue(taskFile, task); } - LOGGER.info("Running command: %s", getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)); - taskWorkItem.processHolder = new ProcessHolder( - new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), - logFile, - taskLocation.getHost(), - taskLocation.getPort(), - taskLocation.getTlsPort() + LOGGER.info( + "Running command: %s", + getMaskedCommand(startupLoggingConfig.getMaskProperties(), command) ); + taskWorkItem.processHolder = runTaskProcess(command, logFile, taskLocation); processHolder = taskWorkItem.processHolder; processHolder.registerWithCloser(closer); @@ -364,38 +362,23 @@ public class ForkingTaskRunner ); LOGGER.info("Logging task %s output to: %s", task.getId(), logFile); - boolean runFailed = true; - - final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND); - - // This will block for a while. So we append the thread information with more details - final String priorThreadName = Thread.currentThread().getName(); - Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId())); - - try (final OutputStream toLogfile = logSink.openStream()) { - ByteStreams.copy(processHolder.process.getInputStream(), toLogfile); - final int statusCode = processHolder.process.waitFor(); - LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId()); - if (statusCode == 0) { - runFailed = false; - } - } - finally { - Thread.currentThread().setName(priorThreadName); - // Upload task logs - taskLogPusher.pushTaskLog(task.getId(), logFile); - if (reportsFile.exists()) { - taskLogPusher.pushTaskReports(task.getId(), reportsFile); - } - } - - TaskStatus status; - if (!runFailed) { + final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile); + final TaskStatus status; + if (exitCode == 0) { + LOGGER.info("Process exited successfully for task: %s", task.getId()); // Process exited successfully status = jsonMapper.readValue(statusFile, TaskStatus.class); } else { + LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId()); // Process exited unsuccessfully - status = TaskStatus.failure(task.getId()); + status = TaskStatus.failure( + task.getId(), + StringUtils.format( + "Task execution process exited unsuccessfully with code[%s]. " + + "See middleManager logs for more details.", + exitCode + ) + ); } TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); @@ -417,7 +400,7 @@ public class ForkingTaskRunner synchronized (tasks) { final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); if (taskWorkItem != null && taskWorkItem.processHolder != null) { - taskWorkItem.processHolder.process.destroy(); + taskWorkItem.processHolder.shutdown(); } if (!stopping) { saveRunningTasks(); @@ -458,6 +441,42 @@ public class ForkingTaskRunner } } + @VisibleForTesting + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) throws IOException + { + return new ProcessHolder( + new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), + logFile, + taskLocation.getHost(), + taskLocation.getPort(), + taskLocation.getTlsPort() + ); + } + + @VisibleForTesting + int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) + throws IOException, InterruptedException + { + final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND); + + // This will block for a while. So we append the thread information with more details + final String priorThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId())); + + try (final OutputStream toLogfile = logSink.openStream()) { + ByteStreams.copy(processHolder.process.getInputStream(), toLogfile); + return processHolder.process.waitFor(); + } + finally { + Thread.currentThread().setName(priorThreadName); + // Upload task logs + taskLogPusher.pushTaskLog(task.getId(), logFile); + if (reportsFile.exists()) { + taskLogPusher.pushTaskReports(task.getId(), reportsFile); + } + } + } + @Override @LifecycleStop public void stop() @@ -726,7 +745,8 @@ public class ForkingTaskRunner } } - private static class ProcessHolder + @VisibleForTesting + static class ProcessHolder { private final Process process; private final File logFile; @@ -743,11 +763,18 @@ public class ForkingTaskRunner this.tlsPort = tlsPort; } - private void registerWithCloser(Closer closer) + @VisibleForTesting + void registerWithCloser(Closer closer) { closer.register(process.getInputStream()); closer.register(process.getOutputStream()); } + + @VisibleForTesting + void shutdown() + { + process.destroy(); + } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index cfbbab4bf3e..24dba4f6520 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -208,10 +208,31 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke .emit(); log.warn(e, "Graceful shutdown of task[%s] aborted with exception.", task.getId()); error = true; - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId())); + // Creating a new status to only feed listeners seems quite strange. + // This is currently OK because we have no listeners yet registered in peon. + // However, we should fix this in the near future by always retrieving task status + // from one single source of truth that is also propagated to the overlord. + // See https://github.com/apache/druid/issues/11445. + TaskRunnerUtils.notifyStatusChanged( + listeners, + task.getId(), + TaskStatus.failure( + task.getId(), + "Failed to stop gracefully with exception. See task logs for more details." + ) + ); } } else { - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId())); + // Creating a new status to only feed listeners seems quite strange. + // This is currently OK because we have no listeners yet registered in peon. + // However, we should fix this in the near future by always retrieving task status + // from one single source of truth that is also propagated to the overlord. + // See https://github.com/apache/druid/issues/11445. + TaskRunnerUtils.notifyStatusChanged( + listeners, + task.getId(), + TaskStatus.failure(task.getId(), "Canceled as task execution process stopped") + ); } elapsed = System.currentTimeMillis() - start; @@ -417,7 +438,6 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke { return task.getDataSource(); } - } private class SingleTaskBackgroundRunnerCallable implements Callable diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java index e463d835398..84a414ce6a3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java @@ -197,7 +197,7 @@ public class ThreadingTaskRunner Thread.currentThread() .setName(StringUtils.format("[%s]-%s", task.getId(), priorThreadName)); - TaskStatus taskStatus = null; + TaskStatus taskStatus; final TaskToolbox toolbox = toolboxFactory.build(task); TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); TaskRunnerUtils.notifyStatusChanged( @@ -212,12 +212,13 @@ public class ThreadingTaskRunner } catch (Throwable t) { LOGGER.error(t, "Exception caught while running the task."); + taskStatus = TaskStatus.failure( + task.getId(), + "Failed with an exception. See indexer logs for more details." + ); } finally { taskWorkItem.setState(RunnerTaskState.NONE); - if (taskStatus == null) { - taskStatus = TaskStatus.failure(task.getId()); - } Thread.currentThread().setName(priorThreadName); if (reportsFile.exists()) { taskLogPusher.pushTaskReports(task.getId(), reportsFile); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 19598d05dc0..d0f2a3b0715 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -251,7 +251,15 @@ public class WorkerTaskManager @Override public void onFailure(Throwable t) { - submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId()))); + submitNoticeToExec( + new StatusNotice( + task, + TaskStatus.failure( + task.getId(), + "Failed to run task with an exception. See middleManager or indexer logs for more details." + ) + ) + ); } } ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java index 23b19683acb..27fb31e8d45 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java @@ -127,7 +127,10 @@ public class WorkerTaskMonitor extends WorkerTaskManager if (completedAnnouncement != null) { completionStatus = completedAnnouncement.getTaskStatus(); } else if (!runningTasks.containsKey(announcement.getTaskStatus().getId())) { - completionStatus = TaskStatus.failure(announcement.getTaskStatus().getId()); + completionStatus = TaskStatus.failure( + announcement.getTaskStatus().getId(), + "Canceled as unknown task. See middleManager or indexer logs for more details." + ); } if (completionStatus != null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index b766bc7239d..06629b10862 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -19,18 +19,35 @@ package org.apache.druid.indexing.overlord; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig; import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; +import org.apache.druid.tasklogs.NoopTaskLogs; import org.assertj.core.util.Lists; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import java.io.File; +import java.io.IOException; import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; public class ForkingTaskRunnerTest { @@ -145,12 +162,192 @@ public class ForkingTaskRunnerTest "-Dsome.somepassword = secret=value", "-Dsome.some=notasecret", "-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa=" - ), - "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa= -Dsome.property=random -Dsome.otherproperty = random=random " + - "-Dsome.somesecret = -Dsome.somesecret= -Dsome.somepassword = -Dsome.some=notasecret -Dsome.otherSecret=" + ), + "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa= -Dsome.property=random -Dsome.otherproperty = random=random " + + "-Dsome.somesecret = -Dsome.somesecret= -Dsome.somepassword = -Dsome.some=notasecret -Dsome.otherSecret=" ); StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig(); - ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig); - Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs)); + ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner( + new ForkingTaskRunnerConfig(), + null, + new WorkerConfig(), + null, + null, + null, + null, + startupLoggingConfig + ); + Assert.assertEquals( + originalAndExpectedCommand.rhs, + forkingTaskRunner.getMaskedCommand( + startupLoggingConfig.getMaskProperties(), + originalAndExpectedCommand.lhs + ) + ); + } + + @Test + public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException + { + ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner( + new ForkingTaskRunnerConfig(), + new TaskConfig( + null, + null, + null, + null, + ImmutableList.of(), + false, + new Period("PT0S"), + new Period("PT10S"), + ImmutableList.of(), + false, + false + ), + new WorkerConfig(), + new Properties(), + new NoopTaskLogs(), + new DefaultObjectMapper(), + new DruidNode("middleManager", "host", false, 8091, null, true, false), + new StartupLoggingConfig() + ) + { + @Override + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) + { + ProcessHolder processHolder = Mockito.mock(ProcessHolder.class); + Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any()); + Mockito.doNothing().when(processHolder).shutdown(); + return processHolder; + } + + @Override + int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) + { + // Emulate task process failure + return 1; + } + }; + + final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get(); + Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); + Assert.assertEquals( + "Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details.", + status.getErrorMsg() + ); + } + + @Test + public void testTaskStatusWhenTaskProcessSucceedsTaskSucceeds() throws ExecutionException, InterruptedException + { + ObjectMapper mapper = new DefaultObjectMapper(); + Task task = NoopTask.create(); + ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner( + new ForkingTaskRunnerConfig(), + new TaskConfig( + null, + null, + null, + null, + ImmutableList.of(), + false, + new Period("PT0S"), + new Period("PT10S"), + ImmutableList.of(), + false, + false + ), + new WorkerConfig(), + new Properties(), + new NoopTaskLogs(), + mapper, + new DruidNode("middleManager", "host", false, 8091, null, true, false), + new StartupLoggingConfig() + ) + { + @Override + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) throws IOException + { + ProcessHolder processHolder = Mockito.mock(ProcessHolder.class); + Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any()); + Mockito.doNothing().when(processHolder).shutdown(); + + for (String param : command) { + if (param.endsWith("status.json")) { + mapper.writeValue(new File(param), TaskStatus.success(task.getId())); + break; + } + } + + return processHolder; + } + + @Override + int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) + { + return 0; + } + }; + + final TaskStatus status = forkingTaskRunner.run(task).get(); + Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); + Assert.assertNull(status.getErrorMsg()); + } + + @Test + public void testTaskStatusWhenTaskProcessSucceedsTaskFails() throws ExecutionException, InterruptedException + { + ObjectMapper mapper = new DefaultObjectMapper(); + Task task = NoopTask.create(); + ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner( + new ForkingTaskRunnerConfig(), + new TaskConfig( + null, + null, + null, + null, + ImmutableList.of(), + false, + new Period("PT0S"), + new Period("PT10S"), + ImmutableList.of(), + false, + false + ), + new WorkerConfig(), + new Properties(), + new NoopTaskLogs(), + mapper, + new DruidNode("middleManager", "host", false, 8091, null, true, false), + new StartupLoggingConfig() + ) + { + @Override + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) throws IOException + { + ProcessHolder processHolder = Mockito.mock(ProcessHolder.class); + Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any()); + Mockito.doNothing().when(processHolder).shutdown(); + + for (String param : command) { + if (param.endsWith("status.json")) { + mapper.writeValue(new File(param), TaskStatus.failure(task.getId(), "task failure test")); + break; + } + } + + return processHolder; + } + + @Override + int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile) + { + return 0; + } + }; + + final TaskStatus status = forkingTaskRunner.run(task).get(); + Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); + Assert.assertEquals("task failure test", status.getErrorMsg()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index f1ea2a0afad..7b12c0dc381 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; @@ -35,6 +36,8 @@ import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryRunner; @@ -64,9 +67,11 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; public class SingleTaskBackgroundRunnerTest { @@ -94,6 +99,7 @@ public class SingleTaskBackgroundRunnerTest false ); final ServiceEmitter emitter = new NoopServiceEmitter(); + EmittingLogger.registerEmitter(emitter); final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( taskConfig, null, @@ -199,6 +205,111 @@ public class SingleTaskBackgroundRunnerTest Assert.assertTrue(holder.get()); } + @Test + public void testStopRestorableTaskExceptionAfterStop() + { + // statusChanged callback can be called by multiple threads. + AtomicReference statusHolder = new AtomicReference<>(); + runner.registerListener( + new TaskRunnerListener() + { + @Override + public String getListenerId() + { + return "testStopRestorableTaskExceptionAfterStop"; + } + + @Override + public void locationChanged(String taskId, TaskLocation newLocation) + { + // do nothing + } + + @Override + public void statusChanged(String taskId, TaskStatus status) + { + statusHolder.set(status); + } + }, + Execs.directExecutor() + ); + runner.run( + new RestorableTask(new BooleanHolder()) + { + @Override + public TaskStatus run(TaskToolbox toolbox) + { + throw new Error("task failure test"); + } + } + ); + runner.stop(); + Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode()); + Assert.assertEquals( + "Failed to stop gracefully with exception. See task logs for more details.", + statusHolder.get().getErrorMsg() + ); + } + + @Test + public void testStopNonRestorableTask() throws InterruptedException + { + // latch to wait for SingleTaskBackgroundRunnerCallable to be executed before stopping the task + // We need this latch because TaskRunnerListener is currently racy. + // See https://github.com/apache/druid/issues/11445 for more details. + CountDownLatch runLatch = new CountDownLatch(1); + // statusChanged callback can be called by multiple threads. + AtomicReference statusHolder = new AtomicReference<>(); + runner.registerListener( + new TaskRunnerListener() + { + @Override + public String getListenerId() + { + return "testStopNonRestorableTask"; + } + + @Override + public void locationChanged(String taskId, TaskLocation newLocation) + { + // do nothing + } + + @Override + public void statusChanged(String taskId, TaskStatus status) + { + if (status.getStatusCode() == TaskState.RUNNING) { + runLatch.countDown(); + } else { + statusHolder.set(status); + } + } + }, + Execs.directExecutor() + ); + runner.run( + new NoopTask( + null, + null, + "datasource", + 10000, // 10 sec + 0, + null, + null, + null + ) + ); + + Assert.assertTrue(runLatch.await(1, TimeUnit.SECONDS)); + runner.stop(); + + Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode()); + Assert.assertEquals( + "Canceled as task execution process stopped", + statusHolder.get().getErrorMsg() + ); + } + private static class RestorableTask extends AbstractTask { private final BooleanHolder gracefullyStopped; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java new file mode 100644 index 00000000000..d8ddd5145e5 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TaskToolboxFactory; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.TestAppenderatorsManager; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.server.DruidNode; +import org.apache.druid.tasklogs.NoopTaskLogs; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class ThreadingTaskRunnerTest +{ + + @Test + public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws ExecutionException, InterruptedException + { + ThreadingTaskRunner runner = new ThreadingTaskRunner( + mockTaskToolboxFactory(), + new TaskConfig( + null, + null, + null, + null, + ImmutableList.of(), + false, + new Period("PT0S"), + new Period("PT10S"), + ImmutableList.of(), + false, + false + ), + new WorkerConfig(), + new NoopTaskLogs(), + new DefaultObjectMapper(), + new TestAppenderatorsManager(), + new MultipleFileTaskReportFileWriter(), + new DruidNode("middleManager", "host", false, 8091, null, true, false) + ); + + Future statusFuture = runner.run(new AbstractTask("id", "datasource", null) + { + @Override + public String getType() + { + return "test"; + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) + { + return true; + } + + @Override + public void stopGracefully(TaskConfig taskConfig) + { + } + + @Override + public TaskStatus run(TaskToolbox toolbox) + { + throw new RuntimeException("Task failure test"); + } + }); + + TaskStatus status = statusFuture.get(); + Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); + Assert.assertEquals( + "Failed with an exception. See indexer logs for more details.", + status.getErrorMsg() + ); + } + + private static TaskToolboxFactory mockTaskToolboxFactory() + { + TaskToolboxFactory factory = Mockito.mock(TaskToolboxFactory.class); + Mockito.when(factory.build(ArgumentMatchers.any())).thenReturn(Mockito.mock(TaskToolbox.class)); + return factory; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index b7a489f4903..389d3800da8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -25,8 +25,10 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; +import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.TestTasks; import org.apache.druid.indexing.common.TestUtils; @@ -55,6 +57,7 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.util.Map; /** */ @@ -263,6 +266,35 @@ public class WorkerTaskManagerTest Assert.assertNotNull(update4.getTaskAnnouncement().getTaskLocation().getHost()); } + @Test(timeout = 30_000L) + public void testTaskStatusWhenTaskRunnerFutureThrowsException() throws Exception + { + Task task = new NoopTask("id", null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)) + { + @Override + public TaskStatus run(TaskToolbox toolbox) + { + throw new Error("task failure test"); + } + }; + workerTaskManager.start(); + workerTaskManager.assignTask(task); + + Map completeTasks; + do { + completeTasks = workerTaskManager.getCompletedTasks(); + } while (completeTasks.isEmpty()); + + Assert.assertEquals(1, completeTasks.size()); + TaskAnnouncement announcement = completeTasks.get(task.getId()); + Assert.assertNotNull(announcement); + Assert.assertEquals(TaskState.FAILED, announcement.getStatus()); + Assert.assertEquals( + "Failed to run task with an exception. See middleManager or indexer logs for more details.", + announcement.getTaskStatus().getErrorMsg() + ); + } + private NoopTask createNoopTask(String id) { return new NoopTask(id, null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index dbc44f06927..5a0270098fa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -361,6 +361,10 @@ public class WorkerTaskMonitorTest Assert.assertEquals(1, announcements.size()); Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId()); Assert.assertEquals(TaskState.FAILED, announcements.get(0).getTaskStatus().getStatusCode()); + Assert.assertEquals( + "Canceled as unknown task. See middleManager or indexer logs for more details.", + announcements.get(0).getTaskStatus().getErrorMsg() + ); } @Test(timeout = 60_000L)