diff --git a/dev-tools/forbidden/core-signatures.txt b/dev-tools/forbidden/core-signatures.txt index 59a4316e588..151cbb2754b 100644 --- a/dev-tools/forbidden/core-signatures.txt +++ b/dev-tools/forbidden/core-signatures.txt @@ -111,3 +111,6 @@ com.ning.compress.lzf.LZFUncompressor#(com.ning.compress.DataHandler, com. @defaultMessage Spawns a new thread which is solely under lucenes control use ThreadPool#estimatedTimeInMillisCounter instead org.apache.lucene.search.TimeLimitingCollector#getGlobalTimerThread() org.apache.lucene.search.TimeLimitingCollector#getGlobalCounter() + +@defaultMessage Don't interrupt threads use FutureUtils#cancel(Future) instead +java.util.concurrent.Future#cancel(boolean) diff --git a/pom.xml b/pom.xml index 60c80949adc..026d4ebf238 100644 --- a/pom.xml +++ b/pom.xml @@ -1235,6 +1235,9 @@ org/elasticsearch/common/lucene/Lucene$LenientParser.class + + org/elasticsearch/common/util/concurrent/FutureUtils.class + diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index b7cd094dfef..63aecc18d88 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.FutureUtils; import java.io.Closeable; import java.util.concurrent.*; @@ -221,7 +222,7 @@ public class BulkProcessor implements Closeable { } closed = true; if (this.scheduledFuture != null) { - this.scheduledFuture.cancel(false); + FutureUtils.cancel(this.scheduledFuture); this.scheduler.shutdown(); } if (bulkRequest.numberOfActions() > 0) { diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index b6453cf6977..37d3ed137e0 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -252,7 +253,7 @@ public class TransportClientNodesService extends AbstractComponent { return; } closed = true; - nodesSamplerFuture.cancel(true); + FutureUtils.cancel(nodesSamplerFuture); for (DiscoveryNode node : nodes) { transportService.disconnectFromNode(node); } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index cdeac11e260..5b66fafec12 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.threadpool.ThreadPool; @@ -194,9 +195,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { public void onResponse(final Response response) { if (notified.compareAndSet(false, true)) { mdLock.release(); - if (future != null) { - future.cancel(false); - } + FutureUtils.cancel(future); listener.onResponse(response); } } @@ -205,9 +204,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { public void onFailure(Throwable t) { if (notified.compareAndSet(false, true)) { mdLock.release(); - if (future != null) { - future.cancel(false); - } + FutureUtils.cancel(future); listener.onFailure(t); } } diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 555b8b3ef1b..b5dec0d0bf4 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.Future; @@ -83,10 +84,8 @@ public class RoutingService extends AbstractLifecycleComponent i @Override protected void doClose() throws ElasticsearchException { - if (scheduledRoutingTableFuture != null) { - scheduledRoutingTableFuture.cancel(true); - scheduledRoutingTableFuture = null; - } + FutureUtils.cancel(scheduledRoutingTableFuture); + scheduledRoutingTableFuture = null; clusterService.remove(this); } @@ -123,10 +122,8 @@ public class RoutingService extends AbstractLifecycleComponent i } } } else { - if (scheduledRoutingTableFuture != null) { - scheduledRoutingTableFuture.cancel(true); - scheduledRoutingTableFuture = null; - } + FutureUtils.cancel(scheduledRoutingTableFuture); + scheduledRoutingTableFuture = null; } } diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index a753006d165..79f0437e753 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -160,7 +160,7 @@ public class InternalClusterService extends AbstractLifecycleComponent toCancel) { + if (toCancel != null) { + return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads + } + return false; + } +} diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java index 81789c8daf5..673651482ce 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; @@ -270,7 +271,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS if (newMetaData.hasIndex(danglingIndex)) { logger.debug("[{}] no longer dangling (created), removing", danglingIndex); DanglingIndex removed = danglingIndices.remove(danglingIndex); - removed.future.cancel(false); + FutureUtils.cancel(removed.future); } } // delete indices that are no longer part of the metadata diff --git a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 89473ee281a..b8e428d472a 100644 --- a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; @@ -336,9 +337,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen @Override public void close() { - if (flushScheduler != null) { - flushScheduler.cancel(false); - } + FutureUtils.cancel(flushScheduler); } class Sync implements Runnable { diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 0cd7c3ff52f..526eddda9e1 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; @@ -697,14 +698,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I synchronized (mutex) { indexSettingsService.removeListener(applyRefreshSettings); if (state != IndexShardState.CLOSED) { - if (refreshScheduledFuture != null) { - refreshScheduledFuture.cancel(true); - refreshScheduledFuture = null; - } - if (mergeScheduleFuture != null) { - mergeScheduleFuture.cancel(true); - mergeScheduleFuture = null; - } + FutureUtils.cancel(refreshScheduledFuture); + refreshScheduledFuture = null; + FutureUtils.cancel(mergeScheduleFuture); + mergeScheduleFuture = null; } changeState(IndexShardState.CLOSED, reason); } @@ -954,7 +951,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I // NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is // very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt // hit. See https://issues.apache.org/jira/browse/LUCENE-2239 - refreshScheduledFuture.cancel(false); + FutureUtils.cancel(refreshScheduledFuture); refreshScheduledFuture = null; } InternalIndexShard.this.refreshInterval = refreshInterval; diff --git a/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/src/main/java/org/elasticsearch/index/translog/TranslogService.java index 78b84a53640..8367ee848a2 100644 --- a/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.settings.IndexSettings; @@ -94,7 +95,7 @@ public class TranslogService extends AbstractIndexShardComponent { public void close() { indexSettingsService.removeListener(applySettings); - this.future.cancel(true); + FutureUtils.cancel(this.future); } diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 47029247a8e..4c6f172b286 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; @@ -131,10 +132,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent { @Override protected void doClose() throws ElasticsearchException { - keepAliveReaper.cancel(false); + FutureUtils.cancel(keepAliveReaper); } public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticsearchException { diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index da5542ffd2b..95a93ce4951 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; @@ -213,8 +214,8 @@ public class TransportService extends AbstractLifecycleComponent