diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java index b4d38e5d2cc..af6007c8ac8 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; @@ -47,7 +48,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Random; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -77,7 +77,7 @@ public class BalancerStrategyBenchmark private int reservoirSize = 1; private double percentOfSegmentsToConsider = 100; private final BalancerStrategy balancerStrategy = new CostBalancerStrategy( - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)) + MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "BalancerStrategyBenchmark-%d")) ); @Setup(Level.Trial) diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt index 04d1ceb41bd..a99654f1212 100644 --- a/codestyle/druid-forbidden-apis.txt +++ b/codestyle/druid-forbidden-apis.txt @@ -48,6 +48,7 @@ org.apache.commons.io.FileUtils#getTempDirectory() @ Use org.junit.rules.Tempora org.apache.commons.io.FileUtils#deleteDirectory(java.io.File) @ Use org.apache.druid.java.util.common.FileUtils#deleteDirectory() org.apache.commons.io.FileUtils#forceMkdir(java.io.File) @ Use org.apache.druid.java.util.common.FileUtils.mkdirp instead java.lang.Class#getCanonicalName() @ Class.getCanonicalName can return null for anonymous types, use Class.getName instead. +java.util.concurrent.Executors#newFixedThreadPool(int) @ Executor is non-daemon and can prevent JVM shutdown, use org.apache.druid.java.util.common.concurrent.Execs#multiThreaded(int, java.lang.String) instead. @defaultMessage Use Locale.ENGLISH com.ibm.icu.text.DateFormatSymbols#() diff --git a/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java b/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java index 27bebbfa4e6..3fcfb72cfd4 100644 --- a/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java +++ b/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.java.util.common.concurrent.Execs; import org.junit.Assert; import org.junit.Test; @@ -31,7 +32,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -71,7 +71,7 @@ public class GuavaUtilsTest public void testCancelAll() { int tasks = 3; - ExecutorService service = Executors.newFixedThreadPool(tasks); + ExecutorService service = Execs.multiThreaded(tasks, "GuavaUtilsTest-%d"); ListeningExecutorService exc = MoreExecutors.listeningDecorator(service); AtomicInteger index = new AtomicInteger(0); //a flag what time to throw exception. diff --git a/core/src/test/java/org/apache/druid/java/util/common/lifecycle/LifecycleTest.java b/core/src/test/java/org/apache/druid/java/util/common/lifecycle/LifecycleTest.java index db6aee1f0e0..dbd0f4ed2d7 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/lifecycle/LifecycleTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/lifecycle/LifecycleTest.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; import org.junit.Assert; import org.junit.Test; @@ -33,7 +34,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -61,7 +61,10 @@ public class LifecycleTest public void testConcurrentStartStopOnce() throws Exception { final int numThreads = 10; - ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads)); + ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Execs.multiThreaded( + numThreads, + "LifecycleTest-%d" + )); final Lifecycle lifecycle = new Lifecycle(); final AtomicLong handlerFailedCount = new AtomicLong(0L); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java index d0a23a8da3c..f3d19f454af 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java @@ -28,6 +28,7 @@ import org.apache.druid.common.utils.UUIDUtils; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -53,7 +54,6 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -158,7 +158,7 @@ public class HdfsClasspathSetupTest public void testConcurrentUpload() throws InterruptedException, ExecutionException, TimeoutException { final int concurrency = 10; - ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency)); + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Execs.multiThreaded(concurrency, "HdfsClasspathSetupTest-%d")); // barrier ensures that all jobs try to add files to classpath at same time. final CyclicBarrier barrier = new CyclicBarrier(concurrency); final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 3cabeb6c674..ea70ac5548d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -97,6 +97,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.jackson.JacksonUtils; @@ -176,7 +177,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; @RunWith(Parameterized.class) public class TaskLifecycleTest extends InitializedNullHandlingTest @@ -1364,7 +1364,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest @Test public void testUnifiedAppenderatorsManagerCleanup() throws Exception { - final ExecutorService exec = Executors.newFixedThreadPool(8); + final ExecutorService exec = Execs.multiThreaded(8, "TaskLifecycleTest-%d"); UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager( new ForwardingQueryProcessingPool(exec), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java index 761dee61efd..9c198f42917 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; @@ -45,7 +46,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TaskLockBoxConcurrencyTest @@ -77,7 +77,7 @@ public class TaskLockBoxConcurrencyTest taskStorage, new IndexerSQLMetadataStorageCoordinator(objectMapper, derby.metadataTablesConfigSupplier().get(), derbyConnector) ); - service = Executors.newFixedThreadPool(2); + service = Execs.multiThreaded(2, "TaskLockBoxConcurrencyTest-%d"); } @After diff --git a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java index b1b5084a70c..8c2cc8573c7 100644 --- a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.lifecycle.Lifecycle; @@ -50,7 +51,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -133,7 +133,7 @@ public class ChainedExecutionQueryRunnerTest .build(); final Sequence seq = chainedRunner.run(QueryPlus.wrap(query)); - Future resultFuture = Executors.newFixedThreadPool(1).submit( + Future resultFuture = Execs.multiThreaded(1, "ChainedExecutionQueryRunnerTest-%d").submit( new Runnable() { @Override @@ -258,7 +258,7 @@ public class ChainedExecutionQueryRunnerTest .build(); final Sequence seq = chainedRunner.run(QueryPlus.wrap(query)); - Future resultFuture = Executors.newFixedThreadPool(1).submit( + Future resultFuture = Execs.multiThreaded(1, "ChainedExecutionQueryRunnerTest-%d").submit( new Runnable() { @Override diff --git a/processing/src/test/java/org/apache/druid/query/groupby/having/DimFilterHavingSpecTest.java b/processing/src/test/java/org/apache/druid/query/groupby/having/DimFilterHavingSpecTest.java index 93bc3e64122..d54b3e94e68 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/having/DimFilterHavingSpecTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/having/DimFilterHavingSpecTest.java @@ -20,7 +20,9 @@ package org.apache.druid.query.groupby.having; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.filter.SelectorDimFilter; @@ -39,7 +41,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; public class DimFilterHavingSpecTest @@ -85,7 +86,7 @@ public class DimFilterHavingSpecTest @Ignore // Doesn't always pass. The check in "eval" is best effort and not guaranteed to detect concurrent usage. public void testConcurrentUsage() throws Exception { - final ExecutorService exec = Executors.newFixedThreadPool(2); + final ExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(2, "DimFilterHavingSpecTest-%d")); final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null), null); final List> futures = new ArrayList<>(); diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java index 77309ff2777..9835284050d 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java @@ -35,6 +35,7 @@ import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Sequence; @@ -639,7 +640,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest final int threadCount = 10; final int elementsPerThread = 200; final int dimensionCount = 5; - ExecutorService executor = Executors.newFixedThreadPool(threadCount); + ExecutorService executor = Execs.multiThreaded(threadCount, "IncrementalIndexTest-%d"); final long timestamp = System.currentTimeMillis(); final CountDownLatch latch = new CountDownLatch(threadCount); for (int j = 0; j < threadCount; j++) { diff --git a/processing/src/test/java/org/apache/druid/segment/filter/FloatAndDoubleFilteringTest.java b/processing/src/test/java/org/apache/druid/segment/filter/FloatAndDoubleFilteringTest.java index f8d24013e51..f61ed0f14c0 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/FloatAndDoubleFilteringTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/FloatAndDoubleFilteringTest.java @@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.query.extraction.MapLookupExtractor; import org.apache.druid.query.filter.BoundDimFilter; @@ -65,7 +66,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @RunWith(Parameterized.class) @@ -418,7 +418,7 @@ public class FloatAndDoubleFilteringTest extends BaseFilterTest ) { ListeningExecutorService executor = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(EXECUTOR_NUM_THREADS) + Execs.multiThreaded(EXECUTOR_NUM_THREADS, "FloatAndDoubleFilteringTest-%d") ); List> futures = new ArrayList<>(); diff --git a/processing/src/test/java/org/apache/druid/segment/filter/LongFilteringTest.java b/processing/src/test/java/org/apache/druid/segment/filter/LongFilteringTest.java index 35f7cab997c..d24c599eceb 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/LongFilteringTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/LongFilteringTest.java @@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.extraction.MapLookupExtractor; @@ -63,7 +64,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @RunWith(Parameterized.class) @@ -433,7 +433,7 @@ public class LongFilteringTest extends BaseFilterTest final List expectedRows ) { - ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(EXECUTOR_NUM_THREADS)); + ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.multiThreaded(EXECUTOR_NUM_THREADS, "LongFilteringTest-%d")); List> futures = new ArrayList<>(); diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java index 251496ee395..a0ad1c6ae28 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.collections.bitmap.RoaringBitmapFactory; import org.apache.druid.guice.NestedDataModule; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; @@ -79,7 +80,6 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest @@ -201,7 +201,9 @@ public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest final AtomicReference failureReason = new AtomicReference<>(expectedReason); final int threads = 10; - ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threads)); + ListeningExecutorService executorService = MoreExecutors.listeningDecorator( + Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d") + ); Collection> futures = new ArrayList<>(threads); final CountDownLatch threadsStartLatch = new CountDownLatch(1); for (int i = 0; i < threads; ++i) { diff --git a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java index 25ac1ba34bc..330fe98332d 100644 --- a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java @@ -72,7 +72,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -426,7 +425,7 @@ public class BatchServerInventoryViewTest public void testSameTimeZnode() throws Exception { final int numThreads = INITIAL_SEGMENTS / 10; - final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads)); + final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.multiThreaded(numThreads, "BatchServerInventoryViewTest-%d")); segmentAnnouncer.announceSegments(testSegments); diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 620dbd26eba..65afb8ea7ab 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Ordering; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.query.TableDataSource; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; @@ -54,7 +55,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -205,7 +205,7 @@ public class SegmentManagerTest public void setup() { segmentManager = new SegmentManager(SEGMENT_LOADER); - executor = Executors.newFixedThreadPool(SEGMENTS.size()); + executor = Execs.multiThreaded(SEGMENTS.size(), "SegmentManagerTest-%d"); } @After diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 433759ba825..9b9bf015efe 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; @@ -119,7 +120,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -147,7 +147,7 @@ public class ServerManagerTest queryWaitYieldLatch = new CountDownLatch(1); queryNotifyLatch = new CountDownLatch(1); factory = new MyQueryRunnerFactory(queryWaitLatch, queryWaitYieldLatch, queryNotifyLatch); - serverManagerExec = Executors.newFixedThreadPool(2); + serverManagerExec = Execs.multiThreaded(2, "ServerManagerTest-%d"); segmentManager = new SegmentManager( new SegmentLoader() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index e2bb7a816ba..740eb76d263 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ImmutableDruidServerTests; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -44,7 +45,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -161,7 +161,7 @@ public class BalanceSegmentsTest druidServers = ImmutableList.of(druidServer1, druidServer2, druidServer3, druidServer4); peons = ImmutableList.of(peon1, peon2, peon3, peon4); - balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + balancerStrategyExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "BalanceSegmentsTest-%d")); balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor); broadcastDatasources = Collections.singleton("datasourceBroadcast"); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyBenchmark.java b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyBenchmark.java index 410ed9fe346..881698f7a28 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyBenchmark.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyBenchmark.java @@ -23,6 +23,7 @@ import com.carrotsearch.junitbenchmarks.AbstractBenchmark; import com.carrotsearch.junitbenchmarks.BenchmarkOptions; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; import org.junit.AfterClass; @@ -34,7 +35,6 @@ import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Executors; @Ignore @RunWith(Parameterized.class) @@ -46,11 +46,11 @@ public class CostBalancerStrategyBenchmark extends AbstractBenchmark return Arrays.asList( new CostBalancerStrategy[] { new CostBalancerStrategy(MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1))) + Execs.multiThreaded(1, "CostBalancerStrategyBenchmark-%d"))) }, new CostBalancerStrategy[] { new CostBalancerStrategy(MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(4))) + Execs.multiThreaded(4, "CostBalancerStrategyBenchmark-%d"))) } ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java index 32146b91595..2e2d2a44f9a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java @@ -26,6 +26,7 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ImmutableDruidServerTests; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; @@ -42,7 +43,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -138,7 +138,7 @@ public class CostBalancerStrategyTest DataSegment segment = getSegment(1000); BalancerStrategy strategy = new CostBalancerStrategy( - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)) + MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "CostBalancerStrategyTest-%d")) ); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); @@ -152,7 +152,7 @@ public class CostBalancerStrategyTest DataSegment segment = getSegment(1000); BalancerStrategy strategy = new CostBalancerStrategy( - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)) + MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "CostBalancerStrategyTest-%d")) ); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); @@ -163,9 +163,6 @@ public class CostBalancerStrategyTest public void testComputeJointSegmentCost() { DateTime referenceTime = DateTimes.of("2014-01-01T00:00:00"); - CostBalancerStrategy strategy = new CostBalancerStrategy( - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)) - ); double segmentCost = CostBalancerStrategy.computeJointSegmentsCost( getSegment( 100, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java index 986ec98d2e3..9a73e58e12e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java @@ -25,6 +25,7 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ImmutableDruidServerTests; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; @@ -37,7 +38,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -132,7 +132,7 @@ public class DiskNormalizedCostBalancerStrategyTest DataSegment segment = getSegment(1000); BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy( - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)) + MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "DiskNormalizedCostBalancerStrategyTest-%d")) ); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); @@ -146,7 +146,7 @@ public class DiskNormalizedCostBalancerStrategyTest DataSegment segment = getSegment(1000); BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy( - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)) + MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "DiskNormalizedCostBalancerStrategyTest-%d")) ); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index eb3be4c8951..3256a0ad766 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -53,7 +54,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; /** */ @@ -136,7 +136,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy) @@ -207,7 +207,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy) @@ -283,7 +283,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy) @@ -388,7 +388,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build(); @@ -452,7 +452,7 @@ public class RunRulesTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) @@ -510,7 +510,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy) @@ -604,7 +604,7 @@ public class RunRulesTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) @@ -660,7 +660,7 @@ public class RunRulesTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) @@ -720,7 +720,7 @@ public class RunRulesTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) @@ -772,7 +772,7 @@ public class RunRulesTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) @@ -838,7 +838,7 @@ public class RunRulesTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers @@ -903,7 +903,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build(); @@ -1015,7 +1015,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build(); @@ -1087,7 +1087,7 @@ public class RunRulesTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers @@ -1166,7 +1166,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers @@ -1426,7 +1426,7 @@ public class RunRulesTest ) .build(); - ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d")); CostBalancerStrategy balancerStrategy = new CostBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java index b039834e7b3..db62444c6ed 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java @@ -31,6 +31,7 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.core.LoggingEmitter; @@ -72,7 +73,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** @@ -121,7 +121,7 @@ public class LoadRuleTest EMITTER.start(); throttler = EasyMock.createMock(ReplicationThrottler.class); - exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "LoadRuleTest-%d")); balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); cachingCostBalancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec); diff --git a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java index ff8ae569d26..a2fcea83434 100644 --- a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java +++ b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java @@ -34,6 +34,7 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.Initialization; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientInit; @@ -285,7 +286,7 @@ public class JettyTest extends BaseJettyTest public void testTimeouts() throws Exception { // test for request timeouts properly not locking up all threads - final ExecutorService executor = Executors.newFixedThreadPool(100); + final ExecutorService executor = Execs.multiThreaded(100, "JettyTest-%d"); final AtomicLong count = new AtomicLong(0); final CountDownLatch latch = new CountDownLatch(1000); for (int i = 0; i < 10000; i++) { diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 748ed20ce3c..deccbf497de 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -122,7 +122,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; /** @@ -820,7 +819,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase { final List> futures = new ArrayList<>(); final ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(AVATICA_CONFIG.getMaxStatementsPerConnection()) + Execs.multiThreaded(AVATICA_CONFIG.getMaxStatementsPerConnection(), "DruidAvaticaHandlerTest-%d") ); for (int i = 0; i < 2000; i++) { final String query = StringUtils.format("SELECT COUNT(*) + %s AS ci FROM foo", i);