Use thread priorities. (aka set `nice` values for background-like tasks)

* Defaults the thread priority to java.util.Thread.NORM_PRIORITY in io.druid.indexing.common.task.AbstractTask
 * Each exec service has its own Task Factory which is assigned a priority for spawned task. Therefore each priority class has a unique exec service
 * Added priority to tasks as taskPriority in the task context. <0 means low, 0 means take default, >0 means high. It is up to any particular implementation to determine how to handle these numbers
 * Add options to ForkingTaskRunner
    * Add "-XX:+UseThreadPriorities" default option
    * Add "-XX:ThreadPriorityPolicy=42" default option
 * AbstractTask - Removed unneded @JsonIgnore on priority
 * Added priority to RealtimePlumber executors. All sub-executors (non query runners) get Thread.MIN_PRIORITY
 * Add persistThreadPriority and mergeThreadPriority to realtime tuning config
This commit is contained in:
Charles Allen 2014-11-03 20:53:55 -08:00 committed by Charles Allen
parent 695f107870
commit 2e1d6aaf3d
16 changed files with 237 additions and 29 deletions

View File

@ -21,6 +21,8 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
@ -37,24 +39,51 @@ import java.util.concurrent.TimeUnit;
*/
public class Execs
{
public static ExecutorService singleThreaded(String nameFormat)
public static ExecutorService singleThreaded(@NotNull String nameFormat)
{
return Executors.newSingleThreadExecutor(makeThreadFactory(nameFormat));
return singleThreaded(nameFormat, null);
}
public static ExecutorService multiThreaded(int threads, String nameFormat)
public static ExecutorService singleThreaded(@NotNull String nameFormat, @Nullable Integer priority)
{
return Executors.newFixedThreadPool(threads, makeThreadFactory(nameFormat));
return Executors.newSingleThreadExecutor(makeThreadFactory(nameFormat, priority));
}
public static ScheduledExecutorService scheduledSingleThreaded(String nameFormat)
public static ExecutorService multiThreaded(int threads, @NotNull String nameFormat)
{
return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat));
return multiThreaded(threads, nameFormat, null);
}
public static ThreadFactory makeThreadFactory(String nameFormat)
public static ExecutorService multiThreaded(int threads, @NotNull String nameFormat, @Nullable Integer priority)
{
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
return Executors.newFixedThreadPool(threads, makeThreadFactory(nameFormat, priority));
}
public static ScheduledExecutorService scheduledSingleThreaded(@NotNull String nameFormat)
{
return scheduledSingleThreaded(nameFormat, null);
}
public static ScheduledExecutorService scheduledSingleThreaded(@NotNull String nameFormat, @Nullable Integer priority)
{
return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat, priority));
}
public static ThreadFactory makeThreadFactory(@NotNull String nameFormat)
{
return makeThreadFactory(nameFormat, null);
}
public static ThreadFactory makeThreadFactory(@NotNull String nameFormat, @Nullable Integer priority)
{
final ThreadFactoryBuilder builder = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(nameFormat);
if (priority != null) {
builder.setPriority(priority);
}
return builder.build();
}
/**
@ -64,6 +93,15 @@ public class Execs
* @return ExecutorService which blocks accepting new tasks when the capacity reached
*/
public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
{
return newBlockingSingleThreaded(nameFormat, capacity, null);
}
public static ExecutorService newBlockingSingleThreaded(
final String nameFormat,
final int capacity,
final Integer priority
)
{
final BlockingQueue<Runnable> queue;
if (capacity > 0) {
@ -72,7 +110,7 @@ public class Execs
queue = new SynchronousQueue<>();
}
return new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat),
1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat, priority),
new RejectedExecutionHandler()
{
@Override

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.concurrent;
public class TaskThreadPriority
{
// The task context key to grab the task priority from
public static final String CONTEXT_KEY = "backgroundThreadPriority";
// NOTE: Setting negative nice values on linux systems (threadPriority > Thread.NORM_PRIORITY) requires running
// as *ROOT*. This is, in general, not advisable.
// In order to have these priorities honored on linux systems, the JVM must be launched with the following options:
//
// -XX:+UseThreadPriorities -XX:ThreadPriorityPolicy=42
//
// +UseThreadPriorities usually only enables setting thread priorities if run as root... but
// ThreadPriorityPolicy is "supposed" to be either 0 or 1, but there is a "bug"/feature in
// the common JVMs. Whereby if UseThreadPriorities is set, and the ThreadPriorityPolicy!=1
// the check for "root" is skipped. This works fine as long as you are LOWERING the
// threadPriority of tasks (which we are). If you modify the code to allow higher priorities
// things will crash and burn at runtime. It is advisable to set it to 42 so that relevant searches can be found on
// the flag
//
// Not setting these JVM options disables thread priorities on linux systems
//
// See : http://www.akshaal.info/2008/04/javas-thread-priorities-in-linux.html for an explanation
// See : http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/b0c7e7f1bbbe/src/os/linux/vm/os_linux.cpp#l3933 for
// the bug in action
// See: https://docs.oracle.com/cd/E15289_01/doc.40/e15062/optionxx.htm#BABGBFHF for the options documentation
/**
* Return the thread-factory friendly priorities from the task priority
*
* @param taskPriority The priority for the task. >0 means high. 0 means inherit from current thread, <0 means low.
*
* @return The thread priority to use in a thread factory, or null if no priority is to be set
*/
public static Integer getThreadPriorityFromTaskPriority(final int taskPriority)
{
if (taskPriority == 0) {
return null;
}
int finalPriority = taskPriority + Thread.NORM_PRIORITY;
if (taskPriority > Thread.MAX_PRIORITY) {
return Thread.MAX_PRIORITY;
}
if (finalPriority < Thread.MIN_PRIORITY) {
return Thread.MIN_PRIORITY;
}
return finalPriority;
}
}

View File

@ -143,6 +143,10 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec'|
|buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)|
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.
#### Rejection Policy

