Fail shard if IndexShard#storeStats runs into an IOException (#32241)
Fail shard if IndexShard#storeStats runs into an IOException. Closes #29008
This commit is contained in:
parent
261002283b
commit
33f11e637d
|
@ -917,6 +917,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
try {
|
||||
return store.stats();
|
||||
} catch (IOException e) {
|
||||
failShard("Failing shard because of exception during storeStats", e);
|
||||
throw new ElasticsearchException("io exception while building 'store stats'", e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.lucene.search.IndexSearcher;
|
|||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.Version;
|
||||
|
@ -112,6 +114,7 @@ import org.elasticsearch.test.DummyShardLock;
|
|||
import org.elasticsearch.test.FieldMaskingReader;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
|
@ -138,6 +141,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
|
@ -1162,6 +1166,81 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
closeShards(shard);
|
||||
}
|
||||
|
||||
|
||||
/**
 * Checks that an {@link IOException} raised while {@code IndexShard#storeStats} reads the store
 * fails the shard.
 *
 * The test wraps the shard's directory in a {@link FilterDirectory} that can be switched, mid-test,
 * into throwing from {@code fileLength} (and optionally from {@code listAll}). It then asserts that
 * {@code storeStats} throws an {@link ElasticsearchException}, that the registered shard-failure
 * callback fired, and - when the injected exception is a {@code CorruptIndexException} and
 * {@code listAll} was left working - that the store ended up marked as corrupted.
 *
 * @throws IOException declared because the setup and recovery helpers used here may throw it
 */
public void testShardStatsWithFailures() throws IOException {
|
||||
// NOTE(review): presumably tells the test harness that shard failures are expected here - confirm
allowShardFailures();
|
||||
final ShardId shardId = new ShardId("index", "_na_", 0);
|
||||
final ShardRouting shardRouting = newShardRouting(shardId, "node", true, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, ShardRoutingState.INITIALIZING);
|
||||
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
|
||||
|
||||
|
||||
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
|
||||
// Minimal index settings: current version, a single shard, no replicas
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.build();
|
||||
IndexMetaData metaData = IndexMetaData.builder(shardRouting.getIndexName())
|
||||
.settings(settings)
|
||||
.primaryTerm(0, 1)
|
||||
.build();
|
||||
|
||||
// Override two Directory methods (fileLength and listAll) so the test can make them fail on demand.
|
||||
// The failure is held in an AtomicReference so it can be armed in the middle of the test
// (after recovery) rather than from the start.
|
||||
// The reference holds a Supplier<IOException> rather than an IOException so that each failure
// carries a meaningful stack trace
|
||||
// (the stack trace is filled in when the exception is instantiated, i.e. at the throw site).
|
||||
AtomicReference<Supplier<IOException>> exceptionToThrow = new AtomicReference<>();
|
||||
// When true (and a failure is armed), listAll() fails as well
AtomicBoolean throwWhenMarkingStoreCorrupted = new AtomicBoolean(false);
|
||||
Directory directory = new FilterDirectory(newFSDirectory(shardPath.resolveIndex())) {
|
||||
// fileLength is called inside the try block of storeStats,
|
||||
// but it is not called while the store is being marked as corrupted.
|
||||
@Override
|
||||
public long fileLength(String name) throws IOException {
|
||||
Supplier<IOException> ex = exceptionToThrow.get();
|
||||
if (ex == null) {
|
||||
// No failure armed yet: behave like the wrapped directory
return super.fileLength(name);
|
||||
} else {
|
||||
throw ex.get();
|
||||
}
|
||||
}
|
||||
|
||||
// listAll is called when the store is being marked as corrupted;
// it only fails when the test opted in via throwWhenMarkingStoreCorrupted.
|
||||
@Override
|
||||
public String[] listAll() throws IOException {
|
||||
Supplier<IOException> ex = exceptionToThrow.get();
|
||||
if (throwWhenMarkingStoreCorrupted.get() && ex != null) {
|
||||
throw ex.get();
|
||||
} else {
|
||||
return super.listAll();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
|
||||
// Build the shard on top of the custom store so that it reads through the failing directory
IndexShard shard = newShard(shardRouting, shardPath, metaData, store,
|
||||
null, new InternalEngineFactory(), () -> {
|
||||
}, EMPTY_EVENT_LISTENER);
|
||||
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
|
||||
// Record that the shard-failure callback ran; the callback argument itself is ignored
shard.addShardFailureCallback((ig)->failureCallbackTriggered.set(true));
|
||||
|
||||
// Recover while exceptionToThrow is still null, so the directory delegates normally
recoverShardFromStore(shard);
|
||||
|
||||
// Randomly choose which kind of I/O failure to inject
final boolean corruptIndexException = randomBoolean();
|
||||
|
||||
if (corruptIndexException) {
|
||||
exceptionToThrow.set(() -> new CorruptIndexException("Test CorruptIndexException", "Test resource"));
|
||||
// Randomly make listAll() fail too, i.e. sabotage the step that marks the store as corrupted
throwWhenMarkingStoreCorrupted.set(randomBoolean());
|
||||
} else {
|
||||
exceptionToThrow.set(() -> new IOException("Test IOException"));
|
||||
}
|
||||
// With the failure armed, storeStats must throw an ElasticsearchException ...
ElasticsearchException e = expectThrows(ElasticsearchException.class, shard::storeStats);
|
||||
// ... and the shard must have been failed, which is observed through the callback
assertTrue(failureCallbackTriggered.get());
|
||||
|
||||
// The corruption marker is only expected when marking was not itself made to fail via listAll()
if (corruptIndexException && !throwWhenMarkingStoreCorrupted.get()) {
|
||||
assertTrue(store.isMarkedCorrupted());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testRefreshMetric() throws IOException {
|
||||
IndexShard shard = newStartedShard();
|
||||
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // refresh on: finalize and end of recovery
|
||||
|
@ -1868,6 +1947,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
|
||||
shard.shardPath(),
|
||||
shard.indexSettings().getIndexMetaData(),
|
||||
null,
|
||||
wrapper,
|
||||
new InternalEngineFactory(),
|
||||
() -> {},
|
||||
|
@ -2020,6 +2100,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
|
||||
shard.shardPath(),
|
||||
shard.indexSettings().getIndexMetaData(),
|
||||
null,
|
||||
wrapper,
|
||||
new InternalEngineFactory(),
|
||||
() -> {},
|
||||
|
@ -2506,7 +2587,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
|
||||
.build();
|
||||
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
|
||||
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
|
||||
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
|
||||
|
||||
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
|
||||
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
|
||||
|
@ -3005,7 +3086,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
|
||||
AtomicBoolean markedInactive = new AtomicBoolean();
|
||||
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
|
||||
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, new InternalEngineFactory(), () -> {
|
||||
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> {
|
||||
}, new IndexEventListener() {
|
||||
@Override
|
||||
public void onShardInactive(IndexShard indexShard) {
|
||||
|
|
|
@ -105,6 +105,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
|
|||
shard.shardPath(),
|
||||
shard.indexSettings().getIndexMetaData(),
|
||||
null,
|
||||
null,
|
||||
new InternalEngineFactory(),
|
||||
() -> {},
|
||||
EMPTY_EVENT_LISTENER);
|
||||
|
|
|
@ -259,13 +259,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
|
||||
public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
|
||||
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
|
||||
shardId,
|
||||
nodeId,
|
||||
false, ShardRoutingState.INITIALIZING,
|
||||
RecoverySource.PeerRecoverySource.INSTANCE);
|
||||
shardId,
|
||||
nodeId,
|
||||
false, ShardRoutingState.INITIALIZING,
|
||||
RecoverySource.PeerRecoverySource.INSTANCE);
|
||||
|
||||
final IndexShard newReplica =
|
||||
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
|
||||
newShard(shardRouting, shardPath, indexMetaData, null, null, getEngineFactory(shardRouting),
|
||||
() -> {}, EMPTY_EVENT_LISTENER);
|
||||
replicas.add(newReplica);
|
||||
updateAllocationIDsOnPrimary();
|
||||
return newReplica;
|
||||
|
|
|
@ -163,15 +163,20 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
return Settings.EMPTY;
|
||||
}
|
||||
|
||||
private Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
|
||||
final ShardId shardId = shardPath.getShardId();
|
||||
|
||||
/**
 * Creates a {@link Store} for the shard identified by {@code shardPath}, using a directory obtained
 * from {@code newFSDirectory(shardPath.resolveIndex())}. Delegates to the
 * {@code createStore(ShardId, IndexSettings, Directory)} overload.
 *
 * @param indexSettings index settings passed through to the store
 * @param shardPath     shard path supplying both the shard id and the index directory location
 * @return the store returned by the delegated overload
 * @throws IOException declared by this method; presumably propagated from directory or store creation
 */
protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
|
||||
return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()));
|
||||
}
|
||||
|
||||
protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException {
|
||||
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
|
||||
@Override
|
||||
public Directory newDirectory() throws IOException {
|
||||
return newFSDirectory(shardPath.resolveIndex());
|
||||
return directory;
|
||||
}
|
||||
};
|
||||
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -284,29 +289,32 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
final ShardId shardId = routing.shardId();
|
||||
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
|
||||
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
|
||||
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
|
||||
return newShard(routing, shardPath, indexMetaData, null, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
|
||||
EMPTY_EVENT_LISTENER, listeners);
|
||||
}
|
||||
|
||||
/**
|
||||
* creates a new initializing shard.
|
||||
* @param routing shard routing to use
|
||||
* @param shardPath path to use for shard data
|
||||
* @param indexMetaData indexMetaData for the shard, including any mapping
|
||||
* @param indexSearcherWrapper an optional wrapper to be used during searchers
|
||||
* @param globalCheckpointSyncer callback for syncing global checkpoints
|
||||
* @param indexEventListener index even listener
|
||||
* @param listeners an optional set of listeners to add to the shard
|
||||
* @param routing shard routing to use
|
||||
* @param shardPath path to use for shard data
|
||||
* @param indexMetaData indexMetaData for the shard, including any mapping
|
||||
* @param store an optional custom store to use. If null a default file based store will be created
|
||||
* @param indexSearcherWrapper an optional wrapper to be used during searchers
|
||||
* @param globalCheckpointSyncer callback for syncing global checkpoints
|
||||
* @param indexEventListener index event listener
|
||||
* @param listeners an optional set of listeners to add to the shard
|
||||
*/
|
||||
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
|
||||
@Nullable IndexSearcherWrapper indexSearcherWrapper,
|
||||
@Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper,
|
||||
@Nullable EngineFactory engineFactory,
|
||||
Runnable globalCheckpointSyncer,
|
||||
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
|
||||
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
|
||||
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
|
||||
final IndexShard indexShard;
|
||||
final Store store = createStore(indexSettings, shardPath);
|
||||
if (store == null) {
|
||||
store = createStore(indexSettings, shardPath);
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
|
||||
|
@ -357,6 +365,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
current.shardPath(),
|
||||
current.indexSettings().getIndexMetaData(),
|
||||
null,
|
||||
null,
|
||||
current.engineFactory,
|
||||
current.getGlobalCheckpointSyncer(),
|
||||
EMPTY_EVENT_LISTENER, listeners);
|
||||
|
|
Loading…
Reference in New Issue