diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index ab7de6cf3c8..f07a859e777 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -37,6 +37,7 @@ import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.FileWriteMode; import com.google.common.io.Files; +import com.google.common.math.IntMath; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -58,6 +59,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.DruidMetrics; @@ -67,6 +69,7 @@ import org.apache.druid.server.metrics.MonitorsConfig; import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider; import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.tasklogs.TaskLogStreamer; +import org.apache.druid.utils.JvmUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -75,6 +78,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.math.RoundingMode; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -105,6 +109,7 @@ public class ForkingTaskRunner private final StartupLoggingConfig startupLoggingConfig; private final WorkerConfig workerConfig; + private volatile int numProcessorsPerTask = -1; private volatile boolean stopping = false; private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong(); @@ -214,6 +219,13 @@ public class ForkingTaskRunner command.add("-cp"); command.add(taskClasspath); + if (numProcessorsPerTask < 1) { + // numProcessorsPerTask is set by start() + throw new ISE("Not started"); + } + + command.add(StringUtils.format("-XX:ActiveProcessorCount=%d", numProcessorsPerTask)); + Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts())); Iterables.addAll(command, config.getJavaOptsArray()); @@ -633,9 +645,10 @@ public class ForkingTaskRunner } @Override + @LifecycleStart public void start() { - // No state setup required + setNumProcessorsPerTask(); } @Override @@ -788,6 +801,20 @@ public class ForkingTaskRunner return successfulTaskCount - lastReportedSuccessfulTaskCount; } + @VisibleForTesting + void setNumProcessorsPerTask() + { + // Divide number of available processors by the number of tasks. + // This prevents various automatically-sized thread pools from being unreasonably large (we don't want each + // task to size its pools as if it is the only thing on the entire machine). + + final int availableProcessors = JvmUtils.getRuntimeInfo().getAvailableProcessors(); + numProcessorsPerTask = Math.max( + 1, + IntMath.divide(availableProcessors, workerConfig.getCapacity(), RoundingMode.CEILING) + ); + } + protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem { private final Task task; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index 9e45d92ce5b..2d1a791586a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -242,6 +242,7 @@ public class ForkingTaskRunnerTest } }; + forkingTaskRunner.setNumProcessorsPerTask(); final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get(); Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); Assert.assertEquals( @@ -312,6 +313,7 @@ public class ForkingTaskRunnerTest } }; + forkingTaskRunner.setNumProcessorsPerTask(); final TaskStatus status = forkingTaskRunner.run(task).get(); Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); Assert.assertNull(status.getErrorMsg()); @@ -373,6 +375,7 @@ public class ForkingTaskRunnerTest } }; + forkingTaskRunner.setNumProcessorsPerTask(); final TaskStatus status = forkingTaskRunner.run(task).get(); Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); Assert.assertEquals("task failure test", status.getErrorMsg()); @@ -441,6 +444,7 @@ public class ForkingTaskRunnerTest } }; + forkingTaskRunner.setNumProcessorsPerTask(); forkingTaskRunner.run(task).get(); Assert.assertTrue(xmxJavaOptsArrayIndex.get() > xmxJavaOptsIndex.get()); Assert.assertTrue(xmxJavaOptsIndex.get() >= 0); @@ -509,6 +513,7 @@ public class ForkingTaskRunnerTest } }; + forkingTaskRunner.setNumProcessorsPerTask(); ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> forkingTaskRunner.run(task).get()); Assert.assertTrue(e.getMessage().endsWith(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY + " in context of task: " + task.getId() + " must be an array of strings.") diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index ed424ba037f..f423982b9f7 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -137,7 +137,7 @@ public class CliMiddleManager extends ServerRunnable JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class); binder.bind(TaskRunner.class).to(ForkingTaskRunner.class); - binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); + binder.bind(ForkingTaskRunner.class).in(ManageLifecycle.class); binder.bind(WorkerTaskCountStatsProvider.class).to(ForkingTaskRunner.class); binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);