Currently when an engine is failed, it is marked as corrupted regardless of

the failure type. This change marks the engine as corrupted only when the failure
is caused by an actual index corrruption. When an engine is failed for other
reasons, the engine is only closed without removing the shard state.

closes #11788
This commit is contained in:
Areek Zillur 2015-06-29 23:13:17 -04:00
parent fbcf4dbbf7
commit 4849e76275
5 changed files with 33 additions and 33 deletions

View File

@ -510,26 +510,17 @@ public abstract class Engine implements Closeable {
*/
public abstract SnapshotIndexCommit snapshotIndex(boolean flushFirst) throws EngineException;
/** fail engine due to some error. the engine will also be closed. */
public void failEngine(String reason, Throwable failure) {
assert failure != null;
/**
* fail engine due to some error. the engine will also be closed.
* The underlying store is marked corrupted iff failure is caused by index corruption
*/
public void failEngine(String reason, @Nullable Throwable failure) {
if (failEngineLock.tryLock()) {
store.incRef();
try {
try {
// we just go and close this engine - no way to recover
closeNoLock("engine failed on: [" + reason + "]");
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
@ -537,7 +528,18 @@ public abstract class Engine implements Closeable {
}
logger.warn("failed engine [{}]", failure, reason);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
failedEngine = (failure != null) ? failure : new IllegalStateException(reason);
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(new IOException("failed engine (reason: [" + reason + "])", ExceptionsHelper.unwrapCorruption(failure)));
} catch (IOException e) {
logger.warn("Couldn't mark store corrupted", e);
}
}
failedEngineListener.onFailedEngine(shardId, reason, failure);
}
} catch (Throwable t) {
@ -554,10 +556,10 @@ public abstract class Engine implements Closeable {
/** Check whether the engine should be failed */
protected boolean maybeFailEngine(String source, Throwable t) {
if (Lucene.isCorruptionException(t)) {
failEngine("corrupt file detected source: [" + source + "]", t);
failEngine("corrupt file (source: [" + source + "])", t);
return true;
} else if (ExceptionsHelper.isOOM(t)) {
failEngine("out of memory", t);
failEngine("out of memory (source: [" + source + "])", t);
return true;
}
return false;

View File

@ -741,7 +741,11 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
public void failShard(String reason, Throwable e) {
/**
* Fails the shard and marks the shard store as corrupted if
* <code>e</code> is caused by index corruption
*/
public void failShard(String reason, @Nullable Throwable e) {
// fail the engine. This will cause this shard to also be removed from the node's index service.
engine().failEngine(reason, e);
}
@ -1271,18 +1275,11 @@ public class IndexShard extends AbstractIndexShardComponent {
// called by the current engine
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
try {
// mark as corrupted, so opening the store will fail
store.markStoreCorrupted(new IOException("failed engine (reason: [" + reason + "])", failure));
} catch (IOException e) {
logger.warn("failed to mark shard store as corrupted", e);
} finally {
for (Engine.FailedEngineListener listener : delegates) {
try {
listener.onFailedEngine(shardId, reason, failure);
} catch (Exception e) {
logger.warn("exception while notifying engine failure", e);
}
for (Engine.FailedEngineListener listener : delegates) {
try {
listener.onFailedEngine(shardId, reason, failure);
} catch (Exception e) {
logger.warn("exception while notifying engine failure", e);
}
}
}

View File

@ -68,7 +68,8 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
// that case, fail the shard to reallocate a new IndexShard and
// create a new IndexWriter
logger.info("recovery failed for primary shadow shard, failing shard");
shard.failShard("primary relocation failed on shared filesystem", t);
// pass the failure as null, as we want to ensure the store is not marked as corrupted
shard.failShard("primary relocation failed on shared filesystem caused by: [" + t.getMessage() + "]", null);
} else {
logger.info("recovery failed on shared filesystem", t);
}

View File

@ -647,7 +647,6 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
}
@Test
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11788")
public void testIndexOnSharedFSRecoversToAnyNode() throws Exception {
Settings nodeSettings = nodeSettings();
Settings fooSettings = Settings.builder().put(nodeSettings).put("node.affinity", "foo").build();

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
@ -215,7 +216,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
IndexService test = indicesService.indexService("test");
IndexShard shard = test.shard(0);
// fail shard
shard.failShard("test shard fail", new IOException("corrupted"));
shard.failShard("test shard fail", new CorruptIndexException("", ""));
// check state file still exists
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));