From 56092b1a9fdfd4e920a375793339a275fd5e2c52 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 8 Oct 2020 13:55:48 -0600 Subject: [PATCH] Flush translog writer before adding new operation (#63505) Currently we flush the Translog buffer when a new operation causes the buffer to breach 1MB. This introduces a scenario where an exception is thrown AFTER the writer has accepted the operation. To avoid this, this commit flushes the Translog in an #add call before adding a new operation. This fixes #63299. --- .../index/translog/TranslogWriter.java | 16 ++++++++++------ .../index/translog/TranslogTests.java | 9 +++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 299829e8472..7a58b2a0ace 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -87,6 +87,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64); private final int forceWriteThreshold; + private volatile long bufferedBytes; private ReleasableBytesStreamOutput buffer; private final Map> seenSequenceNumbers; @@ -185,13 +186,18 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { * @throws IOException if writing to the translog resulted in an I/O exception */ public Translog.Location add(final BytesReference data, final long seqNo) throws IOException { + long bufferedBytesBeforeAdd = this.bufferedBytes; + if (bufferedBytesBeforeAdd >= forceWriteThreshold) { + writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4); + } + final Translog.Location location; - final long bytesBufferedAfterAdd; synchronized (this) { ensureOpen(); if (buffer == null) { buffer = new ReleasableBytesStreamOutput(bigArrays); } + assert bufferedBytes == buffer.size(); final long offset = totalOffset; totalOffset += data.length(); data.writeTo(buffer); @@ -209,11 +215,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { assert assertNoSeqNumberConflict(seqNo, data); location = new Translog.Location(generation, offset, data.length()); - bytesBufferedAfterAdd = buffer.size(); - } - - if (bytesBufferedAfterAdd >= forceWriteThreshold) { - writeBufferedOps(Long.MAX_VALUE, bytesBufferedAfterAdd >= forceWriteThreshold * 4); + bufferedBytes = buffer.size(); } return location; @@ -458,6 +460,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { if (this.buffer != null) { ReleasableBytesStreamOutput toWrite = this.buffer; this.buffer = null; + this.bufferedBytes = 0; return new ReleasableBytesReference(toWrite.bytes(), toWrite); } else { return ReleasableBytesReference.wrap(BytesArray.EMPTY); @@ -551,6 +554,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { synchronized (this) { Releasables.closeWhileHandlingException(buffer); buffer = null; + bufferedBytes = 0; } IOUtils.close(checkpointChannel, channel); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index b200cf13dc3..0606faba2ef 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -1383,12 +1383,13 @@ public class TranslogTests extends ESTestCase { writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3); + writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4); assertThat(persistedSeqNos, empty()); assertEquals(initialWriteCalls, writeCalls.get()); if (randomBoolean()) { - // This will fill the buffer and force a flush - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4); + // Since the buffer is full, this will flush before performing the add. + writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5); assertThat(persistedSeqNos, empty()); assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); } else { @@ -1398,13 +1399,13 @@ public class TranslogTests extends ESTestCase { assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); // Add after we the read flushed the buffer - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4); + writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5); } writer.sync(); // Sequence numbers are marked as persisted after sync - assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L)); + assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L, 5L)); } }