drop `index.shard.check_on_startup: fix` (#32279)
Relates #31389

parent d4f2b5be7d · commit 3d82a30fad
@@ -65,9 +65,7 @@ corruption is detected, it will prevent the shard from being opened. Accepts:

 `fix`::

-Check for both physical and logical corruption. Segments that were reported
-as corrupted will be automatically removed. This option *may result in data loss*.
-Use with extreme caution!
+The same as `false`. This option is deprecated and will be completely removed in 7.0.

 WARNING: Expert only. Checking shards may take a lot of time on large indices.
 --
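For context, a minimal sketch of how this setting is supplied when building index settings in Java. The value chosen here is illustrative; per the updated documentation the accepted values are "false" (the default), "checksum", and "true", with "fix" deprecated and behaving like "false".

    // Illustrative sketch only: attach the check_on_startup option to index settings.
    Settings indexSettings = Settings.builder()
        .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum")
        .build();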
@@ -301,6 +301,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         logger.debug("state: [CREATED]");

         this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
+        if ("fix".equals(checkIndexOnStartup)) {
+            deprecationLogger.deprecated("Setting [index.shard.check_on_startup] is set to deprecated value [fix], "
+                + "which has no effect and will not be accepted in future");
+        }
         this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
         final String aId = shardRouting.allocationId().getId();
         this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger);
@@ -1325,7 +1329,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
         recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
         // also check here, before we apply the translog
-        if (Booleans.isTrue(checkIndexOnStartup)) {
+        if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
             try {
                 checkIndex();
             } catch (IOException ex) {
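Put differently, only "true" and "checksum" now trigger an index check during recovery. A hypothetical helper (not present in the codebase) makes the mapping explicit:

    // Hypothetical helper, for illustration only.
    static boolean runsCheckOnStartup(String value) {
        // "true" -> full physical and logical check, "checksum" -> checksum-only check,
        // "false" and the deprecated "fix" -> no check at all.
        return Booleans.isTrue(value) || "checksum".equals(value);
    }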
@@ -1933,6 +1937,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         if (store.tryIncRef()) {
             try {
                 doCheckIndex();
+            } catch (IOException e) {
+                store.markStoreCorrupted(e);
+                throw e;
             } finally {
                 store.decRef();
             }
@@ -1976,18 +1983,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 return;
             }
             logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
-            if ("fix".equals(checkIndexOnStartup)) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("fixing index, writing new segments file ...");
-                }
-                store.exorciseIndex(status);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
-                }
-            } else {
-                // only throw a failure if we are not going to fix the index
-                throw new IllegalStateException("index check failure but can't fix it");
-            }
+            throw new IOException("index check failure");
         }
     }
@@ -134,7 +134,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
     static final int VERSION_START = 0;
     static final int VERSION = VERSION_WRITE_THROWABLE;
-    static final String CORRUPTED = "corrupted_";
+    // public is for test purposes
+    public static final String CORRUPTED = "corrupted_";
     public static final Setting<TimeValue> INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING =
         Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope);
@@ -360,18 +361,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         }
     }

-    /**
-     * Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}.
-     */
-    public void exorciseIndex(CheckIndex.Status status) throws IOException {
-        metadataLock.writeLock().lock();
-        try (CheckIndex checkIndex = new CheckIndex(directory)) {
-            checkIndex.exorciseIndex(status);
-        } finally {
-            metadataLock.writeLock().unlock();
-        }
-    }
-
     public StoreStats stats() throws IOException {
         ensureOpen();
         return new StoreStats(directory.estimateSize());
@@ -23,6 +23,7 @@ import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.TermQuery;
@@ -118,6 +119,7 @@ import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotShardFailure;
+import org.elasticsearch.test.CorruptionUtils;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.FieldMaskingReader;
 import org.elasticsearch.test.VersionUtils;
@@ -126,7 +128,11 @@ import org.elasticsearch.ElasticsearchException;

 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1239,7 +1245,7 @@ public class IndexShardTests extends IndexShardTestCase {
         };

         try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
-            IndexShard shard = newShard(shardRouting, shardPath, metaData, store,
+            IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store,
                     null, new InternalEngineFactory(), () -> {
                     }, EMPTY_EVENT_LISTENER);
             AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
@@ -2590,6 +2596,143 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(newShard);
     }

+    public void testIndexCheckOnStartup() throws Exception {
+        final IndexShard indexShard = newStartedShard(true);
+
+        final long numDocs = between(10, 100);
+        for (long i = 0; i < numDocs; i++) {
+            indexDoc(indexShard, "_doc", Long.toString(i), "{}");
+        }
+        indexShard.flush(new FlushRequest());
+        closeShards(indexShard);
+
+        final ShardPath shardPath = indexShard.shardPath();
+
+        final Path indexPath = corruptIndexFile(shardPath);
+
+        final AtomicInteger corruptedMarkerCount = new AtomicInteger();
+        final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
+                    corruptedMarkerCount.incrementAndGet();
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        };
+        Files.walkFileTree(indexPath, corruptedVisitor);
+
+        assertThat("corruption marker should not be there", corruptedMarkerCount.get(), equalTo(0));
+
+        final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
+            RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE
+        );
+        // start the shard and perform the index check on startup; this forces the shard to fail due to corrupted index files
+        final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
+            .settings(Settings.builder()
+                .put(indexShard.indexSettings.getSettings())
+                .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("true", "checksum")))
+            .build();
+
+        IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
+            null, null, indexShard.engineFactory,
+            indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
+
+        final IndexShardRecoveryException indexShardRecoveryException =
+            expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
+        assertThat(indexShardRecoveryException.getMessage(), equalTo("failed recovery"));
+
+        // check that the corrupt marker is there
+        Files.walkFileTree(indexPath, corruptedVisitor);
+        assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1));
+
+        try {
+            closeShards(corruptedShard);
+        } catch (RuntimeException e) {
+            assertThat(e.getMessage(), equalTo("CheckIndex failed"));
+        }
+    }
+
+    public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception {
+        final IndexShard indexShard = newStartedShard(true);
+
+        final long numDocs = between(10, 100);
+        for (long i = 0; i < numDocs; i++) {
+            indexDoc(indexShard, "_doc", Long.toString(i), "{}");
+        }
+        indexShard.flush(new FlushRequest());
+        closeShards(indexShard);
+
+        final ShardPath shardPath = indexShard.shardPath();
+
+        final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
+            RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE
+        );
+        final IndexMetaData indexMetaData = indexShard.indexSettings().getIndexMetaData();
+
+        final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
+
+        // create a corrupted marker
+        final String corruptionMessage = "fake ioexception";
+        try (Store store = createStore(indexShard.indexSettings(), shardPath)) {
+            store.markStoreCorrupted(new IOException(corruptionMessage));
+        }
+
+        // try to start the shard on corrupted files
+        final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
+            null, null, indexShard.engineFactory,
+            indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
+
+        final IndexShardRecoveryException exception1 = expectThrows(IndexShardRecoveryException.class,
+            () -> newStartedShard(p -> corruptedShard, true));
+        assertThat(exception1.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)"));
+        closeShards(corruptedShard);
+
+        final AtomicInteger corruptedMarkerCount = new AtomicInteger();
+        final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
+                    corruptedMarkerCount.incrementAndGet();
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        };
+        Files.walkFileTree(indexPath, corruptedVisitor);
+        assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1));
+
+        // try one more time to start the shard on corrupted files
+        final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData,
+            null, null, indexShard.engineFactory,
+            indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
+
+        final IndexShardRecoveryException exception2 = expectThrows(IndexShardRecoveryException.class,
+            () -> newStartedShard(p -> corruptedShard2, true));
+        assertThat(exception2.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)"));
+        closeShards(corruptedShard2);
+
+        // check that the corrupt marker is still there
+        corruptedMarkerCount.set(0);
+        Files.walkFileTree(indexPath, corruptedVisitor);
+        assertThat("store still has a single corrupt marker", corruptedMarkerCount.get(), equalTo(1));
+    }
+
+    private Path corruptIndexFile(ShardPath shardPath) throws IOException {
+        final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
+        final Path[] filesToCorrupt =
+            Files.walk(indexPath)
+                .filter(p -> {
+                    final String name = p.getFileName().toString();
+                    return Files.isRegularFile(p)
+                        && name.startsWith("extra") == false // skip files added by Lucene's ExtrasFS
+                        && IndexWriter.WRITE_LOCK_NAME.equals(name) == false
+                        && name.startsWith("segments_") == false && name.endsWith(".si") == false;
+                })
+                .toArray(Path[]::new);
+        CorruptionUtils.corruptFile(random(), filesToCorrupt);
+        return indexPath;
+    }
+
     /**
      * Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
      * and checking index concurrently. This should always be possible without any exception.
@@ -2613,7 +2756,7 @@ public class IndexShardTests extends IndexShardTestCase {
         final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
             .settings(Settings.builder()
                 .put(indexShard.indexSettings.getSettings())
-                .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
+                .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum")))
             .build();
         final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
             null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
@@ -2655,6 +2798,16 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(newShard);
     }

+    public void testCheckOnStartupDeprecatedValue() throws Exception {
+        final Settings settings = Settings.builder().put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "fix").build();
+
+        final IndexShard newShard = newShard(true, settings);
+        closeShards(newShard);
+
+        assertWarnings("Setting [index.shard.check_on_startup] is set to deprecated value [fix], "
+            + "which has no effect and will not be accepted in future");
+    }
+
     class Result {
         private final int localCheckpoint;
         private final int maxSeqNo;
@@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.lucene.uid.Versions;
@@ -156,7 +157,6 @@ public abstract class IndexShardTestCase extends ESTestCase {
         return Settings.EMPTY;
     }

-
     protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
         return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()));
     }
@@ -169,7 +169,6 @@ public abstract class IndexShardTestCase extends ESTestCase {
             }
         };
         return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
-
     }

     /**
@@ -179,7 +178,17 @@ public abstract class IndexShardTestCase extends ESTestCase {
      *                another shard)
      */
     protected IndexShard newShard(boolean primary) throws IOException {
-        return newShard(primary, Settings.EMPTY, new InternalEngineFactory());
+        return newShard(primary, Settings.EMPTY);
     }
+
+    /**
+     * Creates a new initializing shard. The shard will have its own unique data path.
+     *
+     * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica (ready to recover from
+     *                another shard)
+     */
+    protected IndexShard newShard(final boolean primary, final Settings settings) throws IOException {
+        return newShard(primary, settings, new InternalEngineFactory());
+    }

     /**
@@ -318,23 +327,25 @@ public abstract class IndexShardTestCase extends ESTestCase {
      * @param routing                shard routing to use
      * @param shardPath              path to use for shard data
      * @param indexMetaData          indexMetaData for the shard, including any mapping
-     * @param store                  an optional custom store to use. If null a default file based store will be created
+     * @param storeProvider          an optional custom store provider to use. If null a default file based store will be created
      * @param indexSearcherWrapper   an optional wrapper to be used during searchers
      * @param globalCheckpointSyncer callback for syncing global checkpoints
      * @param indexEventListener     index event listener
      * @param listeners              an optional set of listeners to add to the shard
      */
     protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
-                                  @Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper,
+                                  @Nullable CheckedFunction<IndexSettings, Store, IOException> storeProvider,
+                                  @Nullable IndexSearcherWrapper indexSearcherWrapper,
                                   @Nullable EngineFactory engineFactory,
                                   Runnable globalCheckpointSyncer,
                                   IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
         final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
         final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
         final IndexShard indexShard;
-        if (store == null) {
-            store = createStore(indexSettings, shardPath);
+        if (storeProvider == null) {
+            storeProvider = is -> createStore(is, shardPath);
         }
+        final Store store = storeProvider.apply(indexSettings);
         boolean success = false;
         try {
             IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
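A call-site sketch matching the updated signature, mirroring the change in IndexShardTests above: the custom store is now passed as a provider function rather than as an instance (the variables shown are the ones from that test).

    // The fourth argument is a CheckedFunction<IndexSettings, Store, IOException>,
    // so an existing store is wrapped in a lambda that ignores the settings argument.
    IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store,
        null, new InternalEngineFactory(), () -> {}, EMPTY_EVENT_LISTENER);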
@@ -424,7 +435,18 @@ public abstract class IndexShardTestCase extends ESTestCase {
      */
     protected IndexShard newStartedShard(
             final boolean primary, final Settings settings, final EngineFactory engineFactory) throws IOException {
-        IndexShard shard = newShard(primary, settings, engineFactory);
+        return newStartedShard(p -> newShard(p, settings, engineFactory), primary);
+    }
+
+    /**
+     * creates a new empty shard and starts it.
+     *
+     * @param shardFunction shard factory function
+     * @param primary controls whether the shard will be a primary or a replica.
+     */
+    protected IndexShard newStartedShard(CheckedFunction<Boolean, IndexShard, IOException> shardFunction,
+                                         boolean primary) throws IOException {
+        IndexShard shard = shardFunction.apply(primary);
         if (primary) {
             recoverShardFromStore(shard);
         } else {
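For reference, a usage sketch of the new factory-based newStartedShard overload (assuming a `settings` instance is in scope, as in the delegating overload above). The corruption tests earlier pass a pre-built shard the same way and wrap the call in expectThrows, since recovery is expected to fail there.

    // The factory receives the primary flag and returns the shard to recover and start.
    IndexShard shard = newStartedShard(p -> newShard(p, settings, new InternalEngineFactory()), true);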