From 2e1d6aaf3d79de5d8dbee29e3ede61d510fa1cd3 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 3 Nov 2014 20:53:55 -0800 Subject: [PATCH] Use thread priorities. (aka set `nice` values for background-like tasks) * Defaults the thread priority to java.util.Thread.NORM_PRIORITY in io.druid.indexing.common.task.AbstractTask * Each exec service has its own Task Factory which is assigned a priority for spawned task. Therefore each priority class has a unique exec service * Added priority to tasks as taskPriority in the task context. <0 means low, 0 means take default, >0 means high. It is up to any particular implementation to determine how to handle these numbers * Add options to ForkingTaskRunner * Add "-XX:+UseThreadPriorities" default option * Add "-XX:ThreadPriorityPolicy=42" default option * AbstractTask - Removed unneded @JsonIgnore on priority * Added priority to RealtimePlumber executors. All sub-executors (non query runners) get Thread.MIN_PRIORITY * Add persistThreadPriority and mergeThreadPriority to realtime tuning config --- .../main/java/io/druid/concurrent/Execs.java | 56 ++++++++++++--- .../druid/concurrent/TaskThreadPriority.java | 68 +++++++++++++++++++ docs/content/ingestion/realtime-ingestion.md | 4 ++ .../indexing/common/task/AbstractTask.java | 1 + .../druid/indexing/common/task/IndexTask.java | 4 +- .../indexing/overlord/ForkingTaskRunner.java | 8 +++ .../overlord/ThreadPoolTaskRunner.java | 60 ++++++++++++++-- .../common/task/RealtimeIndexTaskTest.java | 3 +- .../indexing/common/task/TaskSerdeTest.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 3 +- .../indexing/RealtimeTuningConfig.java | 34 ++++++++-- .../realtime/plumber/RealtimePlumber.java | 9 ++- .../segment/realtime/FireDepartmentTest.java | 2 +- .../segment/realtime/RealtimeManagerTest.java | 4 +- .../plumber/RealtimePlumberSchoolTest.java | 3 +- .../segment/realtime/plumber/SinkTest.java | 3 +- 16 files changed, 237 insertions(+), 29 deletions(-) create mode 100644 common/src/main/java/io/druid/concurrent/TaskThreadPriority.java diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java index 757c4ccef5a..c6932b55d1e 100644 --- a/common/src/main/java/io/druid/concurrent/Execs.java +++ b/common/src/main/java/io/druid/concurrent/Execs.java @@ -21,6 +21,8 @@ package io.druid.concurrent; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; @@ -37,24 +39,51 @@ import java.util.concurrent.TimeUnit; */ public class Execs { - public static ExecutorService singleThreaded(String nameFormat) + + public static ExecutorService singleThreaded(@NotNull String nameFormat) { - return Executors.newSingleThreadExecutor(makeThreadFactory(nameFormat)); + return singleThreaded(nameFormat, null); } - public static ExecutorService multiThreaded(int threads, String nameFormat) + public static ExecutorService singleThreaded(@NotNull String nameFormat, @Nullable Integer priority) { - return Executors.newFixedThreadPool(threads, makeThreadFactory(nameFormat)); + return Executors.newSingleThreadExecutor(makeThreadFactory(nameFormat, priority)); } - public static ScheduledExecutorService scheduledSingleThreaded(String nameFormat) + public static ExecutorService multiThreaded(int threads, @NotNull String nameFormat) { - return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat)); + return multiThreaded(threads, nameFormat, null); } - public static ThreadFactory makeThreadFactory(String nameFormat) + public static ExecutorService multiThreaded(int threads, @NotNull String nameFormat, @Nullable Integer priority) { - return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build(); + return Executors.newFixedThreadPool(threads, makeThreadFactory(nameFormat, priority)); + } + + public static ScheduledExecutorService scheduledSingleThreaded(@NotNull String nameFormat) + { + return scheduledSingleThreaded(nameFormat, null); + } + + public static ScheduledExecutorService scheduledSingleThreaded(@NotNull String nameFormat, @Nullable Integer priority) + { + return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat, priority)); + } + + public static ThreadFactory makeThreadFactory(@NotNull String nameFormat) + { + return makeThreadFactory(nameFormat, null); + } + + public static ThreadFactory makeThreadFactory(@NotNull String nameFormat, @Nullable Integer priority) + { + final ThreadFactoryBuilder builder = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(nameFormat); + if (priority != null) { + builder.setPriority(priority); + } + return builder.build(); } /** @@ -64,6 +93,15 @@ public class Execs * @return ExecutorService which blocks accepting new tasks when the capacity reached */ public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity) + { + return newBlockingSingleThreaded(nameFormat, capacity, null); + } + + public static ExecutorService newBlockingSingleThreaded( + final String nameFormat, + final int capacity, + final Integer priority + ) { final BlockingQueue queue; if (capacity > 0) { @@ -72,7 +110,7 @@ public class Execs queue = new SynchronousQueue<>(); } return new ThreadPoolExecutor( - 1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat), + 1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat, priority), new RejectedExecutionHandler() { @Override diff --git a/common/src/main/java/io/druid/concurrent/TaskThreadPriority.java b/common/src/main/java/io/druid/concurrent/TaskThreadPriority.java new file mode 100644 index 00000000000..381941e7c80 --- /dev/null +++ b/common/src/main/java/io/druid/concurrent/TaskThreadPriority.java @@ -0,0 +1,68 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.concurrent; + +public class TaskThreadPriority +{ + // The task context key to grab the task priority from + public static final String CONTEXT_KEY = "backgroundThreadPriority"; + // NOTE: Setting negative nice values on linux systems (threadPriority > Thread.NORM_PRIORITY) requires running + // as *ROOT*. This is, in general, not advisable. + // In order to have these priorities honored on linux systems, the JVM must be launched with the following options: + // + // -XX:+UseThreadPriorities -XX:ThreadPriorityPolicy=42 + // + // +UseThreadPriorities usually only enables setting thread priorities if run as root... but + // ThreadPriorityPolicy is "supposed" to be either 0 or 1, but there is a "bug"/feature in + // the common JVMs. Whereby if UseThreadPriorities is set, and the ThreadPriorityPolicy!=1 + // the check for "root" is skipped. This works fine as long as you are LOWERING the + // threadPriority of tasks (which we are). If you modify the code to allow higher priorities + // things will crash and burn at runtime. It is advisable to set it to 42 so that relevant searches can be found on + // the flag + // + // Not setting these JVM options disables thread priorities on linux systems + // + // See : http://www.akshaal.info/2008/04/javas-thread-priorities-in-linux.html for an explanation + // See : http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/b0c7e7f1bbbe/src/os/linux/vm/os_linux.cpp#l3933 for + // the bug in action + // See: https://docs.oracle.com/cd/E15289_01/doc.40/e15062/optionxx.htm#BABGBFHF for the options documentation + + /** + * Return the thread-factory friendly priorities from the task priority + * + * @param taskPriority The priority for the task. >0 means high. 0 means inherit from current thread, <0 means low. + * + * @return The thread priority to use in a thread factory, or null if no priority is to be set + */ + public static Integer getThreadPriorityFromTaskPriority(final int taskPriority) + { + if (taskPriority == 0) { + return null; + } + int finalPriority = taskPriority + Thread.NORM_PRIORITY; + if (taskPriority > Thread.MAX_PRIORITY) { + return Thread.MAX_PRIORITY; + } + if (finalPriority < Thread.MIN_PRIORITY) { + return Thread.MIN_PRIORITY; + } + return finalPriority; + } +} diff --git a/docs/content/ingestion/realtime-ingestion.md b/docs/content/ingestion/realtime-ingestion.md index 5c5fe5b93d1..bd9fd908faa 100644 --- a/docs/content/ingestion/realtime-ingestion.md +++ b/docs/content/ingestion/realtime-ingestion.md @@ -143,6 +143,10 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)| |shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec'| |buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)| +|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)| +|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)| + +Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`. #### Rejection Policy diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index cc6b5130e3f..bb6359ea00c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -157,6 +157,7 @@ public abstract class AbstractTask implements Task * Start helper methods * * @param objects objects to join + * * @return string of joined objects */ public static String joinId(Object... objects) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index a326c31f9c0..f237f0f26e0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -141,7 +141,9 @@ public class IndexTask extends AbstractFixedIntervalTask null, shardSpec, indexSpec, - buildV9Directly + buildV9Directly, + 0, + 0 ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 916eea0b7bc..63a1180b741 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -322,6 +322,14 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer command.add(String.format("-Ddruid.host=%s", childHost)); command.add(String.format("-Ddruid.port=%d", childPort)); + /** + * These are not enabled per default to allow the user to either set or not set them + * Users are highly suggested to be set in druid.indexer.runner.javaOpts + * See io.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int) + * for more information + command.add("-XX:+UseThreadPriorities"); + command.add("-XX:ThreadPriorityPolicy=42"); + */ if (config.isSeparateIngestionEndpoint()) { command.add(String.format( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 9e8e2a15b05..19c7a141dcb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -36,6 +36,7 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.concurrent.Execs; +import io.druid.concurrent.TaskThreadPriority; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; @@ -52,8 +53,11 @@ import org.joda.time.Interval; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; @@ -66,7 +70,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private final TaskToolboxFactory toolboxFactory; private final TaskConfig taskConfig; - private final ListeningExecutorService exec; + private final ConcurrentMap exec = new ConcurrentHashMap<>(); private final Set runningItems = new ConcurrentSkipListSet<>(); private final ServiceEmitter emitter; @@ -79,7 +83,6 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker { this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory"); this.taskConfig = taskConfig; - this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d")); this.emitter = Preconditions.checkNotNull(emitter, "emitter"); } @@ -89,10 +92,27 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker return ImmutableList.of(); } + private static ListeningExecutorService buildExecutorService(int priority) + { + return MoreExecutors.listeningDecorator( + Execs.singleThreaded( + "task-runner-%d-priority-" + priority, + TaskThreadPriority.getThreadPriorityFromTaskPriority(priority) + ) + ); + } + @LifecycleStop public void stop() { - exec.shutdown(); + for (Map.Entry entry : exec.entrySet()) { + try { + entry.getValue().shutdown(); + } + catch (SecurityException ex) { + log.wtf(ex, "I can't control my own threads!"); + } + } for (ThreadPoolTaskRunnerWorkItem item : runningItems) { final Task task = item.getTask(); @@ -145,14 +165,44 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker } // Ok, now interrupt everything. - exec.shutdownNow(); + for (Map.Entry entry : exec.entrySet()) { + try { + entry.getValue().shutdownNow(); + } + catch (SecurityException ex) { + log.wtf(ex, "I can't control my own threads!"); + } + } } @Override public ListenableFuture run(final Task task) { final TaskToolbox toolbox = toolboxFactory.build(task); - final ListenableFuture statusFuture = exec.submit(new ThreadPoolTaskRunnerCallable(task, toolbox)); + final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY); + int taskPriority = 0; + if(taskPriorityObj != null){ + if(taskPriorityObj instanceof Number) { + taskPriority = ((Number) taskPriorityObj).intValue(); + } else if(taskPriorityObj instanceof String) { + try { + taskPriority = Integer.parseInt(taskPriorityObj.toString()); + } + catch (NumberFormatException e) { + log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId()); + } + } + } + // Ensure an executor for that priority exists + if (!exec.containsKey(taskPriority)) { + final ListeningExecutorService executorService = buildExecutorService(taskPriority); + if (exec.putIfAbsent(taskPriority, executorService) != null) { + // favor prior service + executorService.shutdownNow(); + } + } + final ListenableFuture statusFuture = exec.get(taskPriority) + .submit(new ThreadPoolTaskRunnerCallable(task, toolbox)); final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(task, statusFuture); runningItems.add(taskRunnerWorkItem); Futures.addCallback( diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 9a9d9881966..af2cc7d83f3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -592,7 +592,8 @@ public class RealtimeIndexTaskTest null, null, null, - buildV9Directly + buildV9Directly, + 0, 0 ); return new RealtimeIndexTask( taskId, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index ffe1b0642f1..54825cf63cf 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -333,7 +333,9 @@ public class TaskSerdeTest 1, new NoneShardSpec(), indexSpec, - null + null, + 0, + 0 ) ), null diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index b5857b0c454..da2efc3391f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -1091,7 +1091,8 @@ public class TaskLifecycleTest null, null, null, - null + null, + 0, 0 ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); return new RealtimeIndexTask( diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index adb1c054a59..b2925a8e47e 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -61,7 +61,9 @@ public class RealtimeTuningConfig implements TuningConfig defaultMaxPendingPersists, defaultShardSpec, defaultIndexSpec, - defaultBuildV9Directly + defaultBuildV9Directly, + 0, + 0 ); } @@ -75,6 +77,8 @@ public class RealtimeTuningConfig implements TuningConfig private final ShardSpec shardSpec; private final IndexSpec indexSpec; private final Boolean buildV9Directly; + private final int persistThreadPriority; + private final int mergeThreadPriority; @JsonCreator public RealtimeTuningConfig( @@ -87,7 +91,9 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, - @JsonProperty("buildV9Directly") Boolean buildV9Directly + @JsonProperty("buildV9Directly") Boolean buildV9Directly, + @JsonProperty("persistThreadPriority") int persistThreadPriority, + @JsonProperty("mergeThreadPriority") int mergeThreadPriority ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -104,6 +110,8 @@ public class RealtimeTuningConfig implements TuningConfig this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly; + this.mergeThreadPriority = mergeThreadPriority; + this.persistThreadPriority = persistThreadPriority; } @JsonProperty @@ -161,10 +169,22 @@ public class RealtimeTuningConfig implements TuningConfig } @JsonProperty - public Boolean getBuildV9Directly() { + public Boolean getBuildV9Directly() + { return buildV9Directly; } + public int getPersistThreadPriority() + { + return this.persistThreadPriority; + } + + @JsonProperty + public int getMergeThreadPriority() + { + return this.mergeThreadPriority; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -177,7 +197,9 @@ public class RealtimeTuningConfig implements TuningConfig maxPendingPersists, shardSpec, indexSpec, - buildV9Directly + buildV9Directly, + persistThreadPriority, + mergeThreadPriority ); } @@ -193,7 +215,9 @@ public class RealtimeTuningConfig implements TuningConfig maxPendingPersists, shardSpec, indexSpec, - buildV9Directly + buildV9Directly, + persistThreadPriority, + mergeThreadPriority ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 59cebb931de..2a4375e49db 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -48,6 +48,7 @@ import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.common.utils.VMUtils; import io.druid.concurrent.Execs; import io.druid.data.input.Committer; +import io.druid.concurrent.TaskThreadPriority; import io.druid.data.input.InputRow; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.NoopQueryRunner; @@ -663,13 +664,17 @@ public class RealtimePlumber implements Plumber if (persistExecutor == null) { // use a blocking single threaded executor to throttle the firehose when write to disk is slow persistExecutor = Execs.newBlockingSingleThreaded( - "plumber_persist_%d", maxPendingPersists + "plumber_persist_%d", + maxPendingPersists, + TaskThreadPriority.getThreadPriorityFromTaskPriority(config.getPersistThreadPriority()) ); } if (mergeExecutor == null) { // use a blocking single threaded executor to throttle the firehose when write to disk is slow mergeExecutor = Execs.newBlockingSingleThreaded( - "plumber_merge_%d", 1 + "plumber_merge_%d", + 1, + TaskThreadPriority.getThreadPriorityFromTaskPriority(config.getMergeThreadPriority()) ); } diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index c38cd02bf02..db2694a10df 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -117,7 +117,7 @@ public class FireDepartmentTest null ), new RealtimeTuningConfig( - null, null, null, null, null, null, null, null, null, null + null, null, null, null, null, null, null, null, null, null, 0, 0 ) ); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 5fc212061aa..c5884ebf6ac 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -151,7 +151,9 @@ public class RealtimeManagerTest null, null, null, - null + null, + 0, + 0 ); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index ff14c0dddf5..297d3940d58 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -194,7 +194,8 @@ public class RealtimePlumberSchoolTest null, null, null, - buildV9Directly + buildV9Directly, + 0, 0 ); realtimePlumberSchool = new RealtimePlumberSchool( diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 41024a2abb9..34c85923f90 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -67,7 +67,8 @@ public class SinkTest null, null, null, - null + null, + 0, 0 ); final Sink sink = new Sink(interval, schema, tuningConfig, version);