From 0749b18181c8dbfc3f85a4a6d64dd09beeb411ac Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 20 Aug 2018 19:22:10 +0200 Subject: [PATCH] All Translog inner closes should happen after tragedy exception is set (#32674) All Translog inner closes should happen after tragedy exception is set (#32674) We faced with the nasty race condition. See #32526 InternalEngine.failOnTragic method has thrown AssertionError. If you carefully look at if branches in this method, you will spot that its only possible, if either Lucene IndexWriterhas closed from inside or Translog, has closed from inside, but tragedy exception is not set. For now, let us concentrate on the Translog class. We found out that there are two methods in Translog - namely rollGeneration and trimOperations that are closing Translog in case of Exception without tragedy exception being set. This commit fixes these 2 methods. To fix it, we pull tragedyException from TranslogWriter up-to Translog class, because in these 2 methods IndexWriter could be innocent, but still Translog needs to be closed. Also, tragedyException is wrapped with TragicExceptionHolder to reuse CAS/addSuppresed functionality in Translog and TranslogWriter. Also to protect us in the future and make sure close method is never called from inside Translog special assertion examining stack trace is added. Since we're still targeting Java 8 for runtime - no StackWalker API is used in the implementation. In the stack-trace checking method, we're considering inner caller not only Translog methods but Translog child classes methods as well. It does mean that Translog is meant for extending it, but it's needed to be able to test this method. Closes #32526 --- .../index/translog/TragicExceptionHolder.java | 43 ++++++++++++++++ .../index/translog/Translog.java | 42 ++++++++++++---- .../index/translog/TranslogWriter.java | 35 +++++-------- .../translog/TranslogDeletionPolicyTests.java | 2 +- .../index/translog/TranslogTests.java | 49 ++++++++++++++++++- 5 files changed, 136 insertions(+), 35 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java diff --git a/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java new file mode 100644 index 00000000000..b823a920039 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import java.util.concurrent.atomic.AtomicReference; + +public class TragicExceptionHolder { + private final AtomicReference tragedy = new AtomicReference<>(); + + /** + * Sets the tragic exception or if the tragic exception is already set adds passed exception as suppressed exception + * @param ex tragic exception to set + */ + public void setTragicException(Exception ex) { + assert ex != null; + if (tragedy.compareAndSet(null, ex) == false) { + if (tragedy.get() != ex) { // to ensure there is no self-suppression + tragedy.get().addSuppressed(ex); + } + } + } + + public Exception get() { + return tragedy.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index e426b3a7253..72c6210535f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -66,6 +66,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.LongSupplier; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -117,6 +118,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final Path location; private TranslogWriter current; + protected final TragicExceptionHolder tragedy = new TragicExceptionHolder(); private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; @@ -310,8 +312,28 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return closed.get() == false; } + private static boolean calledFromOutsideOrViaTragedyClose() { + List frames = Stream.of(Thread.currentThread().getStackTrace()). + skip(3). //skip getStackTrace, current method and close method frames + limit(10). //limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. IOUtils + filter(f -> + { + try { + return Translog.class.isAssignableFrom(Class.forName(f.getClassName())); + } catch (Exception ignored) { + return false; + } + } + ). //find all inner callers including Translog subclasses + collect(Collectors.toList()); + //the list of inner callers should be either empty or should contain closeOnTragicEvent method + return frames.isEmpty() || frames.stream().anyMatch(f -> f.getMethodName().equals("closeOnTragicEvent")); + } + @Override public void close() throws IOException { + assert calledFromOutsideOrViaTragedyClose() : + "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method"; if (closed.compareAndSet(false, true)) { try (ReleasableLock lock = writeLock.acquire()) { try { @@ -462,7 +484,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC getChannelFactory(), config.getBufferSize(), initialMinTranslogGen, initialGlobalCheckpoint, - globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong()); + globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy); } catch (final IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -726,7 +748,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } catch (IOException e) { IOUtils.closeWhileHandlingException(newReaders); - close(); + tragedy.setTragicException(e); + closeOnTragicEvent(e); throw e; } @@ -779,10 +802,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * * @param ex if an exception occurs closing the translog, it will be suppressed into the provided exception */ - private void closeOnTragicEvent(final Exception ex) { + protected void closeOnTragicEvent(final Exception ex) { // we can not hold a read lock here because closing will attempt to obtain a write lock and that would result in self-deadlock assert readLock.isHeldByCurrentThread() == false : Thread.currentThread().getName(); - if (current.getTragicException() != null) { + if (tragedy.get() != null) { try { close(); } catch (final AlreadyClosedException inner) { @@ -1556,7 +1579,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC current = createWriter(current.getGeneration() + 1); logger.trace("current translog set to [{}]", current.getGeneration()); } catch (final Exception e) { - IOUtils.closeWhileHandlingException(this); // tragic event + tragedy.setTragicException(e); + closeOnTragicEvent(e); throw e; } } @@ -1669,7 +1693,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private void ensureOpen() { if (closed.get()) { - throw new AlreadyClosedException("translog is already closed", current.getTragicException()); + throw new AlreadyClosedException("translog is already closed", tragedy.get()); } } @@ -1683,7 +1707,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * Otherwise (no tragic exception has occurred) it returns null. */ public Exception getTragicException() { - return current.getTragicException(); + return tragedy.get(); } /** Reads and returns the current checkpoint */ @@ -1766,8 +1790,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC final String translogUUID = UUIDs.randomBase64UUID(); TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory, new ByteSizeValue(10), 1, initialGlobalCheckpoint, - () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm - ); + () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm, + new TragicExceptionHolder()); writer.close(); return translogUUID; } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index b779644cd5c..f48f2ceb792 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -51,7 +51,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { /* the number of translog operations written to this file */ private volatile int operationCounter; /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */ - private volatile Exception tragedy; + private final TragicExceptionHolder tragedy; /* A buffered outputstream what writes to the writers channel */ private final OutputStream outputStream; /* the total offset of this file including the bytes written to the file as well as into the buffer */ @@ -76,7 +76,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { final FileChannel channel, final Path path, final ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header) throws IOException { + final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header, + TragicExceptionHolder tragedy) + throws + IOException { super(initialCheckpoint.generation, channel, path, header); assert initialCheckpoint.offset == channel.position() : "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position [" @@ -94,12 +97,13 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; + this.tragedy = tragedy; } public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint, final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier, - final long primaryTerm) + final long primaryTerm, TragicExceptionHolder tragedy) throws IOException { final FileChannel channel = channelFactory.open(file); try { @@ -120,7 +124,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { writerGlobalCheckpointSupplier = globalCheckpointSupplier; } return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, - writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header); + writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition @@ -129,24 +133,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { } } - /** - * If this {@code TranslogWriter} was closed as a side-effect of a tragic exception, - * e.g. disk full while flushing a new segment, this returns the root cause exception. - * Otherwise (no tragic exception has occurred) it returns null. - */ - public Exception getTragicException() { - return tragedy; - } - private synchronized void closeWithTragicEvent(final Exception ex) { - assert ex != null; - if (tragedy == null) { - tragedy = ex; - } else if (tragedy != ex) { - // it should be safe to call closeWithTragicEvents on multiple layers without - // worrying about self suppression. - tragedy.addSuppressed(ex); - } + tragedy.setTragicException(ex); try { close(); } catch (final IOException | RuntimeException e) { @@ -296,7 +284,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { if (closed.compareAndSet(false, true)) { return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header); } else { - throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy); + throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", + tragedy.get()); } } } @@ -406,7 +395,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { protected final void ensureOpen() { if (isClosed()) { - throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy); + throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy.get()); } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 9ae502fecb5..c8d4dbd43df 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -171,7 +171,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase { } writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen, tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L, - () -> 1L, randomNonNegativeLong()); + () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder()); writer = Mockito.spy(writer); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 1c27a59e0ec..4ec479334ba 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -33,6 +33,7 @@ import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.Assertions; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; @@ -108,6 +109,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -1655,7 +1657,7 @@ public class TranslogTests extends ESTestCase { } assertThat(expectedException, is(not(nullValue()))); - + assertThat(failableTLog.getTragicException(), equalTo(expectedException)); assertThat(fileChannels, is(not(empty()))); assertThat("all file channels have to be closed", fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false)); @@ -2505,11 +2507,13 @@ public class TranslogTests extends ESTestCase { syncedDocs.addAll(unsynced); unsynced.clear(); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { - // fair enough + assertEquals(failableTLog.getTragicException(), ex); } catch (IOException ex) { assertEquals(ex.getMessage(), "__FAKE__ no space left on device"); + assertEquals(failableTLog.getTragicException(), ex); } catch (RuntimeException ex) { assertEquals(ex.getMessage(), "simulated"); + assertEquals(failableTLog.getTragicException(), ex); } finally { Checkpoint checkpoint = Translog.readCheckpoint(config.getTranslogPath()); if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) { @@ -2931,6 +2935,47 @@ public class TranslogTests extends ESTestCase { } } + // close method should never be called directly from Translog (the only exception is closeOnTragicEvent) + public void testTranslogCloseInvariant() throws IOException { + assumeTrue("test only works with assertions enabled", Assertions.ENABLED); + class MisbehavingTranslog extends Translog { + MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException { + super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier); + } + + void callCloseDirectly() throws IOException { + close(); + } + + void callCloseUsingIOUtilsWithExceptionHandling() { + IOUtils.closeWhileHandlingException(this); + } + + void callCloseUsingIOUtils() throws IOException { + IOUtils.close(this); + } + + void callCloseOnTragicEvent() { + Exception e = new Exception("test tragic exception"); + tragedy.setTragicException(e); + closeOnTragicEvent(e); + } + } + + + globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Path path = createTempDir(); + final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); + final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get); + + expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseDirectly()); + expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtils()); + expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtilsWithExceptionHandling()); + misbehavingTranslog.callCloseOnTragicEvent(); + } + static class SortedSnapshot implements Translog.Snapshot { private final Translog.Snapshot snapshot; private List operations = null;