diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index c1f9f1fc5cc..4c4dbdb4cb9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.shard.service; import org.apache.lucene.document.Document; +import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.FilterClause; @@ -32,6 +33,7 @@ import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.XBooleanFilter; import org.elasticsearch.common.settings.Settings; @@ -57,6 +59,7 @@ import org.elasticsearch.threadpool.ThreadPool; import javax.annotation.Nullable; import java.io.IOException; +import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.util.concurrent.ScheduledFuture; @@ -82,6 +85,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private final Object mutex = new Object(); + + private final boolean checkIndex; + private volatile IndexShardState state; private ScheduledFuture refreshScheduledFuture; @@ -102,6 +108,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I this.indexCache = indexCache; state = IndexShardState.CREATED; logger.debug("state: [CREATED]"); + + this.checkIndex = indexSettings.getAsBoolean("index.shard.check_index", false); } public Store store() { @@ -181,6 +189,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I throw new IndexShardRelocatedException(shardId); } engine.start(); + if (checkIndex) { + checkIndex(true); + } scheduleRefresherIfNeeded(); logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED); state = IndexShardState.STARTED; @@ -444,6 +455,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I if (withFlush) { engine.flush(new Engine.Flush()); } + if (checkIndex) { + checkIndex(true); + } synchronized (mutex) { logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED); state = IndexShardState.STARTED; @@ -556,4 +570,27 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } } } + + private void checkIndex(boolean throwException) throws IndexShardException { + try { + CheckIndex checkIndex = new CheckIndex(store.directory()); + FastByteArrayOutputStream os = new FastByteArrayOutputStream(); + PrintStream out = new PrintStream(os); + checkIndex.setInfoStream(out); + out.flush(); + CheckIndex.Status status = checkIndex.checkIndex(); + if (!status.clean) { + logger.warn("check index [failure]\n{}", new String(os.unsafeByteArray(), 0, os.size())); + if (throwException) { + throw new IndexShardException(shardId, "index check failure"); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("check index [success]\n{}", new String(os.unsafeByteArray(), 0, os.size())); + } + } + } catch (Exception e) { + logger.warn("failed to check index", e); + } + } } \ No newline at end of file