Update forbidden apis with fixed executor (#13633)

* Update forbidden apis with fixed executor
This commit is contained in:
Adarsh Sanjeev 2023-01-12 15:34:36 +05:30 committed by GitHub
parent afb3d91777
commit 0a486c3bcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 70 additions and 66 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
@ -47,7 +48,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@ -77,7 +77,7 @@ public class BalancerStrategyBenchmark
private int reservoirSize = 1;
private double percentOfSegmentsToConsider = 100;
private final BalancerStrategy balancerStrategy = new CostBalancerStrategy(
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "BalancerStrategyBenchmark-%d"))
);
@Setup(Level.Trial)

View File

@ -48,6 +48,7 @@ org.apache.commons.io.FileUtils#getTempDirectory() @ Use org.junit.rules.Tempora
org.apache.commons.io.FileUtils#deleteDirectory(java.io.File) @ Use org.apache.druid.java.util.common.FileUtils#deleteDirectory()
org.apache.commons.io.FileUtils#forceMkdir(java.io.File) @ Use org.apache.druid.java.util.common.FileUtils.mkdirp instead
java.lang.Class#getCanonicalName() @ Class.getCanonicalName can return null for anonymous types, use Class.getName instead.
java.util.concurrent.Executors#newFixedThreadPool(int) @ Executor is non-daemon and can prevent JVM shutdown, use org.apache.druid.java.util.common.concurrent.Execs#multiThreaded(int, java.lang.String) instead.
@defaultMessage Use Locale.ENGLISH
com.ibm.icu.text.DateFormatSymbols#<init>()

View File

@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.Assert;
import org.junit.Test;
@ -31,7 +32,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@ -71,7 +71,7 @@ public class GuavaUtilsTest
public void testCancelAll()
{
int tasks = 3;
ExecutorService service = Executors.newFixedThreadPool(tasks);
ExecutorService service = Execs.multiThreaded(tasks, "GuavaUtilsTest-%d");
ListeningExecutorService exc = MoreExecutors.listeningDecorator(service);
AtomicInteger index = new AtomicInteger(0);
//a flag what time to throw exception.

View File

@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.Assert;
import org.junit.Test;
@ -33,7 +34,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -61,7 +61,10 @@ public class LifecycleTest
public void testConcurrentStartStopOnce() throws Exception
{
final int numThreads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Execs.multiThreaded(
numThreads,
"LifecycleTest-%d"
));
final Lifecycle lifecycle = new Lifecycle();
final AtomicLong handlerFailedCount = new AtomicLong(0L);

View File

@ -28,6 +28,7 @@ import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@ -53,7 +54,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -158,7 +158,7 @@ public class HdfsClasspathSetupTest
public void testConcurrentUpload() throws InterruptedException, ExecutionException, TimeoutException
{
final int concurrency = 10;
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency));
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Execs.multiThreaded(concurrency, "HdfsClasspathSetupTest-%d"));
// barrier ensures that all jobs try to add files to classpath at same time.
final CyclicBarrier barrier = new CyclicBarrier(concurrency);
final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());

View File

