[RECOVERY] Allow cancel waiting for mapping changes

This commit interrupts the wait for mapping change if the index
shard gateway is waiting for the master on a mapping update.
This commit is contained in:
Simon Willnauer 2014-12-30 17:16:35 +01:00
parent 54ce210c8e
commit b936c2f845
1 changed files with 16 additions and 8 deletions

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGateway;
@ -74,6 +75,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
private final TimeValue syncInterval;
private volatile ScheduledFuture flushScheduler;
private final CancellableThreads cancellableThreads = new CancellableThreads();
@Inject
@ -308,15 +310,20 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
logger.debug("failed to send mapping update post recovery to master for [{}]", t, type);
}
});
try {
boolean waited = latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS);
if (!waited) {
logger.debug("waited for mapping update on master for [{}], yet timed out");
cancellableThreads.execute(new CancellableThreads.Interruptable() {
@Override
public void run() throws InterruptedException {
try {
if (latch.await(waitForMappingUpdatePostRecovery.millis(), TimeUnit.MILLISECONDS) == false) {
logger.debug("waited for mapping update on master for [{}], yet timed out", type);
}
} catch (InterruptedException e) {
logger.debug("interrupted while waiting for mapping update");
throw e;
}
}
} catch (InterruptedException e) {
logger.debug("interrupted while waiting for mapping update");
}
});
}
recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime());
}
@ -324,6 +331,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
@Override
public void close() {
FutureUtils.cancel(flushScheduler);
cancellableThreads.cancel("closed");
}
class Sync implements Runnable {