[ENGINE] Mark store as corrupted before sending failed shard
We have to mark a shard as corrupted, if necessary, before the shard-failed event is fired, i.e., before we call the corresponding listener in the engine. Otherwise, the shard might be re-allocated on the same node and simply started up without being marked as corrupted. Relates to #5924
This commit is contained in:
parent
e8ff007852
commit
86bc79202d
|
@ -24,7 +24,6 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
|||
import org.apache.lucene.codecs.CodecUtil;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.store.ChecksumIndexInput;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
|
@ -396,6 +395,10 @@ public class Lucene {
|
|||
return DirectoryReader.indexExists(directory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <tt>true</tt> iff the given exception or
|
||||
* one of its causes is an instance of {@link CorruptIndexException}, otherwise <tt>false</tt>.
*
* @param t the throwable to inspect; it is handed to {@code ExceptionsHelper.unwrap} together with {@code CorruptIndexException.class}
* @return <tt>true</tt> if that unwrap call yields a non-null result, <tt>false</tt> otherwise
|
||||
*/
|
||||
public static boolean isCorruptionException(Throwable t) {
|
||||
return ExceptionsHelper.unwrap(t, CorruptIndexException.class) != null;
|
||||
}
|
||||
|
|
|
@ -1275,34 +1275,35 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
|||
public void failEngine(String reason, Throwable failure) {
|
||||
assert failure != null;
|
||||
if (failEngineLock.tryLock()) {
|
||||
|
||||
assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
|
||||
if (failedEngine != null) {
|
||||
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
logger.warn("failed engine [{}]", reason, failure);
|
||||
// we must set a failure exception, generate one if not supplied
|
||||
failedEngine = failure;
|
||||
for (FailedEngineListener listener : failedEngineListeners) {
|
||||
listener.onFailedEngine(shardId, reason, failure);
|
||||
// we first mark the store as corrupted before we notify any listeners
|
||||
// this must happen first otherwise we might try to reallocate so quickly
|
||||
// on the same node that we don't see the corrupted marker file when
|
||||
// the shard is initializing
|
||||
if (Lucene.isCorruptionException(failure)) {
|
||||
try {
|
||||
store.markStoreCorrupted(ExceptionsHelper.unwrap(failure, CorruptIndexException.class));
|
||||
} catch (IOException e) {
|
||||
logger.warn("Couldn't marks store corrupted", e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
|
||||
if (failedEngine != null) {
|
||||
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (Lucene.isCorruptionException(failure)) {
|
||||
try {
|
||||
store.markStoreCorrupted(ExceptionsHelper.unwrap(failure, CorruptIndexException.class));
|
||||
} catch (IOException e) {
|
||||
logger.warn("Couldn't marks store corrupted", e);
|
||||
}
|
||||
logger.warn("failed engine [{}]", reason, failure);
|
||||
// we must set a failure exception, generate one if not supplied
|
||||
failedEngine = failure;
|
||||
for (FailedEngineListener listener : failedEngineListeners) {
|
||||
listener.onFailedEngine(shardId, reason, failure);
|
||||
}
|
||||
} finally {
|
||||
// close the engine whatever happens...
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue