Move lots of executor service creation to Execs

This commit is contained in:
Charles Allen 2015-07-14 15:32:17 -07:00
parent 798c3320d0
commit 5eadd395e2
10 changed files with 59 additions and 70 deletions

View File

@ -21,6 +21,7 @@ import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials; import com.amazonaws.auth.AWSSessionCredentials;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.druid.concurrent.Execs;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -37,10 +38,7 @@ public class FileSessionCredentialsProvider implements AWSCredentialsProvider {
private volatile String accessKey; private volatile String accessKey;
private volatile String secretKey; private volatile String secretKey;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( private final ScheduledExecutorService scheduler = Execs.scheduledSingleThreaded("FileSessionCredentialsProviderRefresh-%d");
new ThreadFactoryBuilder().setNameFormat("FileSessionCredentialsProviderRefresh-%d")
.setDaemon(true).build()
);
public FileSessionCredentialsProvider(String sessionCredentials) { public FileSessionCredentialsProvider(String sessionCredentials) {
this.sessionCredentials = sessionCredentials; this.sessionCredentials = sessionCredentials;

View File

@ -79,12 +79,12 @@ public class TaskQueue
private final Condition managementMayBeNecessary = giant.newCondition(); private final Condition managementMayBeNecessary = giant.newCondition();
private final ExecutorService managerExec = Executors.newSingleThreadExecutor( private final ExecutorService managerExec = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setDaemon(false) .setDaemon(false) // Don't use Execs because of this
.setNameFormat("TaskQueue-Manager").build() .setNameFormat("TaskQueue-Manager").build()
); );
private final ScheduledExecutorService storageSyncExec = Executors.newSingleThreadScheduledExecutor( private final ScheduledExecutorService storageSyncExec = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setDaemon(false) .setDaemon(false) // Don't use Execs because of this
.setNameFormat("TaskQueue-StorageSync").build() .setNameFormat("TaskQueue-StorageSync").build()
); );

View File

@ -98,17 +98,28 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
this.defaultPriority = defaultPriority; this.defaultPriority = defaultPriority;
} }
@Override protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Runnable runnable, T value) { @Override
Preconditions.checkArgument(allowRegularTasks || runnable instanceof PrioritizedRunnable, "task does not implement PrioritizedRunnable"); protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Runnable runnable, T value)
return PrioritizedListenableFutureTask.create(ListenableFutureTask.create(runnable, value), {
Preconditions.checkArgument(
allowRegularTasks || runnable instanceof PrioritizedRunnable,
"task does not implement PrioritizedRunnable"
);
return PrioritizedListenableFutureTask.create(
ListenableFutureTask.create(runnable, value),
runnable instanceof PrioritizedRunnable runnable instanceof PrioritizedRunnable
? ((PrioritizedRunnable) runnable).getPriority() ? ((PrioritizedRunnable) runnable).getPriority()
: defaultPriority : defaultPriority
); );
} }
@Override protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Callable<T> callable) { @Override
Preconditions.checkArgument(allowRegularTasks || callable instanceof PrioritizedCallable, "task does not implement PrioritizedCallable"); protected <T> PrioritizedListenableFutureTask<T> newTaskFor(Callable<T> callable)
{
Preconditions.checkArgument(
allowRegularTasks || callable instanceof PrioritizedCallable,
"task does not implement PrioritizedCallable"
);
return PrioritizedListenableFutureTask.create( return PrioritizedListenableFutureTask.create(
ListenableFutureTask.create(callable), callable instanceof PrioritizedCallable ListenableFutureTask.create(callable), callable instanceof PrioritizedCallable
? ((PrioritizedCallable) callable).getPriority() ? ((PrioritizedCallable) callable).getPriority()
@ -116,15 +127,21 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
); );
} }
@Override public ListenableFuture<?> submit(Runnable task) { @Override
public ListenableFuture<?> submit(Runnable task)
{
return (ListenableFuture<?>) super.submit(task); return (ListenableFuture<?>) super.submit(task);
} }
@Override public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) { @Override
public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result)
{
return (ListenableFuture<T>) super.submit(task, result); return (ListenableFuture<T>) super.submit(task, result);
} }
@Override public <T> ListenableFuture<T> submit(Callable<T> task) { @Override
public <T> ListenableFuture<T> submit(Callable<T> task)
{
return (ListenableFuture<T>) super.submit(task); return (ListenableFuture<T>) super.submit(task);
} }
@ -170,7 +187,10 @@ public class PrioritizedExecutorService extends AbstractExecutorService implemen
} }
public static class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>, ListenableFuture<V>, PrioritizedRunnable, Comparable<PrioritizedListenableFutureTask> public static class PrioritizedListenableFutureTask<V> implements RunnableFuture<V>,
ListenableFuture<V>,
PrioritizedRunnable,
Comparable<PrioritizedListenableFutureTask>
{ {
public static <V> PrioritizedListenableFutureTask<V> create(PrioritizedRunnable task, @Nullable V result) public static <V> PrioritizedListenableFutureTask<V> create(PrioritizedRunnable task, @Nullable V result)
{ {

View File

@ -34,7 +34,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
@ -43,11 +42,11 @@ import com.metamx.common.guava.LazySequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache; import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheConfig;
import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector; import io.druid.client.selector.ServerSelector;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Smile; import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentResultValueClass; import io.druid.query.BySegmentResultValueClass;
@ -78,7 +77,6 @@ import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
*/ */
@ -110,9 +108,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService); this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
serverView.registerSegmentCallback( serverView.registerSegmentCallback(
Executors.newFixedThreadPool( Execs.singleThreaded("CCClient-ServerView-CB-%d"),
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CCClient-ServerView-CB-%d").build()
),
new ServerView.BaseSegmentCallback() new ServerView.BaseSegmentCallback()
{ {
@Override @Override

View File

@ -69,7 +69,7 @@ public class DruidProcessingModule implements Module
cacheConfig.getNumBackgroundThreads(), cacheConfig.getNumBackgroundThreads(),
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat("background-cacher-%d") .setNameFormat("background-cacher-%d")
.setDaemon(true) .setDaemon(true) // TODO: migrate this to Execs after https://github.com/druid-io/druid/pull/984
.setPriority(Thread.MIN_PRIORITY) .setPriority(Thread.MIN_PRIORITY)
.build() .build()
); );
@ -118,7 +118,8 @@ public class DruidProcessingModule implements Module
) )
); );
} }
} catch(UnsupportedOperationException e) { }
catch (UnsupportedOperationException e) {
log.info(e.getMessage()); log.info(e.getMessage());
} }

View File

@ -19,8 +19,8 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
@ -28,7 +28,6 @@ import io.druid.data.input.impl.InputRowParser;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -68,12 +67,7 @@ public class TimedShutoffFirehoseFactory implements FirehoseFactory<InputRowPars
{ {
firehose = delegateFactory.connect(parser); firehose = delegateFactory.connect(parser);
exec = Executors.newScheduledThreadPool( exec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d");
1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("timed-shutoff-firehose-%d")
.build()
);
exec.schedule( exec.schedule(
new Runnable() new Runnable()

View File

@ -18,12 +18,12 @@
package io.druid.segment.realtime.plumber; package io.druid.segment.realtime.plumber;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.concurrent.Execs;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.RealtimeTuningConfig;
@ -36,7 +36,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
/** /**
@ -90,13 +89,7 @@ public class FlushingPlumber extends RealtimePlumber
initializeExecutors(); initializeExecutors();
if (flushScheduledExec == null) { if (flushScheduledExec == null) {
flushScheduledExec = Executors.newScheduledThreadPool( flushScheduledExec = Execs.scheduledSingleThreaded("flushing_scheduled_%d");
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("flushing_scheduled_%d")
.build()
);
} }
bootstrapSinksFromDisk(); bootstrapSinksFromDisk();

View File

@ -27,7 +27,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
@ -88,7 +87,6 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -553,13 +551,7 @@ public class RealtimePlumber implements Plumber
} }
if (scheduledExecutor == null) { if (scheduledExecutor == null) {
scheduledExecutor = Executors.newScheduledThreadPool( scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_scheduled_%d")
.build()
);
} }
} }
@ -741,7 +733,11 @@ public class RealtimePlumber implements Plumber
); );
long minTimestamp = minTimestampAsDate.getMillis(); long minTimestamp = minTimestampAsDate.getMillis();
log.info("Found [%,d] segments. Attempting to hand off segments that start before [%s].", sinks.size(), minTimestampAsDate); log.info(
"Found [%,d] segments. Attempting to hand off segments that start before [%s].",
sinks.size(),
minTimestampAsDate
);
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList(); List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) { for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {

View File

@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
@ -52,7 +51,6 @@ import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -118,12 +116,7 @@ public class DruidClusterBridge
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.self = self; this.self = self;
ExecutorService serverInventoryViewExec = Executors.newFixedThreadPool( ExecutorService serverInventoryViewExec = Execs.singleThreaded("DruidClusterBridge-ServerInventoryView-%d");
1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DruidClusterBridge-ServerInventoryView-%d")
.build()
);
serverInventoryView.registerSegmentCallback( serverInventoryView.registerSegmentCallback(
serverInventoryViewExec, serverInventoryViewExec,
@ -376,7 +369,11 @@ public class DruidClusterBridge
} }
} }
private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServerMetadata server) private void serverRemovedSegment(
DataSegmentAnnouncer dataSegmentAnnouncer,
DataSegment segment,
DruidServerMetadata server
)
throws IOException throws IOException
{ {
Integer count = segments.get(segment); Integer count = segments.get(segment);

View File

@ -19,12 +19,10 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -35,7 +33,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Executors;
/** /**
*/ */
@ -88,10 +85,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
loadQueueLocation, loadQueueLocation,
true, true,
true, true,
Executors.newFixedThreadPool( Execs.multiThreaded(config.getNumLoadingThreads(), "ZkCoordinator-%s")
config.getNumLoadingThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
)
); );
try { try {