ForkingTaskRunner: Set ActiveProcessorCount for tasks. (#12592)

* ForkingTaskRunner: Set ActiveProcessorCount for tasks.

This prevents various automatically-sized thread pools from being unreasonably
large (we don't want each task to size its pools as if it is the only thing on
the entire machine).

* Fix tests.

* Add missing LifecycleStart annotation.

* ForkingTaskRunner needs ManageLifecycle.
This commit is contained in:
Gian Merlino 2022-06-15 15:56:32 -07:00 committed by GitHub
parent 45e3111549
commit 70f3b13621
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 2 deletions

View File

@ -37,6 +37,7 @@ import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -58,6 +59,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.DruidMetrics;
@ -67,6 +69,7 @@ import org.apache.druid.server.metrics.MonitorsConfig;
import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.utils.JvmUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -75,6 +78,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@ -105,6 +109,7 @@ public class ForkingTaskRunner
private final StartupLoggingConfig startupLoggingConfig;
private final WorkerConfig workerConfig;
private volatile int numProcessorsPerTask = -1;
private volatile boolean stopping = false;
private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong();
@ -214,6 +219,13 @@ public class ForkingTaskRunner
command.add("-cp");
command.add(taskClasspath);
if (numProcessorsPerTask < 1) {
// numProcessorsPerTask is set by start()
throw new ISE("Not started");
}
command.add(StringUtils.format("-XX:ActiveProcessorCount=%d", numProcessorsPerTask));
Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
Iterables.addAll(command, config.getJavaOptsArray());
@ -633,9 +645,10 @@ public class ForkingTaskRunner
}
@Override
@LifecycleStart
public void start()
{
// No state setup required
setNumProcessorsPerTask();
}
@Override
@ -788,6 +801,20 @@ public class ForkingTaskRunner
return successfulTaskCount - lastReportedSuccessfulTaskCount;
}
@VisibleForTesting
void setNumProcessorsPerTask()
{
// Divide number of available processors by the number of tasks.
// This prevents various automatically-sized thread pools from being unreasonably large (we don't want each
// task to size its pools as if it is the only thing on the entire machine).
final int availableProcessors = JvmUtils.getRuntimeInfo().getAvailableProcessors();
numProcessorsPerTask = Math.max(
1,
IntMath.divide(availableProcessors, workerConfig.getCapacity(), RoundingMode.CEILING)
);
}
protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final Task task;

View File

@ -242,6 +242,7 @@ public class ForkingTaskRunnerTest
}
};
forkingTaskRunner.setNumProcessorsPerTask();
final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
Assert.assertEquals(
@ -312,6 +313,7 @@ public class ForkingTaskRunnerTest
}
};
forkingTaskRunner.setNumProcessorsPerTask();
final TaskStatus status = forkingTaskRunner.run(task).get();
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
Assert.assertNull(status.getErrorMsg());
@ -373,6 +375,7 @@ public class ForkingTaskRunnerTest
}
};
forkingTaskRunner.setNumProcessorsPerTask();
final TaskStatus status = forkingTaskRunner.run(task).get();
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
Assert.assertEquals("task failure test", status.getErrorMsg());
@ -441,6 +444,7 @@ public class ForkingTaskRunnerTest
}
};
forkingTaskRunner.setNumProcessorsPerTask();
forkingTaskRunner.run(task).get();
Assert.assertTrue(xmxJavaOptsArrayIndex.get() > xmxJavaOptsIndex.get());
Assert.assertTrue(xmxJavaOptsIndex.get() >= 0);
@ -509,6 +513,7 @@ public class ForkingTaskRunnerTest
}
};
forkingTaskRunner.setNumProcessorsPerTask();
ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> forkingTaskRunner.run(task).get());
Assert.assertTrue(e.getMessage().endsWith(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
+ " in context of task: " + task.getId() + " must be an array of strings.")

View File

@ -137,7 +137,7 @@ public class CliMiddleManager extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(ForkingTaskRunner.class).in(ManageLifecycle.class);
binder.bind(WorkerTaskCountStatsProvider.class).to(ForkingTaskRunner.class);
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);