Fix PathChildrenCache's ExecutorService leak (#3726)

* Fix PathChildrenCache's executorService leak in Announcer, CuratorInventoryManager and RemoteTaskRunner

* Use a single ExecutorService for all workerStatusPathChildrenCaches in RemoteTaskRunner
This commit is contained in:
Roman Leventov 2016-12-07 15:00:10 -06:00 committed by Gian Merlino
parent dc8f814acc
commit 70e83bea6d
8 changed files with 151 additions and 267 deletions

View File

@ -37,6 +37,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -134,7 +135,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private final Duration shutdownTimeout;
private final IndexerZkConfig indexerZkConfig;
private final CuratorFramework cf;
private final PathChildrenCacheFactory pathChildrenCacheFactory;
private final PathChildrenCacheFactory workerStatusPathChildrenCacheFactory;
private final ExecutorService workerStatusPathChildrenCacheExecutor;
private final PathChildrenCache workerPathCache;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
@ -181,7 +183,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
RemoteTaskRunnerConfig config,
IndexerZkConfig indexerZkConfig,
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
@ -193,8 +195,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
this.shutdownTimeout = config.getTaskShutdownLinkTimeout().toStandardDuration(); // Fail fast
this.indexerZkConfig = indexerZkConfig;
this.cf = cf;
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
this.workerPathCache = pathChildrenCacheFactory.build().make(cf, indexerZkConfig.getAnnouncementsPath());
this.workerStatusPathChildrenCacheExecutor = PathChildrenCacheFactory.Builder.createDefaultExecutor();
this.workerStatusPathChildrenCacheFactory = pathChildrenCacheFactory
.withExecutorService(workerStatusPathChildrenCacheExecutor)
.withShutdownExecutorOnClose(false)
.build();
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
@ -337,10 +343,17 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
resourceManagement.stopManagement();
Closer closer = Closer.create();
for (ZkWorker zkWorker : zkWorkers.values()) {
zkWorker.close();
closer.register(zkWorker);
}
closer.register(workerPathCache);
try {
closer.close();
}
finally {
workerStatusPathChildrenCacheExecutor.shutdown();
}
workerPathCache.close();
if (runPendingTasksExec != null) {
runPendingTasksExec.shutdown();
@ -889,7 +902,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
cancelWorkerCleanup(worker.getHost());
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final PathChildrenCache statusCache = workerStatusPathChildrenCacheFactory.make(cf, workerStatusPath);
final SettableFuture<ZkWorker> retVal = SettableFuture.create();
final ZkWorker zkWorker = new ZkWorker(
worker,

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
@ -81,10 +81,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
remoteTaskRunnerConfig,
zkPaths,
curator,
new SimplePathChildrenCacheFactory
.Builder()
.withCompressed(true)
.build(),
new PathChildrenCacheFactory.Builder().withCompressed(true),
httpClient,
workerConfigRef,
factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"),

View File

@ -28,7 +28,7 @@ import com.google.common.base.Throwables;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
@ -120,7 +120,7 @@ public class RemoteTaskRunnerTestUtils
}, null, null, null, null, null
),
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
new PathChildrenCacheFactory.Builder(),
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),

View File