View File

@ -157,6 +157,7 @@ public abstract class AbstractTask implements Task
* Start helper methods
*
* @param objects objects to join
*
* @return string of joined objects
*/
public static String joinId(Object... objects)

View File

@ -141,7 +141,9 @@ public class IndexTask extends AbstractFixedIntervalTask
null,
shardSpec,
indexSpec,
buildV9Directly
buildV9Directly,
0,
0
);
}

View File

@ -322,6 +322,14 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
/**
* These are not enabled per default to allow the user to either set or not set them
* Users are highly suggested to be set in druid.indexer.runner.javaOpts
* See io.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
* for more information
command.add("-XX:+UseThreadPriorities");
command.add("-XX:ThreadPriorityPolicy=42");
*/
if (config.isSeparateIngestionEndpoint()) {
command.add(String.format(

View File

@ -36,6 +36,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.concurrent.Execs;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
@ -52,8 +53,11 @@ import org.joda.time.Interval;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
@ -66,7 +70,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private final TaskToolboxFactory toolboxFactory;
private final TaskConfig taskConfig;
private final ListeningExecutorService exec;
private final ConcurrentMap<Integer, ListeningExecutorService> exec = new ConcurrentHashMap<>();
private final Set<ThreadPoolTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<>();
private final ServiceEmitter emitter;
@ -79,7 +83,6 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{
this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
this.taskConfig = taskConfig;
this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d"));
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
}
@ -89,10 +92,27 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return ImmutableList.of();
}
private static ListeningExecutorService buildExecutorService(int priority)
{
return MoreExecutors.listeningDecorator(
Execs.singleThreaded(
"task-runner-%d-priority-" + priority,
TaskThreadPriority.getThreadPriorityFromTaskPriority(priority)
)
);
}
@LifecycleStop
public void stop()
{
exec.shutdown();
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
try {
entry.getValue().shutdown();
}
catch (SecurityException ex) {
log.wtf(ex, "I can't control my own threads!");
}
}
for (ThreadPoolTaskRunnerWorkItem item : runningItems) {
final Task task = item.getTask();
@ -145,14 +165,44 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
// Ok, now interrupt everything.
exec.shutdownNow();
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
try {
entry.getValue().shutdownNow();
}
catch (SecurityException ex) {
log.wtf(ex, "I can't control my own threads!");
}
}
}
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ThreadPoolTaskRunnerCallable(task, toolbox));
final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY);
int taskPriority = 0;
if(taskPriorityObj != null){
if(taskPriorityObj instanceof Number) {
taskPriority = ((Number) taskPriorityObj).intValue();
} else if(taskPriorityObj instanceof String) {
try {
taskPriority = Integer.parseInt(taskPriorityObj.toString());
}
catch (NumberFormatException e) {
log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId());
}
}
}
// Ensure an executor for that priority exists
if (!exec.containsKey(taskPriority)) {
final ListeningExecutorService executorService = buildExecutorService(taskPriority);
if (exec.putIfAbsent(taskPriority, executorService) != null) {
// favor prior service
executorService.shutdownNow();
}
}
final ListenableFuture<TaskStatus> statusFuture = exec.get(taskPriority)
.submit(new ThreadPoolTaskRunnerCallable(task, toolbox));
final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(task, statusFuture);
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(

View File

@ -592,7 +592,8 @@ public class RealtimeIndexTaskTest
null,
null,
null,
buildV9Directly
buildV9Directly,
0, 0
);
return new RealtimeIndexTask(
taskId,

View File

@ -333,7 +333,9 @@ public class TaskSerdeTest
1,
new NoneShardSpec(),
indexSpec,
null
null,
0,
0
)
),
null

