[CORE] Ban all useage of Future#cancel(true)

Interrupting a thread while blocking on an NIO Read / Write Operation
can cause a file to be closed due to the interrupts. This can have unpredictable
effects when files are open by index readers etc. we should prevent interruptions
across the board if possible.

Closes #8494
This commit is contained in:
Simon Willnauer 2014-11-16 19:57:03 +01:00
parent 119aa4af20
commit 5c6fe2593e
18 changed files with 86 additions and 50 deletions

View File

@ -111,3 +111,6 @@ com.ning.compress.lzf.LZFUncompressor#<init>(com.ning.compress.DataHandler, com.
@defaultMessage Spawns a new thread which is solely under lucenes control use ThreadPool#estimatedTimeInMillisCounter instead
org.apache.lucene.search.TimeLimitingCollector#getGlobalTimerThread()
org.apache.lucene.search.TimeLimitingCollector#getGlobalCounter()
@defaultMessage Don't interrupt threads use FutureUtils#cancel(Future<T>) instead
java.util.concurrent.Future#cancel(boolean)

View File

@ -1235,6 +1235,9 @@
<!-- start exclude for Lucene utility class -->
<exclude>org/elasticsearch/common/lucene/Lucene$LenientParser.class</exclude>
<!-- end exclude for Lucene -->
<!-- start exclude for Future utility class -->
<exclude>org/elasticsearch/common/util/concurrent/FutureUtils.class</exclude>
<!-- end exclude for Future utility class -->
</excludes>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'targetVersion': -->

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.io.Closeable;
import java.util.concurrent.*;
@ -221,7 +222,7 @@ public class BulkProcessor implements Closeable {
}
closed = true;
if (this.scheduledFuture != null) {
this.scheduledFuture.cancel(false);
FutureUtils.cancel(this.scheduledFuture);
this.scheduler.shutdown();
}
if (bulkRequest.numberOfActions() > 0) {

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -252,7 +253,7 @@ public class TransportClientNodesService extends AbstractComponent {
return;
}
closed = true;
nodesSamplerFuture.cancel(true);
FutureUtils.cancel(nodesSamplerFuture);
for (DiscoveryNode node : nodes) {
transportService.disconnectFromNode(node);
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.threadpool.ThreadPool;
@ -194,9 +195,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
mdLock.release();
if (future != null) {
future.cancel(false);
}
FutureUtils.cancel(future);
listener.onResponse(response);
}
}
@ -205,9 +204,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
mdLock.release();
if (future != null) {
future.cancel(false);
}
FutureUtils.cancel(future);
listener.onFailure(t);
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.Future;
@ -83,10 +84,8 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override
protected void doClose() throws ElasticsearchException {
if (scheduledRoutingTableFuture != null) {
scheduledRoutingTableFuture.cancel(true);
scheduledRoutingTableFuture = null;
}
FutureUtils.cancel(scheduledRoutingTableFuture);
scheduledRoutingTableFuture = null;
clusterService.remove(this);
}
@ -123,10 +122,8 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
}
}
} else {
if (scheduledRoutingTableFuture != null) {
scheduledRoutingTableFuture.cancel(true);
scheduledRoutingTableFuture = null;
}
FutureUtils.cancel(scheduledRoutingTableFuture);
scheduledRoutingTableFuture = null;
}
}

View File

