From 5c6fe2593e491aa133d3f2d038a6fcfd32fb66f7 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sun, 16 Nov 2014 19:57:03 +0100 Subject: [PATCH] [CORE] Ban all useage of Future#cancel(true) Interrupting a thread while blocking on an NIO Read / Write Operation can cause a file to be closed due to the interrupts. This can have unpredictable effects when files are open by index readers etc. we should prevent interruptions across the board if possible. Closes #8494 --- dev-tools/forbidden/core-signatures.txt | 3 ++ pom.xml | 3 ++ .../action/bulk/BulkProcessor.java | 3 +- .../TransportClientNodesService.java | 3 +- .../metadata/MetaDataDeleteIndexService.java | 9 ++--- .../cluster/routing/RoutingService.java | 13 +++---- .../service/InternalClusterService.java | 6 ++-- .../common/metrics/MeterMetric.java | 5 ++- .../common/util/concurrent/FutureUtils.java | 36 +++++++++++++++++++ .../state/meta/LocalGatewayMetaState.java | 3 +- .../gateway/local/LocalIndexShardGateway.java | 5 ++- .../shard/service/InternalIndexShard.java | 15 ++++---- .../index/translog/TranslogService.java | 3 +- .../memory/IndexingMemoryController.java | 7 ++-- .../monitor/jvm/JvmMonitorService.java | 5 ++- .../elasticsearch/search/SearchService.java | 3 +- .../transport/TransportService.java | 7 ++-- .../watcher/ResourceWatcherService.java | 7 ++-- 18 files changed, 86 insertions(+), 50 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java diff --git a/dev-tools/forbidden/core-signatures.txt b/dev-tools/forbidden/core-signatures.txt index 59a4316e588..151cbb2754b 100644 --- a/dev-tools/forbidden/core-signatures.txt +++ b/dev-tools/forbidden/core-signatures.txt @@ -111,3 +111,6 @@ com.ning.compress.lzf.LZFUncompressor#(com.ning.compress.DataHandler, com. @defaultMessage Spawns a new thread which is solely under lucenes control use ThreadPool#estimatedTimeInMillisCounter instead org.apache.lucene.search.TimeLimitingCollector#getGlobalTimerThread() org.apache.lucene.search.TimeLimitingCollector#getGlobalCounter() + +@defaultMessage Don't interrupt threads use FutureUtils#cancel(Future) instead +java.util.concurrent.Future#cancel(boolean) diff --git a/pom.xml b/pom.xml index 60c80949adc..026d4ebf238 100644 --- a/pom.xml +++ b/pom.xml @@ -1235,6 +1235,9 @@ org/elasticsearch/common/lucene/Lucene$LenientParser.class + + org/elasticsearch/common/util/concurrent/FutureUtils.class + diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index b7cd094dfef..63aecc18d88 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.FutureUtils; import java.io.Closeable; import java.util.concurrent.*; @@ -221,7 +222,7 @@ public class BulkProcessor implements Closeable { } closed = true; if (this.scheduledFuture != null) { - this.scheduledFuture.cancel(false); + FutureUtils.cancel(this.scheduledFuture); this.scheduler.shutdown(); } if (bulkRequest.numberOfActions() > 0) { diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index b6453cf6977..37d3ed137e0 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -252,7 +253,7 @@ public class TransportClientNodesService extends AbstractComponent { return; } closed = true; - nodesSamplerFuture.cancel(true); + FutureUtils.cancel(nodesSamplerFuture); for (DiscoveryNode node : nodes) { transportService.disconnectFromNode(node); } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index cdeac11e260..5b66fafec12 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.threadpool.ThreadPool; @@ -194,9 +195,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { public void onResponse(final Response response) { if (notified.compareAndSet(false, true)) { mdLock.release(); - if (future != null) { - future.cancel(false); - } + FutureUtils.cancel(future); listener.onResponse(response); } } @@ -205,9 +204,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { public void onFailure(Throwable t) { if (notified.compareAndSet(false, true)) { mdLock.release(); - if (future != null) { - future.cancel(false); - } + FutureUtils.cancel(future); listener.onFailure(t); } } diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 555b8b3ef1b..b5dec0d0bf4 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.Future; @@ -83,10 +84,8 @@ public class RoutingService extends AbstractLifecycleComponent i @Override protected void doClose() throws ElasticsearchException { - if (scheduledRoutingTableFuture != null) { - scheduledRoutingTableFuture.cancel(true); - scheduledRoutingTableFuture = null; - } + FutureUtils.cancel(scheduledRoutingTableFuture); + scheduledRoutingTableFuture = null; clusterService.remove(this); } @@ -123,10 +122,8 @@ public class RoutingService extends AbstractLifecycleComponent i } } } else { - if (scheduledRoutingTableFuture != null) { - scheduledRoutingTableFuture.cancel(true); - scheduledRoutingTableFuture = null; - } + FutureUtils.cancel(scheduledRoutingTableFuture); + scheduledRoutingTableFuture = null; } } diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index a753006d165..79f0437e753 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -160,7 +160,7 @@ public class InternalClusterService extends AbstractLifecycleComponent toCancel) { + if (toCancel != null) { + return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads + } + return false; + } +} diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java index 81789c8daf5..673651482ce 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; @@ -270,7 +271,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS if (newMetaData.hasIndex(danglingIndex)) { logger.debug("[{}] no longer dangling (created), removing", danglingIndex); DanglingIndex removed = danglingIndices.remove(danglingIndex); - removed.future.cancel(false); + FutureUtils.cancel(removed.future); } } // delete indices that are no longer part of the metadata diff --git a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 89473ee281a..b8e428d472a 100644 --- a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; @@ -336,9 +337,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen @Override public void close() { - if (flushScheduler != null) { - flushScheduler.cancel(false); - } + FutureUtils.cancel(flushScheduler); } class Sync implements Runnable { diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 0cd7c3ff52f..526eddda9e1 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; @@ -697,14 +698,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I synchronized (mutex) { indexSettingsService.removeListener(applyRefreshSettings); if (state != IndexShardState.CLOSED) { - if (refreshScheduledFuture != null) { - refreshScheduledFuture.cancel(true); - refreshScheduledFuture = null; - } - if (mergeScheduleFuture != null) { - mergeScheduleFuture.cancel(true); - mergeScheduleFuture = null; - } + FutureUtils.cancel(refreshScheduledFuture); + refreshScheduledFuture = null; + FutureUtils.cancel(mergeScheduleFuture); + mergeScheduleFuture = null; } changeState(IndexShardState.CLOSED, reason); } @@ -954,7 +951,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 - refreshScheduledFuture.cancel(false); + FutureUtils.cancel(refreshScheduledFuture); refreshScheduledFuture = null; } InternalIndexShard.this.refreshInterval = refreshInterval; diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/src/main/java/org/elasticsearch/index/translog/TranslogService.java index 78b84a53640..8367ee848a2 100644 --- a/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.settings.IndexSettings; @@ -94,7 +95,7 @@ public class TranslogService extends AbstractIndexShardComponent { public void close() { indexSettingsService.removeListener(applySettings); - this.future.cancel(true); + FutureUtils.cancel(this.future); } diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 47029247a8e..4c6f172b286 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; @@ -131,10 +132,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent { @Override protected void doClose() throws ElasticsearchException { - keepAliveReaper.cancel(false); + FutureUtils.cancel(keepAliveReaper); } public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticsearchException { diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index da5542ffd2b..95a93ce4951 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; @@ -213,8 +214,8 @@ public class TransportService extends AbstractLifecycleComponent