@ -1,134 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* This class exists to ignore the shutdownNow() call that PathChildrenCache does on close() so that we can share the
* same executor amongst multiple caches...
*/
public class ShutdownNowIgnoringExecutorService implements ExecutorService
{
private final ExecutorService exec;
public ShutdownNowIgnoringExecutorService(
ExecutorService exec
)
{
this.exec = exec;
}
@Override
public void shutdown()
{
// Ignore!
}
@Override
public List<Runnable> shutdownNow()
{
// Ignore!
return ImmutableList.of();
}
@Override
public boolean isShutdown()
{
return exec.isShutdown();
}
@Override
public boolean isTerminated()
{
return exec.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
{
return exec.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task)
{
return exec.submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result)
{
return exec.submit(task, result);
}
@Override
public Future<?> submit(Runnable task)
{
return exec.submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
{
return exec.invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit
) throws InterruptedException
{
return exec.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
{
return exec.invokeAny(tasks);
}
@Override
public <T> T invokeAny(
Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit
) throws InterruptedException, ExecutionException, TimeoutException
{
return exec.invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command)
{
exec.execute(command);
}
}

View File

@ -23,17 +23,14 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import io.druid.curator.ShutdownNowIgnoringExecutorService;
import com.google.common.io.Closer;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
@ -64,6 +61,7 @@ public class Announcer
private final CuratorFramework curator;
private final PathChildrenCacheFactory factory;
private final ExecutorService pathChildrenCacheExecutor;
private final List<Announceable> toAnnounce = Lists.newArrayList();
private final List<Announceable> toUpdate = Lists.newArrayList();
@ -79,7 +77,13 @@ public class Announcer
)
{
this.curator = curator;
this.factory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec));
this.pathChildrenCacheExecutor = exec;
this.factory = new PathChildrenCacheFactory.Builder()
.withCacheData(false)
.withCompressed(true)
.withExecutorService(exec)
.withShutdownExecutorOnClose(false)
.build();
}
@LifecycleStart
@ -114,8 +118,15 @@ public class Announcer
started = false;
for (Map.Entry<String, PathChildrenCache> entry : listeners.entrySet()) {
CloseQuietly.close(entry.getValue());
Closer closer = Closer.create();
for (PathChildrenCache cache : listeners.values()) {
closer.register(cache);
}
try {
CloseQuietly.close(closer);
}
finally {
pathChildrenCacheExecutor.shutdown();
}
for (Map.Entry<String, ConcurrentMap<String, byte[]>> entry : announcements.entrySet()) {

View File

@ -21,10 +21,96 @@ package io.druid.curator.cache;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.ThreadUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
*/
public interface PathChildrenCacheFactory
public class PathChildrenCacheFactory
{
public PathChildrenCache make(CuratorFramework curator, String path);
private final boolean cacheData;
private final boolean compressed;
private final ExecutorService exec;
private final boolean shutdownExecutorOnClose;
private PathChildrenCacheFactory(
boolean cacheData,
boolean compressed,
ExecutorService exec,
boolean shutdownExecutorOnClose
)
{
this.cacheData = cacheData;
this.compressed = compressed;
this.exec = exec;
this.shutdownExecutorOnClose = shutdownExecutorOnClose;
}
public PathChildrenCache make(CuratorFramework curator, String path)
{
return new PathChildrenCache(
curator,
path,
cacheData,
compressed,
new CloseableExecutorService(exec, shutdownExecutorOnClose)
);
}
public static class Builder
{
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
private boolean cacheData;
private boolean compressed;
private ExecutorService exec;
private boolean shutdownExecutorOnClose;
public Builder()
{
cacheData = true;
compressed = false;
exec = null;
shutdownExecutorOnClose = true;
}
public Builder withCacheData(boolean cacheData)
{
this.cacheData = cacheData;
return this;
}
public Builder withCompressed(boolean compressed)
{
this.compressed = compressed;
return this;
}
public Builder withExecutorService(ExecutorService exec)
{
this.exec = exec;
return this;
}
public Builder withShutdownExecutorOnClose(boolean shutdownExecutorOnClose)
{
this.shutdownExecutorOnClose = shutdownExecutorOnClose;
return this;
}
public PathChildrenCacheFactory build()
{
ExecutorService exec = this.exec != null ? this.exec : createDefaultExecutor();
return new PathChildrenCacheFactory(cacheData, compressed, exec, shutdownExecutorOnClose);
}
public static ExecutorService createDefaultExecutor()
{
return Executors.newSingleThreadExecutor(defaultThreadFactory);
}
}
}

View File

@ -1,93 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.curator.cache;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ThreadUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
*/
public class SimplePathChildrenCacheFactory implements PathChildrenCacheFactory
{
private final boolean cacheData;
private final boolean compressed;
private final ExecutorService exec;
public SimplePathChildrenCacheFactory(
boolean cacheData,
boolean compressed,
ExecutorService exec
)
{
this.cacheData = cacheData;
this.compressed = compressed;
this.exec = exec;
}
@Override
public PathChildrenCache make(CuratorFramework curator, String path)
{
return new PathChildrenCache(curator, path, cacheData, compressed, exec);
}
public static class Builder
{
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
private boolean cacheData;
private boolean compressed;
private ExecutorService exec;
public Builder()
{
cacheData = true;
compressed = false;
exec = Executors.newSingleThreadExecutor(defaultThreadFactory);
}
public Builder withCacheData(boolean cacheData)
{
this.cacheData = cacheData;
return this;
}
public Builder withCompressed(boolean compressed)
{
this.compressed = compressed;
return this;
}
public Builder withExecutorService(ExecutorService exec)
{
this.exec = exec;
return this;
}
public SimplePathChildrenCacheFactory build()
{
return new SimplePathChildrenCacheFactory(cacheData, compressed, exec);
}
}
}

View File

@ -21,17 +21,13 @@ package io.druid.curator.inventory;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import io.druid.curator.ShutdownNowIgnoringExecutorService;
import com.google.common.io.Closer;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@ -68,6 +64,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
private final ConcurrentMap<String, ContainerHolder> containers;
private final Set<ContainerHolder> uninitializedInventory;
private final PathChildrenCacheFactory cacheFactory;
private final ExecutorService pathChildrenCacheExecutor;
private volatile PathChildrenCache childrenCache;
@ -85,10 +82,16 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
this.containers = new MapMaker().makeMap();
this.uninitializedInventory = Sets.newConcurrentHashSet();
//NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event.
//this is a workaround to solve curator's out-of-order events problem
//https://issues.apache.org/jira/browse/CURATOR-191
this.cacheFactory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec));
this.pathChildrenCacheExecutor = exec;
this.cacheFactory = new PathChildrenCacheFactory.Builder()
//NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event.
//this is a workaround to solve curator's out-of-order events problem
//https://issues.apache.org/jira/browse/CURATOR-191
.withCacheData(false)
.withCompressed(true)
.withExecutorService(pathChildrenCacheExecutor)
.withShutdownExecutorOnClose(false)
.build();
}
@LifecycleStart
@ -133,14 +136,15 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
childrenCache = null;
}
for (String containerKey : Lists.newArrayList(containers.keySet())) {
final ContainerHolder containerHolder = containers.remove(containerKey);
if (containerHolder == null) {
log.wtf("!? Got key[%s] from keySet() but it didn't have a value!?", containerKey);
} else {
// This close() call actually calls shutdownNow() on the executor registered with the Cache object...
containerHolder.getCache().close();
}
Closer closer = Closer.create();
for (ContainerHolder containerHolder : containers.values()) {
closer.register(containerHolder.getCache());
}
try {
closer.close();
}
finally {
pathChildrenCacheExecutor.shutdown();
}
}