diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 01148339966..f56afd4e072 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -510,26 +510,17 @@ public abstract class Engine implements Closeable { */ public abstract SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException; - /** fail engine due to some error. the engine will also be closed. */ - public void failEngine(String reason, Throwable failure) { - assert failure != null; + /** + * fail engine due to some error. the engine will also be closed. + * The underlying store is marked corrupted iff failure is caused by index corruption + */ + public void failEngine(String reason, @Nullable Throwable failure) { if (failEngineLock.tryLock()) { store.incRef(); try { try { // we just go and close this engine - no way to recover closeNoLock("engine failed on: [" + reason + "]"); - // we first mark the store as corrupted before we notify any listeners - // this must happen first otherwise we might try to reallocate so quickly - // on the same node that we don't see the corrupted marker file when - // the shard is initializing - if (Lucene.isCorruptionException(failure)) { - try { - store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure)); - } catch (IOException e) { - logger.warn("Couldn't marks store corrupted", e); - } - } } finally { if (failedEngine != null) { logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure); @@ -537,7 +528,18 @@ public abstract class Engine implements Closeable { } logger.warn("failed engine [{}]", failure, reason); // we must set a failure exception, generate one if not supplied - failedEngine = failure; + failedEngine = (failure != null) ? failure : new IllegalStateException(reason); + // we first mark the store as corrupted before we notify any listeners + // this must happen first otherwise we might try to reallocate so quickly + // on the same node that we don't see the corrupted marker file when + // the shard is initializing + if (Lucene.isCorruptionException(failure)) { + try { + store.markStoreCorrupted(new IOException("failed engine (reason: [" + reason + "])", ExceptionsHelper.unwrapCorruption(failure))); + } catch (IOException e) { + logger.warn("Couldn't mark store corrupted", e); + } + } failedEngineListener.onFailedEngine(shardId, reason, failure); } } catch (Throwable t) { @@ -554,10 +556,10 @@ public abstract class Engine implements Closeable { /** Check whether the engine should be failed */ protected boolean maybeFailEngine(String source, Throwable t) { if (Lucene.isCorruptionException(t)) { - failEngine("corrupt file detected source: [" + source + "]", t); + failEngine("corrupt file (source: [" + source + "])", t); return true; } else if (ExceptionsHelper.isOOM(t)) { - failEngine("out of memory", t); + failEngine("out of memory (source: [" + source + "])", t); return true; } return false; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7c3caf09c7d..4605ce3d749 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -741,7 +741,11 @@ public class IndexShard extends AbstractIndexShardComponent { } } - public void failShard(String reason, Throwable e) { + /** + * Fails the shard and marks the shard store as corrupted if + * e is caused by index corruption + */ + public void failShard(String reason, @Nullable Throwable e) { // fail the engine. This will cause this shard to also be removed from the node's index service. engine().failEngine(reason, e); } @@ -1271,18 +1275,11 @@ public class IndexShard extends AbstractIndexShardComponent { // called by the current engine @Override public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) { - try { - // mark as corrupted, so opening the store will fail - store.markStoreCorrupted(new IOException("failed engine (reason: [" + reason + "])", failure)); - } catch (IOException e) { - logger.warn("failed to mark shard store as corrupted", e); - } finally { - for (Engine.FailedEngineListener listener : delegates) { - try { - listener.onFailedEngine(shardId, reason, failure); - } catch (Exception e) { - logger.warn("exception while notifying engine failure", e); - } + for (Engine.FailedEngineListener listener : delegates) { + try { + listener.onFailedEngine(shardId, reason, failure); + } catch (Exception e) { + logger.warn("exception while notifying engine failure", e); } } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index 2b691f558e1..a466147e71c 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -68,7 +68,8 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { // that case, fail the shard to reallocate a new IndexShard and // create a new IndexWriter logger.info("recovery failed for primary shadow shard, failing shard"); - shard.failShard("primary relocation failed on shared filesystem", t); + // pass the failure as null, as we want to ensure the store is not marked as corrupted + shard.failShard("primary relocation failed on shared filesystem caused by: [" + t.getMessage() + "]", null); } else { logger.info("recovery failed on shared filesystem", t); } diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java index 810118b5be3..73ea27ae84a 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java @@ -647,7 +647,6 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { } @Test - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11788") public void testIndexOnSharedFSRecoversToAnyNode() throws Exception { Settings nodeSettings = nodeSettings(); Settings fooSettings = Settings.builder().put(nodeSettings).put("node.affinity", "foo").build(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5a2f9cdc090..a0da52b6201 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.shard; +import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.stats.IndexStats; @@ -215,7 +216,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { IndexService test = indicesService.indexService("test"); IndexShard shard = test.shard(0); // fail shard - shard.failShard("test shard fail", new IOException("corrupted")); + shard.failShard("test shard fail", new CorruptIndexException("", "")); // check state file still exists ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard));