Ensure no circular reference in translog tragic exception (#55959)
We generate a circular reference exception in translog in 6.8 in the following scenario: - The first rollGeneration hits "too many open files" exception when it's copying a checkpoint file. We will set the tragic exception and close the translog - The second rollGeneration hits AlreadyClosedException as the current writer is closed. We will suppress the ACE to the current tragic exception. Unfortunately, this leads to a circular reference as ACE already suppresses the tragic exception. Other factors that help to manifest this bug: - We do not fail the engine on AlreadyClosedException in flush - We do not check for ensureOpen before rolling a new generation Closes #55893
This commit is contained in:
parent
c204353249
commit
2fd257add2
|
@ -239,7 +239,7 @@ public final class ExceptionsHelper {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T extends Throwable> Optional<T> unwrapCausesAndSuppressed(Throwable cause, Predicate<Throwable> predicate) {
|
||||
public static <T extends Throwable> Optional<T> unwrapCausesAndSuppressed(Throwable cause, Predicate<Throwable> predicate) {
|
||||
if (predicate.test(cause)) {
|
||||
return Optional.of((T) cause);
|
||||
}
|
||||
|
|
|
@ -1808,6 +1808,7 @@ public class InternalEngine extends Engine {
|
|||
refresh("version_table_flush", SearcherScope.INTERNAL, true);
|
||||
translog.trimUnreferencedReaders();
|
||||
} catch (AlreadyClosedException e) {
|
||||
failOnTragicEvent(e);
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new FlushFailedEngineException(shardId, e);
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
|
||||
package org.elasticsearch.index.translog;
|
||||
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class TragicExceptionHolder {
|
||||
|
@ -30,10 +33,15 @@ public class TragicExceptionHolder {
|
|||
*/
|
||||
public void setTragicException(Exception ex) {
|
||||
assert ex != null;
|
||||
if (tragedy.compareAndSet(null, ex) == false) {
|
||||
if (tragedy.get() != ex) { // to ensure there is no self-suppression
|
||||
tragedy.get().addSuppressed(ex);
|
||||
}
|
||||
if (tragedy.compareAndSet(null, ex)) {
|
||||
return; // first exception
|
||||
}
|
||||
final Exception tragedy = this.tragedy.get();
|
||||
// ensure no circular reference
|
||||
if (ExceptionsHelper.unwrapCausesAndSuppressed(ex, e -> e == tragedy).isPresent()) {
|
||||
assert ex == tragedy || ex instanceof AlreadyClosedException : new AssertionError("must be ACE or tragic exception", ex);
|
||||
} else {
|
||||
tragedy.addSuppressed(ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1623,10 +1623,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
* @throws IOException if an I/O exception occurred during any file operations
|
||||
*/
|
||||
public void rollGeneration() throws IOException {
|
||||
// make sure we move most of the data to disk outside of the writeLock
|
||||
// in order to reduce the time the lock is held since it's blocking all threads
|
||||
sync();
|
||||
syncBeforeRollGeneration();
|
||||
try (Releasable ignored = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
try {
|
||||
final TranslogReader reader = current.closeIntoReader();
|
||||
readers.add(reader);
|
||||
|
@ -1643,6 +1642,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
void syncBeforeRollGeneration() throws IOException {
|
||||
// make sure we move most of the data to disk outside of the writeLock
|
||||
// in order to reduce the time the lock is held since it's blocking all threads
|
||||
sync();
|
||||
}
|
||||
|
||||
/**
|
||||
* Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum
|
||||
* required generation
|
||||
|
|
|
@ -100,9 +100,12 @@ import java.util.Comparator;
|
|||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -3307,4 +3310,64 @@ public class TranslogTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testEnsureNoCircularException() throws Exception {
|
||||
final AtomicBoolean failedToSyncCheckpoint = new AtomicBoolean();
|
||||
final ChannelFactory channelFactory = (file, openOption) -> {
|
||||
final FileChannel channel = FileChannel.open(file, openOption);
|
||||
return new FilterFileChannel(channel) {
|
||||
@Override
|
||||
public void force(boolean metaData) throws IOException {
|
||||
if (failedToSyncCheckpoint.get()) {
|
||||
throw new IOException("simulated");
|
||||
}
|
||||
super.force(metaData);
|
||||
}
|
||||
};
|
||||
};
|
||||
final TranslogConfig config = getTranslogConfig(createTempDir());
|
||||
final String translogUUID = Translog.createEmptyTranslog(
|
||||
config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get());
|
||||
final Translog translog = new Translog(config, translogUUID, createTranslogDeletionPolicy(),
|
||||
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get,
|
||||
seqNo -> {}) {
|
||||
@Override
|
||||
ChannelFactory getChannelFactory() {
|
||||
return channelFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
void syncBeforeRollGeneration() {
|
||||
// make it a noop like the old versions
|
||||
}
|
||||
};
|
||||
try {
|
||||
translog.add(new Translog.Index("1", "_doc", 1, primaryTerm.get(), new byte[]{1}));
|
||||
failedToSyncCheckpoint.set(true);
|
||||
expectThrows(IOException.class, translog::rollGeneration);
|
||||
final AlreadyClosedException alreadyClosedException = expectThrows(AlreadyClosedException.class, translog::rollGeneration);
|
||||
if (hasCircularReference(alreadyClosedException)) {
|
||||
throw new AssertionError("detect circular reference exception", alreadyClosedException);
|
||||
}
|
||||
} finally {
|
||||
IOUtils.close(translog);
|
||||
}
|
||||
}
|
||||
|
||||
static boolean hasCircularReference(Exception cause) {
|
||||
final Queue<Throwable> queue = new LinkedList<>();
|
||||
queue.add(cause);
|
||||
final Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
|
||||
while (queue.isEmpty() == false) {
|
||||
final Throwable current = queue.remove();
|
||||
if (seen.add(current) == false) {
|
||||
return true;
|
||||
}
|
||||
Collections.addAll(queue, current.getSuppressed());
|
||||
if (current.getCause() != null) {
|
||||
queue.add(current.getCause());
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue