add the ability to run check index before starting a shard, used for internal testing (as it has a large overhead)
This commit is contained in:
parent
e63847f771
commit
bbb05a57e0
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.index.shard.service;
|
package org.elasticsearch.index.shard.service;
|
||||||
|
|
||||||
import org.apache.lucene.document.Document;
|
import org.apache.lucene.document.Document;
|
||||||
|
import org.apache.lucene.index.CheckIndex;
|
||||||
import org.apache.lucene.index.Term;
|
import org.apache.lucene.index.Term;
|
||||||
import org.apache.lucene.search.BooleanClause;
|
import org.apache.lucene.search.BooleanClause;
|
||||||
import org.apache.lucene.search.FilterClause;
|
import org.apache.lucene.search.FilterClause;
|
||||||
|
@ -32,6 +33,7 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.io.FastByteArrayOutputStream;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.lucene.search.XBooleanFilter;
|
import org.elasticsearch.common.lucene.search.XBooleanFilter;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -57,6 +59,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
import java.nio.channels.ClosedByInterruptException;
|
import java.nio.channels.ClosedByInterruptException;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
|
||||||
|
@ -82,6 +85,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
|
|
||||||
private final Object mutex = new Object();
|
private final Object mutex = new Object();
|
||||||
|
|
||||||
|
|
||||||
|
private final boolean checkIndex;
|
||||||
|
|
||||||
private volatile IndexShardState state;
|
private volatile IndexShardState state;
|
||||||
|
|
||||||
private ScheduledFuture refreshScheduledFuture;
|
private ScheduledFuture refreshScheduledFuture;
|
||||||
|
@ -102,6 +108,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
this.indexCache = indexCache;
|
this.indexCache = indexCache;
|
||||||
state = IndexShardState.CREATED;
|
state = IndexShardState.CREATED;
|
||||||
logger.debug("state: [CREATED]");
|
logger.debug("state: [CREATED]");
|
||||||
|
|
||||||
|
this.checkIndex = indexSettings.getAsBoolean("index.shard.check_index", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Store store() {
|
public Store store() {
|
||||||
|
@ -181,6 +189,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
throw new IndexShardRelocatedException(shardId);
|
throw new IndexShardRelocatedException(shardId);
|
||||||
}
|
}
|
||||||
engine.start();
|
engine.start();
|
||||||
|
if (checkIndex) {
|
||||||
|
checkIndex(true);
|
||||||
|
}
|
||||||
scheduleRefresherIfNeeded();
|
scheduleRefresherIfNeeded();
|
||||||
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
|
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
|
||||||
state = IndexShardState.STARTED;
|
state = IndexShardState.STARTED;
|
||||||
|
@ -444,6 +455,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
if (withFlush) {
|
if (withFlush) {
|
||||||
engine.flush(new Engine.Flush());
|
engine.flush(new Engine.Flush());
|
||||||
}
|
}
|
||||||
|
if (checkIndex) {
|
||||||
|
checkIndex(true);
|
||||||
|
}
|
||||||
synchronized (mutex) {
|
synchronized (mutex) {
|
||||||
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
|
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
|
||||||
state = IndexShardState.STARTED;
|
state = IndexShardState.STARTED;
|
||||||
|
@ -556,4 +570,27 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void checkIndex(boolean throwException) throws IndexShardException {
|
||||||
|
try {
|
||||||
|
CheckIndex checkIndex = new CheckIndex(store.directory());
|
||||||
|
FastByteArrayOutputStream os = new FastByteArrayOutputStream();
|
||||||
|
PrintStream out = new PrintStream(os);
|
||||||
|
checkIndex.setInfoStream(out);
|
||||||
|
out.flush();
|
||||||
|
CheckIndex.Status status = checkIndex.checkIndex();
|
||||||
|
if (!status.clean) {
|
||||||
|
logger.warn("check index [failure]\n{}", new String(os.unsafeByteArray(), 0, os.size()));
|
||||||
|
if (throwException) {
|
||||||
|
throw new IndexShardException(shardId, "index check failure");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("check index [success]\n{}", new String(os.unsafeByteArray(), 0, os.size()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("failed to check index", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue