From b936c2f8454ec67d54310621b15f8473a1da87ac Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 30 Dec 2014 17:16:35 +0100 Subject: [PATCH] [RECOVERY] Allow cancel waiting for mapping changes This commit interrupts the wait for mapping change if the index shard gateway is waiting for the master on a mapping update. --- .../index/gateway/IndexShardGateway.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index d18192531c6..49da62e5112 100644 --- a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.gateway.IndexShardGateway; @@ -74,6 +75,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl private final TimeValue syncInterval; private volatile ScheduledFuture flushScheduler; + private final CancellableThreads cancellableThreads = new CancellableThreads(); @Inject @@ -308,15 +310,20 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl logger.debug("failed to send mapping update post recovery to master for [{}]", t, type); } }); - - try { - boolean waited = latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS); - if (!waited) { - logger.debug("waited for mapping update on master for [{}], yet timed out"); + cancellableThreads.execute(new CancellableThreads.Interruptable() { + @Override + public void run() throws InterruptedException { + try { + if (latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS) == false) { + logger.debug("waited for mapping update on master for [{}], yet timed out", type); + } + } catch (InterruptedException e) { + logger.debug("interrupted while waiting for mapping update"); + throw e; + } } - } catch (InterruptedException e) { - logger.debug("interrupted while waiting for mapping update"); - } + }); + } recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime()); } @@ -324,6 +331,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl @Override public void close() { FutureUtils.cancel(flushScheduler); + cancellableThreads.cancel("closed"); } class Sync implements Runnable {