From 70e83bea6d4f9a4b97bc0c6797e1e20aac168ce4 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Wed, 7 Dec 2016 15:00:10 -0600 Subject: [PATCH] Fix PathChildrenCache's ExecutorService leak (#3726) * Fix PathChildrenCache's executorService leak in Announcer, CuratorInventoryManager and RemoteTaskRunner * Use a single ExecutorService for all workerStatusPathChildrenCaches in RemoteTaskRunner --- .../indexing/overlord/RemoteTaskRunner.java | 27 +++- .../overlord/RemoteTaskRunnerFactory.java | 7 +- .../overlord/RemoteTaskRunnerTestUtils.java | 4 +- .../ShutdownNowIgnoringExecutorService.java | 134 ------------------ .../druid/curator/announcement/Announcer.java | 25 +++- .../cache/PathChildrenCacheFactory.java | 90 +++++++++++- .../cache/SimplePathChildrenCacheFactory.java | 93 ------------ .../inventory/CuratorInventoryManager.java | 38 ++--- 8 files changed, 151 insertions(+), 267 deletions(-) delete mode 100644 server/src/main/java/io/druid/curator/ShutdownNowIgnoringExecutorService.java delete mode 100644 server/src/main/java/io/druid/curator/cache/SimplePathChildrenCacheFactory.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 87d7f37e481..fbbfad0a6aa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -37,6 +37,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteSource; +import com.google.common.io.Closer; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -134,7 +135,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer private final Duration shutdownTimeout; private final IndexerZkConfig indexerZkConfig; private final CuratorFramework cf; - private final PathChildrenCacheFactory pathChildrenCacheFactory; + private final PathChildrenCacheFactory workerStatusPathChildrenCacheFactory; + private final ExecutorService workerStatusPathChildrenCacheExecutor; private final PathChildrenCache workerPathCache; private final HttpClient httpClient; private final Supplier workerConfigRef; @@ -181,7 +183,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer RemoteTaskRunnerConfig config, IndexerZkConfig indexerZkConfig, CuratorFramework cf, - PathChildrenCacheFactory pathChildrenCacheFactory, + PathChildrenCacheFactory.Builder pathChildrenCacheFactory, HttpClient httpClient, Supplier workerConfigRef, ScheduledExecutorService cleanupExec, @@ -193,8 +195,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer this.shutdownTimeout = config.getTaskShutdownLinkTimeout().toStandardDuration(); // Fail fast this.indexerZkConfig = indexerZkConfig; this.cf = cf; - this.pathChildrenCacheFactory = pathChildrenCacheFactory; - this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath()); + this.workerPathCache = pathChildrenCacheFactory.build().make(cf, indexerZkConfig.getAnnouncementsPath()); + this.workerStatusPathChildrenCacheExecutor = PathChildrenCacheFactory.Builder.createDefaultExecutor(); + this.workerStatusPathChildrenCacheFactory = pathChildrenCacheFactory + .withExecutorService(workerStatusPathChildrenCacheExecutor) + .withShutdownExecutorOnClose(false) + .build(); this.httpClient = httpClient; this.workerConfigRef = workerConfigRef; this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec); @@ -337,10 +343,17 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer resourceManagement.stopManagement(); + Closer closer = Closer.create(); for (ZkWorker zkWorker : zkWorkers.values()) { - zkWorker.close(); + closer.register(zkWorker); + } + closer.register(workerPathCache); + try { + closer.close(); + } + finally { + workerStatusPathChildrenCacheExecutor.shutdown(); } - workerPathCache.close(); if (runPendingTasksExec != null) { runPendingTasksExec.shutdown(); @@ -889,7 +902,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer cancelWorkerCleanup(worker.getHost()); final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost()); - final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath); + final PathChildrenCache statusCache = workerStatusPathChildrenCacheFactory.make(cf, workerStatusPath); final SettableFuture retVal = SettableFuture.create(); final ZkWorker zkWorker = new ZkWorker( worker, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index a054e900dd5..66f8152fe82 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.inject.Inject; import com.metamx.http.client.HttpClient; -import io.druid.curator.cache.SimplePathChildrenCacheFactory; +import io.druid.curator.cache.PathChildrenCacheFactory; import io.druid.guice.annotations.Global; import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; @@ -81,10 +81,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory(WorkerBehaviorConfig.defaultConfig())), ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"), diff --git a/server/src/main/java/io/druid/curator/ShutdownNowIgnoringExecutorService.java b/server/src/main/java/io/druid/curator/ShutdownNowIgnoringExecutorService.java deleted file mode 100644 index 5639e429643..00000000000 --- a/server/src/main/java/io/druid/curator/ShutdownNowIgnoringExecutorService.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.curator; - -import com.google.common.collect.ImmutableList; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * This class exists to ignore the shutdownNow() call that PathChildrenCache does on close() so that we can share the - * same executor amongst multiple caches... - */ -public class ShutdownNowIgnoringExecutorService implements ExecutorService -{ - private final ExecutorService exec; - - public ShutdownNowIgnoringExecutorService( - ExecutorService exec - ) - { - this.exec = exec; - } - - @Override - public void shutdown() - { - // Ignore! - } - - @Override - public List shutdownNow() - { - // Ignore! - return ImmutableList.of(); - } - - @Override - public boolean isShutdown() - { - return exec.isShutdown(); - } - - @Override - public boolean isTerminated() - { - return exec.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException - { - return exec.awaitTermination(timeout, unit); - } - - @Override - public Future submit(Callable task) - { - return exec.submit(task); - } - - @Override - public Future submit(Runnable task, T result) - { - return exec.submit(task, result); - } - - @Override - public Future submit(Runnable task) - { - return exec.submit(task); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException - { - return exec.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, - long timeout, - TimeUnit unit - ) throws InterruptedException - { - return exec.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException - { - return exec.invokeAny(tasks); - } - - @Override - public T invokeAny( - Collection> tasks, - long timeout, - TimeUnit unit - ) throws InterruptedException, ExecutionException, TimeoutException - { - return exec.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) - { - exec.execute(command); - } -} diff --git a/server/src/main/java/io/druid/curator/announcement/Announcer.java b/server/src/main/java/io/druid/curator/announcement/Announcer.java index 21793371e9e..03deeca79d2 100644 --- a/server/src/main/java/io/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/io/druid/curator/announcement/Announcer.java @@ -23,17 +23,14 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; - -import io.druid.curator.ShutdownNowIgnoringExecutorService; +import com.google.common.io.Closer; import io.druid.curator.cache.PathChildrenCacheFactory; -import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.logger.Logger; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.transaction.CuratorTransaction; import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; @@ -64,6 +61,7 @@ public class Announcer private final CuratorFramework curator; private final PathChildrenCacheFactory factory; + private final ExecutorService pathChildrenCacheExecutor; private final List toAnnounce = Lists.newArrayList(); private final List toUpdate = Lists.newArrayList(); @@ -79,7 +77,13 @@ public class Announcer ) { this.curator = curator; - this.factory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec)); + this.pathChildrenCacheExecutor = exec; + this.factory = new PathChildrenCacheFactory.Builder() + .withCacheData(false) + .withCompressed(true) + .withExecutorService(exec) + .withShutdownExecutorOnClose(false) + .build(); } @LifecycleStart @@ -114,8 +118,15 @@ public class Announcer started = false; - for (Map.Entry entry : listeners.entrySet()) { - CloseQuietly.close(entry.getValue()); + Closer closer = Closer.create(); + for (PathChildrenCache cache : listeners.values()) { + closer.register(cache); + } + try { + CloseQuietly.close(closer); + } + finally { + pathChildrenCacheExecutor.shutdown(); } for (Map.Entry> entry : announcements.entrySet()) { diff --git a/server/src/main/java/io/druid/curator/cache/PathChildrenCacheFactory.java b/server/src/main/java/io/druid/curator/cache/PathChildrenCacheFactory.java index f62aa6fc0f0..124ad45d942 100644 --- a/server/src/main/java/io/druid/curator/cache/PathChildrenCacheFactory.java +++ b/server/src/main/java/io/druid/curator/cache/PathChildrenCacheFactory.java @@ -21,10 +21,96 @@ package io.druid.curator.cache; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.ThreadUtils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; /** */ -public interface PathChildrenCacheFactory +public class PathChildrenCacheFactory { - public PathChildrenCache make(CuratorFramework curator, String path); + private final boolean cacheData; + private final boolean compressed; + private final ExecutorService exec; + private final boolean shutdownExecutorOnClose; + + private PathChildrenCacheFactory( + boolean cacheData, + boolean compressed, + ExecutorService exec, + boolean shutdownExecutorOnClose + ) + { + this.cacheData = cacheData; + this.compressed = compressed; + this.exec = exec; + this.shutdownExecutorOnClose = shutdownExecutorOnClose; + } + + public PathChildrenCache make(CuratorFramework curator, String path) + { + return new PathChildrenCache( + curator, + path, + cacheData, + compressed, + new CloseableExecutorService(exec, shutdownExecutorOnClose) + ); + } + + public static class Builder + { + private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache"); + + private boolean cacheData; + private boolean compressed; + private ExecutorService exec; + private boolean shutdownExecutorOnClose; + + public Builder() + { + cacheData = true; + compressed = false; + exec = null; + shutdownExecutorOnClose = true; + } + + public Builder withCacheData(boolean cacheData) + { + this.cacheData = cacheData; + return this; + } + + public Builder withCompressed(boolean compressed) + { + this.compressed = compressed; + return this; + } + + public Builder withExecutorService(ExecutorService exec) + { + this.exec = exec; + return this; + } + + public Builder withShutdownExecutorOnClose(boolean shutdownExecutorOnClose) + { + this.shutdownExecutorOnClose = shutdownExecutorOnClose; + return this; + } + + public PathChildrenCacheFactory build() + { + ExecutorService exec = this.exec != null ? this.exec : createDefaultExecutor(); + return new PathChildrenCacheFactory(cacheData, compressed, exec, shutdownExecutorOnClose); + } + + public static ExecutorService createDefaultExecutor() + { + return Executors.newSingleThreadExecutor(defaultThreadFactory); + } + } } diff --git a/server/src/main/java/io/druid/curator/cache/SimplePathChildrenCacheFactory.java b/server/src/main/java/io/druid/curator/cache/SimplePathChildrenCacheFactory.java deleted file mode 100644 index c57468269ce..00000000000 --- a/server/src/main/java/io/druid/curator/cache/SimplePathChildrenCacheFactory.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.curator.cache; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.utils.ThreadUtils; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - -/** - */ -public class SimplePathChildrenCacheFactory implements PathChildrenCacheFactory -{ - private final boolean cacheData; - private final boolean compressed; - private final ExecutorService exec; - - public SimplePathChildrenCacheFactory( - boolean cacheData, - boolean compressed, - ExecutorService exec - ) - { - this.cacheData = cacheData; - this.compressed = compressed; - this.exec = exec; - } - - @Override - public PathChildrenCache make(CuratorFramework curator, String path) - { - return new PathChildrenCache(curator, path, cacheData, compressed, exec); - } - - public static class Builder - { - private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache"); - - private boolean cacheData; - private boolean compressed; - private ExecutorService exec; - - public Builder() - { - cacheData = true; - compressed = false; - exec = Executors.newSingleThreadExecutor(defaultThreadFactory); - } - - public Builder withCacheData(boolean cacheData) - { - this.cacheData = cacheData; - return this; - } - - public Builder withCompressed(boolean compressed) - { - this.compressed = compressed; - return this; - } - - public Builder withExecutorService(ExecutorService exec) - { - this.exec = exec; - return this; - } - - public SimplePathChildrenCacheFactory build() - { - return new SimplePathChildrenCacheFactory(cacheData, compressed, exec); - } - } -} diff --git a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java index e848facbf01..524d4c11a0f 100644 --- a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java +++ b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java @@ -21,17 +21,13 @@ package io.druid.curator.inventory; import com.google.common.base.Function; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; - -import io.druid.curator.ShutdownNowIgnoringExecutorService; +import com.google.common.io.Closer; import io.druid.curator.cache.PathChildrenCacheFactory; -import io.druid.curator.cache.SimplePathChildrenCacheFactory; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.logger.Logger; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; @@ -68,6 +64,7 @@ public class CuratorInventoryManager private final ConcurrentMap containers; private final Set uninitializedInventory; private final PathChildrenCacheFactory cacheFactory; + private final ExecutorService pathChildrenCacheExecutor; private volatile PathChildrenCache childrenCache; @@ -85,10 +82,16 @@ public class CuratorInventoryManager this.containers = new MapMaker().makeMap(); this.uninitializedInventory = Sets.newConcurrentHashSet(); - //NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event. - //this is a workaround to solve curator's out-of-order events problem - //https://issues.apache.org/jira/browse/CURATOR-191 - this.cacheFactory = new SimplePathChildrenCacheFactory(false, true, new ShutdownNowIgnoringExecutorService(exec)); + this.pathChildrenCacheExecutor = exec; + this.cacheFactory = new PathChildrenCacheFactory.Builder() + //NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event. + //this is a workaround to solve curator's out-of-order events problem + //https://issues.apache.org/jira/browse/CURATOR-191 + .withCacheData(false) + .withCompressed(true) + .withExecutorService(pathChildrenCacheExecutor) + .withShutdownExecutorOnClose(false) + .build(); } @LifecycleStart @@ -133,14 +136,15 @@ public class CuratorInventoryManager childrenCache = null; } - for (String containerKey : Lists.newArrayList(containers.keySet())) { - final ContainerHolder containerHolder = containers.remove(containerKey); - if (containerHolder == null) { - log.wtf("!? Got key[%s] from keySet() but it didn't have a value!?", containerKey); - } else { - // This close() call actually calls shutdownNow() on the executor registered with the Cache object... - containerHolder.getCache().close(); - } + Closer closer = Closer.create(); + for (ContainerHolder containerHolder : containers.values()) { + closer.register(containerHolder.getCache()); + } + try { + closer.close(); + } + finally { + pathChildrenCacheExecutor.shutdown(); } }