@ -160,7 +160,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
protected void doStop() throws ElasticsearchException {
this.reconnectToNodes.cancel(true);
FutureUtils.cancel(this.reconnectToNodes);
for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
onGoingTimeout.cancel();
onGoingTimeout.listener.onClose();
@ -500,7 +500,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
public void cancel() {
future.cancel(false);
FutureUtils.cancel(future);
}
@Override
@ -708,7 +708,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (countDown.countDown()) {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
ackTimeoutCallback.cancel(true);
FutureUtils.cancel(ackTimeoutCallback);
ackedUpdateTask.onAllNodesAcked(lastFailure);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.metrics;
import jsr166e.LongAdder;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -117,7 +118,5 @@ public class MeterMetric implements Metric {
return ratePerNs * (double) rateUnit.toNanos(1);
}
public void stop() {
future.cancel(false);
}
public void stop() { FutureUtils.cancel(future);}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import java.util.concurrent.Future;
/**
*/
public class FutureUtils {
public static boolean cancel(Future<?> toCancel) {
if (toCancel != null) {
return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads
}
return false;
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
@ -270,7 +271,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
if (newMetaData.hasIndex(danglingIndex)) {
logger.debug("[{}] no longer dangling (created), removing", danglingIndex);
DanglingIndex removed = danglingIndices.remove(danglingIndex);
removed.future.cancel(false);
FutureUtils.cancel(removed.future);
}
}
// delete indices that are no longer part of the metadata

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
@ -336,9 +337,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
@Override
public void close() {
if (flushScheduler != null) {
flushScheduler.cancel(false);
}
FutureUtils.cancel(flushScheduler);
}
class Sync implements Runnable {

View File

@ -44,6 +44,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
@ -697,14 +698,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
synchronized (mutex) {
indexSettingsService.removeListener(applyRefreshSettings);
if (state != IndexShardState.CLOSED) {
if (refreshScheduledFuture != null) {
refreshScheduledFuture.cancel(true);
refreshScheduledFuture = null;
}
if (mergeScheduleFuture != null) {
mergeScheduleFuture.cancel(true);
mergeScheduleFuture = null;
}
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
FutureUtils.cancel(mergeScheduleFuture);
mergeScheduleFuture = null;
}
changeState(IndexShardState.CLOSED, reason);
}
@ -954,7 +951,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
// NOTE: we pass false here so we do NOT attempt Thread.interrupt if EngineRefresher.run is currently running. This is
// very important, because doing so can cause files to suddenly be closed if they were doing IO when the interrupt
// hit. See https://issues.apache.org/jira/browse/LUCENE-2239
refreshScheduledFuture.cancel(false);
FutureUtils.cancel(refreshScheduledFuture);
refreshScheduledFuture = null;
}
InternalIndexShard.this.refreshInterval = refreshInterval;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.settings.IndexSettings;
@ -94,7 +95,7 @@ public class TranslogService extends AbstractIndexShardComponent {
public void close() {
indexSettingsService.removeListener(applySettings);
this.future.cancel(true);
FutureUtils.cancel(this.future);
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
@ -131,10 +132,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
@Override
protected void doStop() throws ElasticsearchException {
if (scheduler != null) {
scheduler.cancel(false);
scheduler = null;
}
FutureUtils.cancel(scheduler);
scheduler = null;
}
@Override

View File

@ -20,13 +20,13 @@
package org.elasticsearch.monitor.jvm;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashSet;
@ -35,7 +35,6 @@ import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.monitor.jvm.DeadlockAnalyzer.deadlockAnalyzer;
import static org.elasticsearch.monitor.jvm.JvmStats.GarbageCollector;
import static org.elasticsearch.monitor.jvm.JvmStats.jvmStats;
@ -124,7 +123,7 @@ public class JvmMonitorService extends AbstractLifecycleComponent<JvmMonitorServ
if (!enabled) {
return;
}
scheduledFuture.cancel(true);
FutureUtils.cancel(scheduledFuture);
}
@Override

View File

@ -47,6 +47,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
@ -185,7 +186,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
@Override
protected void doClose() throws ElasticsearchException {
keepAliveReaper.cancel(false);
FutureUtils.cancel(keepAliveReaper);
}
public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws ElasticsearchException {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections;
@ -213,8 +214,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
// or because we failed serializing the message
final RequestHolder holderToNotify = clientHandlers.remove(requestId);
// if the scheduler raise a EsRejectedExecutionException (due to shutdown), we may have a timeout handler, but no future
if (timeoutHandler != null && timeoutHandler.future != null) {
timeoutHandler.future.cancel(false);
if (timeoutHandler != null) {
FutureUtils.cancel(timeoutHandler.future);
}
// If holderToNotify == null then handler has already been taken care of.
@ -444,7 +445,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
public void cancel() {
if (timeout != null) {
timeout.future.cancel(false);
FutureUtils.cancel(timeout.future);
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Set;
@ -104,9 +105,9 @@ public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceW
if (!enabled) {
return;
}
lowFuture.cancel(true);
mediumFuture.cancel(true);
highFuture.cancel(true);
FutureUtils.cancel(lowFuture);
FutureUtils.cancel(mediumFuture);
FutureUtils.cancel(highFuture);
}
@Override