@ -97,6 +97,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
@ -176,7 +177,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RunWith(Parameterized.class)
public class TaskLifecycleTest extends InitializedNullHandlingTest
@ -1364,7 +1364,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
@Test
public void testUnifiedAppenderatorsManagerCleanup() throws Exception
{
final ExecutorService exec = Executors.newFixedThreadPool(8);
final ExecutorService exec = Execs.multiThreaded(8, "TaskLifecycleTest-%d");
UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager(
new ForwardingQueryProcessingPool(exec),

View File

@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
@ -45,7 +46,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TaskLockBoxConcurrencyTest
@ -77,7 +77,7 @@ public class TaskLockBoxConcurrencyTest
taskStorage,
new IndexerSQLMetadataStorageCoordinator(objectMapper, derby.metadataTablesConfigSupplier().get(), derbyConnector)
);
service = Executors.newFixedThreadPool(2);
service = Execs.multiThreaded(2, "TaskLockBoxConcurrencyTest-%d");
}
@After

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
@ -50,7 +51,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -133,7 +133,7 @@ public class ChainedExecutionQueryRunnerTest
.build();
final Sequence seq = chainedRunner.run(QueryPlus.wrap(query));
Future resultFuture = Executors.newFixedThreadPool(1).submit(
Future resultFuture = Execs.multiThreaded(1, "ChainedExecutionQueryRunnerTest-%d").submit(
new Runnable()
{
@Override
@ -258,7 +258,7 @@ public class ChainedExecutionQueryRunnerTest
.build();
final Sequence seq = chainedRunner.run(QueryPlus.wrap(query));
Future resultFuture = Executors.newFixedThreadPool(1).submit(
Future resultFuture = Execs.multiThreaded(1, "ChainedExecutionQueryRunnerTest-%d").submit(
new Runnable()
{
@Override

View File

@ -20,7 +20,9 @@
package org.apache.druid.query.groupby.having;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.SelectorDimFilter;
@ -39,7 +41,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class DimFilterHavingSpecTest
@ -85,7 +86,7 @@ public class DimFilterHavingSpecTest
@Ignore // Doesn't always pass. The check in "eval" is best effort and not guaranteed to detect concurrent usage.
public void testConcurrentUsage() throws Exception
{
final ExecutorService exec = Executors.newFixedThreadPool(2);
final ExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(2, "DimFilterHavingSpecTest-%d"));
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null), null);
final List<Future<?>> futures = new ArrayList<>();

View File

@ -35,6 +35,7 @@ import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
@ -639,7 +640,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
final int threadCount = 10;
final int elementsPerThread = 200;
final int dimensionCount = 5;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
ExecutorService executor = Execs.multiThreaded(threadCount, "IncrementalIndexTest-%d");
final long timestamp = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(threadCount);
for (int j = 0; j < threadCount; j++) {

View File

@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.filter.BoundDimFilter;
@ -65,7 +66,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
@ -418,7 +418,7 @@ public class FloatAndDoubleFilteringTest extends BaseFilterTest
)
{
ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(EXECUTOR_NUM_THREADS)
Execs.multiThreaded(EXECUTOR_NUM_THREADS, "FloatAndDoubleFilteringTest-%d")
);
List<ListenableFuture<?>> futures = new ArrayList<>();

View File

@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.extraction.MapLookupExtractor;
@ -63,7 +64,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class)
@ -433,7 +433,7 @@ public class LongFilteringTest extends BaseFilterTest
final List<String> expectedRows
)
{
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(EXECUTOR_NUM_THREADS));
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.multiThreaded(EXECUTOR_NUM_THREADS, "LongFilteringTest-%d"));
List<ListenableFuture<?>> futures = new ArrayList<>();

View File

@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@ -79,7 +80,6 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
@ -201,7 +201,9 @@ public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
final AtomicReference<String> failureReason = new AtomicReference<>(expectedReason);
final int threads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threads));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d")
);
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {

View File

@ -72,7 +72,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -426,7 +425,7 @@ public class BatchServerInventoryViewTest
public void testSameTimeZnode() throws Exception
{
final int numThreads = INITIAL_SEGMENTS / 10;
final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.multiThreaded(numThreads, "BatchServerInventoryViewTest-%d"));
segmentAnnouncer.announceSegments(testSegments);

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
@ -54,7 +55,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@ -205,7 +205,7 @@ public class SegmentManagerTest
public void setup()
{
segmentManager = new SegmentManager(SEGMENT_LOADER);
executor = Executors.newFixedThreadPool(SEGMENTS.size());
executor = Execs.multiThreaded(SEGMENTS.size(), "SegmentManagerTest-%d");
}
@After

View File

@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
@ -119,7 +120,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -147,7 +147,7 @@ public class ServerManagerTest
queryWaitYieldLatch = new CountDownLatch(1);
queryNotifyLatch = new CountDownLatch(1);
factory = new MyQueryRunnerFactory(queryWaitLatch, queryWaitYieldLatch, queryNotifyLatch);
serverManagerExec = Executors.newFixedThreadPool(2);
serverManagerExec = Execs.multiThreaded(2, "ServerManagerTest-%d");
segmentManager = new SegmentManager(
new SegmentLoader()
{

View File

@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ImmutableDruidServerTests;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
@ -44,7 +45,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -161,7 +161,7 @@ public class BalanceSegmentsTest
druidServers = ImmutableList.of(druidServer1, druidServer2, druidServer3, druidServer4);
peons = ImmutableList.of(peon1, peon2, peon3, peon4);
balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
balancerStrategyExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "BalanceSegmentsTest-%d"));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
broadcastDatasources = Collections.singleton("datasourceBroadcast");

View File

@ -23,6 +23,7 @@ import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.AfterClass;
@ -34,7 +35,6 @@ import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
@Ignore
@RunWith(Parameterized.class)
@ -46,11 +46,11 @@ public class CostBalancerStrategyBenchmark extends AbstractBenchmark
return Arrays.asList(
new CostBalancerStrategy[] {
new CostBalancerStrategy(MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1)))
Execs.multiThreaded(1, "CostBalancerStrategyBenchmark-%d")))
},
new CostBalancerStrategy[] {
new CostBalancerStrategy(MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(4)))
Execs.multiThreaded(4, "CostBalancerStrategyBenchmark-%d")))
}
);
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ImmutableDruidServerTests;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
@ -42,7 +43,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -138,7 +138,7 @@ public class CostBalancerStrategyTest
DataSegment segment = getSegment(1000);
BalancerStrategy strategy = new CostBalancerStrategy(
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "CostBalancerStrategyTest-%d"))
);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
@ -152,7 +152,7 @@ public class CostBalancerStrategyTest
DataSegment segment = getSegment(1000);
BalancerStrategy strategy = new CostBalancerStrategy(
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "CostBalancerStrategyTest-%d"))
);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
@ -163,9 +163,6 @@ public class CostBalancerStrategyTest
public void testComputeJointSegmentCost()
{
DateTime referenceTime = DateTimes.of("2014-01-01T00:00:00");
CostBalancerStrategy strategy = new CostBalancerStrategy(
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
);
double segmentCost = CostBalancerStrategy.computeJointSegmentsCost(
getSegment(
100,

View File

@ -25,6 +25,7 @@ import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ImmutableDruidServerTests;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
@ -37,7 +38,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -132,7 +132,7 @@ public class DiskNormalizedCostBalancerStrategyTest
DataSegment segment = getSegment(1000);
BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy(
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "DiskNormalizedCostBalancerStrategyTest-%d"))
);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
@ -146,7 +146,7 @@ public class DiskNormalizedCostBalancerStrategyTest
DataSegment segment = getSegment(1000);
BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy(
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "DiskNormalizedCostBalancerStrategyTest-%d"))
);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);

View File

@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -53,7 +54,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
/**
*/
@ -136,7 +136,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
@ -207,7 +207,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
@ -283,7 +283,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
@ -388,7 +388,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build();
@ -452,7 +452,7 @@ public class RunRulesTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
@ -510,7 +510,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
@ -604,7 +604,7 @@ public class RunRulesTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
@ -660,7 +660,7 @@ public class RunRulesTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
@ -720,7 +720,7 @@ public class RunRulesTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
@ -772,7 +772,7 @@ public class RunRulesTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
@ -838,7 +838,7 @@ public class RunRulesTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
@ -903,7 +903,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build();
@ -1015,7 +1015,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build();
@ -1087,7 +1087,7 @@ public class RunRulesTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
@ -1166,7 +1166,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
@ -1426,7 +1426,7 @@ public class RunRulesTest
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
CostBalancerStrategy balancerStrategy = new CostBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))

View File

@ -31,6 +31,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.LoggingEmitter;
@ -72,7 +73,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -121,7 +121,7 @@ public class LoadRuleTest
EMITTER.start();
throttler = EasyMock.createMock(ReplicationThrottler.class);
exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "LoadRuleTest-%d"));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
cachingCostBalancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);

View File

@ -34,6 +34,7 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.HttpClientConfig;
import org.apache.druid.java.util.http.client.HttpClientInit;
@ -285,7 +286,7 @@ public class JettyTest extends BaseJettyTest
public void testTimeouts() throws Exception
{
// test for request timeouts properly not locking up all threads
final ExecutorService executor = Executors.newFixedThreadPool(100);
final ExecutorService executor = Execs.multiThreaded(100, "JettyTest-%d");
final AtomicLong count = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(1000);
for (int i = 0; i < 10000; i++) {

View File

@ -122,7 +122,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -820,7 +819,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
{
final List<ListenableFuture<Integer>> futures = new ArrayList<>();
final ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(AVATICA_CONFIG.getMaxStatementsPerConnection())
Execs.multiThreaded(AVATICA_CONFIG.getMaxStatementsPerConnection(), "DruidAvaticaHandlerTest-%d")
);
for (int i = 0; i < 2000; i++) {
final String query = StringUtils.format("SELECT COUNT(*) + %s AS ci FROM foo", i);