core: add 'checksum' option for index.shard.check_on_startup
The current "checkindex" on startup is very, very expensive. This is like running one of the old-school hard drive diagnostic checkers and usually not a good idea. But we can do a CRC32 verification of files. We don't even need to open an indexreader to do this; it's much more lightweight. This option (as well as the existing true/false values) is randomized in tests to find problems. Also fix a bug where use of the current option would always leak an indexwriter lock. Closes #9183
This commit is contained in:
parent
6079d88d43
commit
027730006b
|
@ -47,17 +47,20 @@ otherwise it is written in non-compound format.
|
|||
at the expense of slower stored fields performance.
|
||||
|
||||
`index.shard.check_on_startup`::
|
||||
Should shard consistency be checked upon opening.
|
||||
When `true`, the shard will be checked, preventing it from being open in
|
||||
case some segments appear to be corrupted.
|
||||
When `fix`, the shard will also be checked but segments that were reported
|
||||
as corrupted will be automatically removed.
|
||||
Default value is `false`, which doesn't check shards.
|
||||
Should shard consistency be checked upon opening. When corruption is detected,
|
||||
it will prevent the shard from being opened.
|
||||
|
||||
When `checksum`, check for physical corruption.
|
||||
When `true`, check for both physical and logical corruption. This is much
|
||||
more expensive in terms of CPU and memory usage.
|
||||
When `fix`, check for both physical and logical corruption, and segments
|
||||
that were reported as corrupted will be automatically removed.
|
||||
Default value is `false`, which performs no checks.
|
||||
|
||||
NOTE: Checking shards may take a lot of time on large indices.
|
||||
|
||||
WARNING: Setting `index.shard.check_on_startup` to `fix` may result in data loss,
|
||||
use with caution.
|
||||
use with extreme caution.
|
||||
|
||||
--
|
||||
|
||||
|
|
|
@ -88,7 +88,9 @@ import org.elasticsearch.index.settings.IndexSettings;
|
|||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
import org.elasticsearch.index.store.StoreStats;
|
||||
import org.elasticsearch.index.store.Store.MetadataSnapshot;
|
||||
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
|
||||
import org.elasticsearch.index.suggest.stats.SuggestStats;
|
||||
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
|
||||
|
@ -107,6 +109,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.nio.channels.ClosedByInterruptException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -689,7 +692,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
throw new IndexShardRelocatedException(shardId);
|
||||
}
|
||||
if (Booleans.parseBoolean(checkIndexOnStartup, false)) {
|
||||
checkIndex(true);
|
||||
checkIndex();
|
||||
}
|
||||
createNewEngine();
|
||||
startScheduledTasksIfNeeded();
|
||||
|
@ -708,7 +711,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
}
|
||||
// also check here, before we apply the translog
|
||||
if (Booleans.parseBoolean(checkIndexOnStartup, false)) {
|
||||
checkIndex(true);
|
||||
checkIndex();
|
||||
}
|
||||
// we disable deletes since we allow for operations to be executed against the shard while recovering
|
||||
// but we need to make sure we don't loose deletes until we are done recovering
|
||||
|
@ -994,49 +997,77 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkIndex(boolean throwException) throws IndexShardException {
|
||||
|
||||
private void checkIndex() throws IndexShardException {
|
||||
try {
|
||||
checkIndexTook = 0;
|
||||
long time = System.currentTimeMillis();
|
||||
if (!Lucene.indexExists(store.directory())) {
|
||||
return;
|
||||
}
|
||||
CheckIndex checkIndex = new CheckIndex(store.directory());
|
||||
BytesStreamOutput os = new BytesStreamOutput();
|
||||
PrintStream out = new PrintStream(os, false, Charsets.UTF_8.name());
|
||||
checkIndex.setInfoStream(out);
|
||||
out.flush();
|
||||
CheckIndex.Status status = checkIndex.checkIndex();
|
||||
if (!status.clean) {
|
||||
if (state == IndexShardState.CLOSED) {
|
||||
// ignore if closed....
|
||||
return;
|
||||
doCheckIndex();
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardException(shardId, "exception during checkindex", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void doCheckIndex() throws IndexShardException, IOException {
|
||||
checkIndexTook = 0;
|
||||
long time = System.currentTimeMillis();
|
||||
if (!Lucene.indexExists(store.directory())) {
|
||||
return;
|
||||
}
|
||||
BytesStreamOutput os = new BytesStreamOutput();
|
||||
PrintStream out = new PrintStream(os, false, Charsets.UTF_8.name());
|
||||
|
||||
if ("checksum".equalsIgnoreCase(checkIndexOnStartup)) {
|
||||
// physical verification only: verify all checksums for the latest commit
|
||||
boolean corrupt = false;
|
||||
MetadataSnapshot metadata = store.getMetadata();
|
||||
for (Map.Entry<String,StoreFileMetaData> entry : metadata.asMap().entrySet()) {
|
||||
try {
|
||||
Store.checkIntegrity(entry.getValue(), store.directory());
|
||||
out.println("checksum passed: " + entry.getKey());
|
||||
} catch (IOException exc) {
|
||||
out.println("checksum failed: " + entry.getKey());
|
||||
exc.printStackTrace(out);
|
||||
corrupt = true;
|
||||
}
|
||||
}
|
||||
out.flush();
|
||||
if (corrupt) {
|
||||
logger.warn("check index [failure]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
|
||||
if ("fix".equalsIgnoreCase(checkIndexOnStartup)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("fixing index, writing new segments file ...");
|
||||
throw new IndexShardException(shardId, "index check failure");
|
||||
}
|
||||
} else {
|
||||
// full checkindex
|
||||
try (CheckIndex checkIndex = new CheckIndex(store.directory())) {
|
||||
checkIndex.setInfoStream(out);
|
||||
CheckIndex.Status status = checkIndex.checkIndex();
|
||||
out.flush();
|
||||
|
||||
if (!status.clean) {
|
||||
if (state == IndexShardState.CLOSED) {
|
||||
// ignore if closed....
|
||||
return;
|
||||
}
|
||||
checkIndex.exorciseIndex(status);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
|
||||
}
|
||||
} else {
|
||||
// only throw a failure if we are not going to fix the index
|
||||
if (throwException) {
|
||||
logger.warn("check index [failure]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
|
||||
if ("fix".equalsIgnoreCase(checkIndexOnStartup)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("fixing index, writing new segments file ...");
|
||||
}
|
||||
checkIndex.exorciseIndex(status);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
|
||||
}
|
||||
} else {
|
||||
// only throw a failure if we are not going to fix the index
|
||||
throw new IndexShardException(shardId, "index check failure");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("check index [success]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
|
||||
}
|
||||
}
|
||||
checkIndexTook = System.currentTimeMillis() - time;
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to check index", e);
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("check index [success]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
|
||||
}
|
||||
|
||||
checkIndexTook = System.currentTimeMillis() - time;
|
||||
}
|
||||
|
||||
public Engine engine() {
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.index.store;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
import org.apache.lucene.codecs.CodecUtil;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.store.*;
|
||||
|
@ -443,17 +444,32 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
}
|
||||
}
|
||||
|
||||
public boolean checkIntegrity(StoreFileMetaData md) {
|
||||
return checkIntegrity(md, directory());
|
||||
public boolean checkIntegrityNoException(StoreFileMetaData md) {
|
||||
return checkIntegrityNoException(md, directory());
|
||||
}
|
||||
|
||||
public static boolean checkIntegrityNoException(StoreFileMetaData md, Directory directory) {
|
||||
try {
|
||||
checkIntegrity(md, directory);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean checkIntegrity(final StoreFileMetaData md, final Directory directory) {
|
||||
public static void checkIntegrity(final StoreFileMetaData md, final Directory directory) throws IOException {
|
||||
try (IndexInput input = directory.openInput(md.name(), IOContext.READONCE)) {
|
||||
if (input.length() != md.length()) { // first check the length no matter how old this file is
|
||||
return false;
|
||||
throw new CorruptIndexException("expected length=" + md.length() + " != actual length: " + input.length() + " : file truncated?", input);
|
||||
}
|
||||
if (md.writtenBy() != null && md.writtenBy().onOrAfter(Version.LUCENE_4_8_0)) {
|
||||
return Store.digestToString(CodecUtil.checksumEntireFile(input)).equals(md.checksum());
|
||||
// throw exception if the file is corrupt
|
||||
String checksum = Store.digestToString(CodecUtil.checksumEntireFile(input));
|
||||
// throw exception if metadata is inconsistent
|
||||
if (!checksum.equals(md.checksum())) {
|
||||
throw new CorruptIndexException("inconsistent metadata: lucene checksum=" + checksum +
|
||||
", metadata checksum=" + md.checksum(), input);
|
||||
}
|
||||
} else if (md.hasLegacyChecksum()) {
|
||||
// legacy checksum verification - no footer that we need to omit in the checksum!
|
||||
final Checksum checksum = new Adler32();
|
||||
|
@ -467,12 +483,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
checksum.update(buffer, 0, bytesToRead);
|
||||
read += bytesToRead;
|
||||
}
|
||||
return Store.digestToString(checksum.getValue()).equals(md.checksum());
|
||||
String adler32 = Store.digestToString(checksum.getValue());
|
||||
if (!adler32.equals(md.checksum())) {
|
||||
throw new CorruptIndexException("checksum failed (hardware problem?) : expected=" + md.checksum() +
|
||||
" actual=" + adler32, input);
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isMarkedCorrupted() throws IOException {
|
||||
|
|
|
@ -293,7 +293,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
|
|||
} catch (Throwable e) {
|
||||
final Throwable corruptIndexException;
|
||||
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
|
||||
if (store.checkIntegrity(md) == false) { // we are corrupted on the primary -- fail!
|
||||
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
|
||||
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
|
||||
if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) {
|
||||
// if we are not the first exception, add ourselves as suppressed to the main one:
|
||||
|
|
|
@ -564,8 +564,8 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
|
|||
StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength, Store.digestToString(adler32LegacyChecksum));
|
||||
assertTrue(legacy.hasLegacyChecksum());
|
||||
assertFalse(lucene.hasLegacyChecksum());
|
||||
assertTrue(Store.checkIntegrity(lucene, dir));
|
||||
assertTrue(Store.checkIntegrity(legacy, dir));
|
||||
assertTrue(Store.checkIntegrityNoException(lucene, dir));
|
||||
assertTrue(Store.checkIntegrityNoException(legacy, dir));
|
||||
}
|
||||
|
||||
{ // negative check - wrong checksum
|
||||
|
@ -573,8 +573,8 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
|
|||
StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength, Store.digestToString(adler32LegacyChecksum+1));
|
||||
assertTrue(legacy.hasLegacyChecksum());
|
||||
assertFalse(lucene.hasLegacyChecksum());
|
||||
assertFalse(Store.checkIntegrity(lucene, dir));
|
||||
assertFalse(Store.checkIntegrity(legacy, dir));
|
||||
assertFalse(Store.checkIntegrityNoException(lucene, dir));
|
||||
assertFalse(Store.checkIntegrityNoException(legacy, dir));
|
||||
}
|
||||
|
||||
{ // negative check - wrong length
|
||||
|
@ -582,8 +582,8 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
|
|||
StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength+1, Store.digestToString(adler32LegacyChecksum));
|
||||
assertTrue(legacy.hasLegacyChecksum());
|
||||
assertFalse(lucene.hasLegacyChecksum());
|
||||
assertFalse(Store.checkIntegrity(lucene, dir));
|
||||
assertFalse(Store.checkIntegrity(legacy, dir));
|
||||
assertFalse(Store.checkIntegrityNoException(lucene, dir));
|
||||
assertFalse(Store.checkIntegrityNoException(legacy, dir));
|
||||
}
|
||||
|
||||
{ // negative check - wrong file
|
||||
|
@ -591,8 +591,8 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
|
|||
StoreFileMetaData legacy = new StoreFileMetaData("lucene_checksum.bin", legacyFileLength, Store.digestToString(adler32LegacyChecksum));
|
||||
assertTrue(legacy.hasLegacyChecksum());
|
||||
assertFalse(lucene.hasLegacyChecksum());
|
||||
assertFalse(Store.checkIntegrity(lucene, dir));
|
||||
assertFalse(Store.checkIntegrity(legacy, dir));
|
||||
assertFalse(Store.checkIntegrityNoException(lucene, dir));
|
||||
assertFalse(Store.checkIntegrityNoException(legacy, dir));
|
||||
}
|
||||
dir.close();
|
||||
|
||||
|
|
|
@ -468,6 +468,10 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
|
|||
if (random.nextBoolean()) {
|
||||
builder.put(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, random.nextBoolean());
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
builder.put("index.shard.check_on_startup", randomFrom(random, "false", "checksum", "true"));
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(IndicesQueryCache.INDICES_CACHE_QUERY_CONCURRENCY_LEVEL, randomIntBetween(1, 32));
|
||||
|
|
Loading…
Reference in New Issue