From 5eadd395e2c64243791bdb781f494e0baef01215 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 14 Jul 2015 15:32:17 -0700 Subject: [PATCH] Move lots of executor service creation to Execs --- .../aws/FileSessionCredentialsProvider.java | 6 +-- .../io/druid/indexing/overlord/TaskQueue.java | 4 +- .../query/PrioritizedExecutorService.java | 44 ++++++++++++++----- .../druid/client/CachingClusteredClient.java | 8 +--- .../io/druid/guice/DruidProcessingModule.java | 5 ++- .../firehose/TimedShutoffFirehoseFactory.java | 10 +---- .../realtime/plumber/FlushingPlumber.java | 11 +---- .../realtime/plumber/RealtimePlumber.java | 16 +++---- .../server/bridge/DruidClusterBridge.java | 15 +++---- .../coordination/BaseZkCoordinator.java | 10 +---- 10 files changed, 59 insertions(+), 70 deletions(-) diff --git a/aws-common/src/main/java/io/druid/common/aws/FileSessionCredentialsProvider.java b/aws-common/src/main/java/io/druid/common/aws/FileSessionCredentialsProvider.java index 5ffbc0ea4b5..405569e5a64 100644 --- a/aws-common/src/main/java/io/druid/common/aws/FileSessionCredentialsProvider.java +++ b/aws-common/src/main/java/io/druid/common/aws/FileSessionCredentialsProvider.java @@ -21,6 +21,7 @@ import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSSessionCredentials; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.druid.concurrent.Execs; import java.io.File; import java.io.FileInputStream; @@ -37,10 +38,7 @@ public class FileSessionCredentialsProvider implements AWSCredentialsProvider { private volatile String accessKey; private volatile String secretKey; - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("FileSessionCredentialsProviderRefresh-%d") - .setDaemon(true).build() - ); + private final ScheduledExecutorService scheduler = Execs.scheduledSingleThreaded("FileSessionCredentialsProviderRefresh-%d"); public FileSessionCredentialsProvider(String sessionCredentials) { this.sessionCredentials = sessionCredentials; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 61b60644cfe..51e89a70435 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -79,12 +79,12 @@ public class TaskQueue private final Condition managementMayBeNecessary = giant.newCondition(); private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() - .setDaemon(false) + .setDaemon(false) // Don't use Execs because of this .setNameFormat("TaskQueue-Manager").build() ); private final ScheduledExecutorService storageSyncExec = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() - .setDaemon(false) + .setDaemon(false) // Don't use Execs because of this .setNameFormat("TaskQueue-StorageSync").build() ); diff --git a/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java b/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java index 1bc0797e451..e2cb9ef7480 100644 --- a/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java +++ b/processing/src/main/java/io/druid/query/PrioritizedExecutorService.java @@ -98,17 +98,28 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen this.defaultPriority = defaultPriority; } - @Override protected PrioritizedListenableFutureTask newTaskFor(Runnable runnable, T value) { - Preconditions.checkArgument(allowRegularTasks || runnable instanceof PrioritizedRunnable, "task does not implement PrioritizedRunnable"); - return PrioritizedListenableFutureTask.create(ListenableFutureTask.create(runnable, value), - runnable instanceof PrioritizedRunnable - ? ((PrioritizedRunnable) runnable).getPriority() - : defaultPriority + @Override + protected PrioritizedListenableFutureTask newTaskFor(Runnable runnable, T value) + { + Preconditions.checkArgument( + allowRegularTasks || runnable instanceof PrioritizedRunnable, + "task does not implement PrioritizedRunnable" + ); + return PrioritizedListenableFutureTask.create( + ListenableFutureTask.create(runnable, value), + runnable instanceof PrioritizedRunnable + ? ((PrioritizedRunnable) runnable).getPriority() + : defaultPriority ); } - @Override protected PrioritizedListenableFutureTask newTaskFor(Callable callable) { - Preconditions.checkArgument(allowRegularTasks || callable instanceof PrioritizedCallable, "task does not implement PrioritizedCallable"); + @Override + protected PrioritizedListenableFutureTask newTaskFor(Callable callable) + { + Preconditions.checkArgument( + allowRegularTasks || callable instanceof PrioritizedCallable, + "task does not implement PrioritizedCallable" + ); return PrioritizedListenableFutureTask.create( ListenableFutureTask.create(callable), callable instanceof PrioritizedCallable ? ((PrioritizedCallable) callable).getPriority() @@ -116,15 +127,21 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen ); } - @Override public ListenableFuture submit(Runnable task) { + @Override + public ListenableFuture submit(Runnable task) + { return (ListenableFuture) super.submit(task); } - @Override public ListenableFuture submit(Runnable task, @Nullable T result) { + @Override + public ListenableFuture submit(Runnable task, @Nullable T result) + { return (ListenableFuture) super.submit(task, result); } - @Override public ListenableFuture submit(Callable task) { + @Override + public ListenableFuture submit(Callable task) + { return (ListenableFuture) super.submit(task); } @@ -170,7 +187,10 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen } - public static class PrioritizedListenableFutureTask implements RunnableFuture, ListenableFuture, PrioritizedRunnable, Comparable + public static class PrioritizedListenableFutureTask implements RunnableFuture, + ListenableFuture, + PrioritizedRunnable, + Comparable { public static PrioritizedListenableFutureTask create(PrioritizedRunnable task, @Nullable V result) { diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 47873499e26..9ed71c27cec 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -34,7 +34,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.guava.BaseSequence; @@ -43,11 +42,11 @@ import com.metamx.common.guava.LazySequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; +import io.druid.concurrent.Execs; import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Smile; import io.druid.query.BySegmentResultValueClass; @@ -78,7 +77,6 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** */ @@ -110,9 +108,7 @@ public class CachingClusteredClient implements QueryRunner this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService); serverView.registerSegmentCallback( - Executors.newFixedThreadPool( - 1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CCClient-ServerView-CB-%d").build() - ), + Execs.singleThreaded("CCClient-ServerView-CB-%d"), new ServerView.BaseSegmentCallback() { @Override diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index 1e2065ae808..14b9704e1b9 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -69,7 +69,7 @@ public class DruidProcessingModule implements Module cacheConfig.getNumBackgroundThreads(), new ThreadFactoryBuilder() .setNameFormat("background-cacher-%d") - .setDaemon(true) + .setDaemon(true) // TODO: migrate this to Execs after https://github.com/druid-io/druid/pull/984 .setPriority(Thread.MIN_PRIORITY) .build() ); @@ -118,7 +118,8 @@ public class DruidProcessingModule implements Module ) ); } - } catch(UnsupportedOperationException e) { + } + catch (UnsupportedOperationException e) { log.info(e.getMessage()); } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index 5b68fe76c39..65512dcfaf5 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -19,8 +19,8 @@ package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; @@ -28,7 +28,6 @@ import io.druid.data.input.impl.InputRowParser; import org.joda.time.DateTime; import java.io.IOException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -68,12 +67,7 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory> sinksToPush = Lists.newArrayList(); for (Map.Entry entry : sinks.entrySet()) { diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java index d29b1712dd8..40e14f67979 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; @@ -52,7 +51,6 @@ import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -118,12 +116,7 @@ public class DruidClusterBridge this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); this.self = self; - ExecutorService serverInventoryViewExec = Executors.newFixedThreadPool( - 1, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("DruidClusterBridge-ServerInventoryView-%d") - .build() - ); + ExecutorService serverInventoryViewExec = Execs.singleThreaded("DruidClusterBridge-ServerInventoryView-%d"); serverInventoryView.registerSegmentCallback( serverInventoryViewExec, @@ -376,7 +369,11 @@ public class DruidClusterBridge } } - private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServerMetadata server) + private void serverRemovedSegment( + DataSegmentAnnouncer dataSegmentAnnouncer, + DataSegment segment, + DruidServerMetadata server + ) throws IOException { Integer count = segments.get(segment); diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java index c4c3f9e1788..c1faea8a165 100644 --- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java @@ -19,12 +19,10 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; @@ -35,7 +33,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import java.io.IOException; -import java.util.concurrent.Executors; /** */ @@ -88,10 +85,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler loadQueueLocation, true, true, - Executors.newFixedThreadPool( - config.getNumLoadingThreads(), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build() - ) + Execs.multiThreaded(config.getNumLoadingThreads(), "ZkCoordinator-%s") ); try {