From 2fd257add205ef9af228c79917f30533c0a0bab8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 30 Apr 2020 08:02:29 -0400 Subject: [PATCH] Ensure no circular reference in translog tragic exception (#55959) We generate a circular reference exception in translog in 6.8 in the following scenario: - The first rollGeneration hits "too many open files" exception when it's copying a checkpoint file. We will set the tragic exception and close the translog - The second rollGeneration hits AlreadyClosedException as the current writer is closed. We will suppress the ACE to the current tragic exception. Unfortunately, this leads to a circular reference as ACE already suppresses the tragic exception. Other factors that help to manifest this bug: - We do not fail the engine on AlreadyClosedException in flush - We do not check for ensureOpen before rolling a new generation Closes #55893 --- .../org/elasticsearch/ExceptionsHelper.java | 2 +- .../index/engine/InternalEngine.java | 1 + .../index/translog/TragicExceptionHolder.java | 16 +++-- .../index/translog/Translog.java | 11 +++- .../index/translog/TranslogTests.java | 63 +++++++++++++++++++ 5 files changed, 85 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index 690c359116a..884c563504e 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -239,7 +239,7 @@ public final class ExceptionsHelper { } @SuppressWarnings("unchecked") - private static Optional unwrapCausesAndSuppressed(Throwable cause, Predicate predicate) { + public static Optional unwrapCausesAndSuppressed(Throwable cause, Predicate predicate) { if (predicate.test(cause)) { return Optional.of((T) cause); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 95c60b6201e..f92024312a2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1808,6 +1808,7 @@ public class InternalEngine extends Engine { refresh("version_table_flush", SearcherScope.INTERNAL, true); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { + failOnTragicEvent(e); throw e; } catch (Exception e) { throw new FlushFailedEngineException(shardId, e); diff --git a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java index b823a920039..2d8b105099b 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java @@ -19,6 +19,9 @@ package org.elasticsearch.index.translog; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ExceptionsHelper; + import java.util.concurrent.atomic.AtomicReference; public class TragicExceptionHolder { @@ -30,10 +33,15 @@ public class TragicExceptionHolder { */ public void setTragicException(Exception ex) { assert ex != null; - if (tragedy.compareAndSet(null, ex) == false) { - if (tragedy.get() != ex) { // to ensure there is no self-suppression - tragedy.get().addSuppressed(ex); - } + if (tragedy.compareAndSet(null, ex)) { + return; // first exception + } + final Exception tragedy = this.tragedy.get(); + // ensure no circular reference + if (ExceptionsHelper.unwrapCausesAndSuppressed(ex, e -> e == tragedy).isPresent()) { + assert ex == tragedy || ex instanceof AlreadyClosedException : new AssertionError("must be ACE or tragic exception", ex); + } else { + tragedy.addSuppressed(ex); } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 202c363e712..79aaac75b4f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1623,10 +1623,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * @throws IOException if an I/O exception occurred during any file operations */ public void rollGeneration() throws IOException { - // make sure we move most of the data to disk outside of the writeLock - // in order to reduce the time the lock is held since it's blocking all threads - sync(); + syncBeforeRollGeneration(); try (Releasable ignored = writeLock.acquire()) { + ensureOpen(); try { final TranslogReader reader = current.closeIntoReader(); readers.add(reader); @@ -1643,6 +1642,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } + void syncBeforeRollGeneration() throws IOException { + // make sure we move most of the data to disk outside of the writeLock + // in order to reduce the time the lock is held since it's blocking all threads + sync(); + } + /** * Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum * required generation diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 6a96d6c609f..18fecd4d520 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -100,9 +100,12 @@ import java.util.Comparator; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -3307,4 +3310,64 @@ public class TranslogTests extends ESTestCase { } } } + + public void testEnsureNoCircularException() throws Exception { + final AtomicBoolean failedToSyncCheckpoint = new AtomicBoolean(); + final ChannelFactory channelFactory = (file, openOption) -> { + final FileChannel channel = FileChannel.open(file, openOption); + return new FilterFileChannel(channel) { + @Override + public void force(boolean metaData) throws IOException { + if (failedToSyncCheckpoint.get()) { + throw new IOException("simulated"); + } + super.force(metaData); + } + }; + }; + final TranslogConfig config = getTranslogConfig(createTempDir()); + final String translogUUID = Translog.createEmptyTranslog( + config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get()); + final Translog translog = new Translog(config, translogUUID, createTranslogDeletionPolicy(), + () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, + seqNo -> {}) { + @Override + ChannelFactory getChannelFactory() { + return channelFactory; + } + + @Override + void syncBeforeRollGeneration() { + // make it a noop like the old versions + } + }; + try { + translog.add(new Translog.Index("1", "_doc", 1, primaryTerm.get(), new byte[]{1})); + failedToSyncCheckpoint.set(true); + expectThrows(IOException.class, translog::rollGeneration); + final AlreadyClosedException alreadyClosedException = expectThrows(AlreadyClosedException.class, translog::rollGeneration); + if (hasCircularReference(alreadyClosedException)) { + throw new AssertionError("detect circular reference exception", alreadyClosedException); + } + } finally { + IOUtils.close(translog); + } + } + + static boolean hasCircularReference(Exception cause) { + final Queue queue = new LinkedList<>(); + queue.add(cause); + final Set seen = Collections.newSetFromMap(new IdentityHashMap<>()); + while (queue.isEmpty() == false) { + final Throwable current = queue.remove(); + if (seen.add(current) == false) { + return true; + } + Collections.addAll(queue, current.getSuppressed()); + if (current.getCause() != null) { + queue.add(current.getCause()); + } + } + return false; + } }