diff --git a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index d18192531c6..49da62e5112 100644 --- a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.gateway.IndexShardGateway; @@ -74,6 +75,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl private final TimeValue syncInterval; private volatile ScheduledFuture flushScheduler; + private final CancellableThreads cancellableThreads = new CancellableThreads(); @Inject @@ -308,15 +310,20 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl logger.debug("failed to send mapping update post recovery to master for [{}]", t, type); } }); - - try { - boolean waited = latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS); - if (!waited) { - logger.debug("waited for mapping update on master for [{}], yet timed out"); + cancellableThreads.execute(new CancellableThreads.Interruptable() { + @Override + public void run() throws InterruptedException { + try { + if (latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS) == false) { + logger.debug("waited for mapping update on master for [{}], yet timed out", type); + } + } catch (InterruptedException e) { + logger.debug("interrupted while waiting for mapping update"); + throw e; + } } - } catch (InterruptedException e) { - logger.debug("interrupted while waiting for mapping update"); - } + }); + } recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime()); } @@ -324,6 +331,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl @Override public void close() { FutureUtils.cancel(flushScheduler); + cancellableThreads.cancel("closed"); } class Sync implements Runnable {