View File

@ -1091,7 +1091,8 @@ public class TaskLifecycleTest
null,
null,
null,
null
null,
0, 0
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
return new RealtimeIndexTask(

View File

@ -61,7 +61,9 @@ public class RealtimeTuningConfig implements TuningConfig
defaultMaxPendingPersists,
defaultShardSpec,
defaultIndexSpec,
defaultBuildV9Directly
defaultBuildV9Directly,
0,
0
);
}
@ -75,6 +77,8 @@ public class RealtimeTuningConfig implements TuningConfig
private final ShardSpec shardSpec;
private final IndexSpec indexSpec;
private final Boolean buildV9Directly;
private final int persistThreadPriority;
private final int mergeThreadPriority;
@JsonCreator
public RealtimeTuningConfig(
@ -87,7 +91,9 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("persistThreadPriority") int persistThreadPriority,
@JsonProperty("mergeThreadPriority") int mergeThreadPriority
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -104,6 +110,8 @@ public class RealtimeTuningConfig implements TuningConfig
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec;
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
this.mergeThreadPriority = mergeThreadPriority;
this.persistThreadPriority = persistThreadPriority;
}
@JsonProperty
@ -161,10 +169,22 @@ public class RealtimeTuningConfig implements TuningConfig
}
@JsonProperty
public Boolean getBuildV9Directly() {
public Boolean getBuildV9Directly()
{
return buildV9Directly;
}
public int getPersistThreadPriority()
{
return this.persistThreadPriority;
}
@JsonProperty
public int getMergeThreadPriority()
{
return this.mergeThreadPriority;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
@ -177,7 +197,9 @@ public class RealtimeTuningConfig implements TuningConfig
maxPendingPersists,
shardSpec,
indexSpec,
buildV9Directly
buildV9Directly,
persistThreadPriority,
mergeThreadPriority
);
}
@ -193,7 +215,9 @@ public class RealtimeTuningConfig implements TuningConfig
maxPendingPersists,
shardSpec,
indexSpec,
buildV9Directly
buildV9Directly,
persistThreadPriority,
mergeThreadPriority
);
}
}

View File

@ -48,6 +48,7 @@ import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.common.utils.VMUtils;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.data.input.InputRow;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
@ -663,13 +664,17 @@ public class RealtimePlumber implements Plumber
if (persistExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
persistExecutor = Execs.newBlockingSingleThreaded(
"plumber_persist_%d", maxPendingPersists
"plumber_persist_%d",
maxPendingPersists,
TaskThreadPriority.getThreadPriorityFromTaskPriority(config.getPersistThreadPriority())
);
}
if (mergeExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
mergeExecutor = Execs.newBlockingSingleThreaded(
"plumber_merge_%d", 1
"plumber_merge_%d",
1,
TaskThreadPriority.getThreadPriorityFromTaskPriority(config.getMergeThreadPriority())
);
}

View File

@ -117,7 +117,7 @@ public class FireDepartmentTest
null
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null, null
null, null, null, null, null, null, null, null, null, null, 0, 0
)
);

View File

@ -151,7 +151,9 @@ public class RealtimeManagerTest
null,
null,
null,
null
null,
0,
0
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

View File

@ -194,7 +194,8 @@ public class RealtimePlumberSchoolTest
null,
null,
null,
buildV9Directly
buildV9Directly,
0, 0
);
realtimePlumberSchool = new RealtimePlumberSchool(

View File

@ -67,7 +67,8 @@ public class SinkTest
null,
null,
null,
null
null,
0, 0
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);