Flush translog writer before adding new operation (#63505)

Currently we flush the Translog buffer when a new operation causes the
buffer to breach 1MB. This introduces a scenario where an exception is
thrown AFTER the writer has accepted the operation. To avoid this, this
commit flushes the Translog in an #add call before adding a new
operation.

This fixes #63299.
This commit is contained in:
Tim Brooks 2020-10-08 13:55:48 -06:00
parent 01dde647a6
commit 56092b1a9f
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
2 changed files with 15 additions and 10 deletions

View File

@ -87,6 +87,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64); private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64);
private final int forceWriteThreshold; private final int forceWriteThreshold;
private volatile long bufferedBytes;
private ReleasableBytesStreamOutput buffer; private ReleasableBytesStreamOutput buffer;
private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers; private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
@ -185,13 +186,18 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
* @throws IOException if writing to the translog resulted in an I/O exception * @throws IOException if writing to the translog resulted in an I/O exception
*/ */
public Translog.Location add(final BytesReference data, final long seqNo) throws IOException { public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
long bufferedBytesBeforeAdd = this.bufferedBytes;
if (bufferedBytesBeforeAdd >= forceWriteThreshold) {
writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4);
}
final Translog.Location location; final Translog.Location location;
final long bytesBufferedAfterAdd;
synchronized (this) { synchronized (this) {
ensureOpen(); ensureOpen();
if (buffer == null) { if (buffer == null) {
buffer = new ReleasableBytesStreamOutput(bigArrays); buffer = new ReleasableBytesStreamOutput(bigArrays);
} }
assert bufferedBytes == buffer.size();
final long offset = totalOffset; final long offset = totalOffset;
totalOffset += data.length(); totalOffset += data.length();
data.writeTo(buffer); data.writeTo(buffer);
@ -209,11 +215,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
assert assertNoSeqNumberConflict(seqNo, data); assert assertNoSeqNumberConflict(seqNo, data);
location = new Translog.Location(generation, offset, data.length()); location = new Translog.Location(generation, offset, data.length());
bytesBufferedAfterAdd = buffer.size(); bufferedBytes = buffer.size();
}
if (bytesBufferedAfterAdd >= forceWriteThreshold) {
writeBufferedOps(Long.MAX_VALUE, bytesBufferedAfterAdd >= forceWriteThreshold * 4);
} }
return location; return location;
@ -458,6 +460,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
if (this.buffer != null) { if (this.buffer != null) {
ReleasableBytesStreamOutput toWrite = this.buffer; ReleasableBytesStreamOutput toWrite = this.buffer;
this.buffer = null; this.buffer = null;
this.bufferedBytes = 0;
return new ReleasableBytesReference(toWrite.bytes(), toWrite); return new ReleasableBytesReference(toWrite.bytes(), toWrite);
} else { } else {
return ReleasableBytesReference.wrap(BytesArray.EMPTY); return ReleasableBytesReference.wrap(BytesArray.EMPTY);
@ -551,6 +554,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
synchronized (this) { synchronized (this) {
Releasables.closeWhileHandlingException(buffer); Releasables.closeWhileHandlingException(buffer);
buffer = null; buffer = null;
bufferedBytes = 0;
} }
IOUtils.close(checkpointChannel, channel); IOUtils.close(checkpointChannel, channel);
} }

View File

@ -1383,12 +1383,13 @@ public class TranslogTests extends ESTestCase {
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4);
assertThat(persistedSeqNos, empty()); assertThat(persistedSeqNos, empty());
assertEquals(initialWriteCalls, writeCalls.get()); assertEquals(initialWriteCalls, writeCalls.get());
if (randomBoolean()) { if (randomBoolean()) {
// This will fill the buffer and force a flush // Since the buffer is full, this will flush before performing the add.
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5);
assertThat(persistedSeqNos, empty()); assertThat(persistedSeqNos, empty());
assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
} else { } else {
@ -1398,13 +1399,13 @@ public class TranslogTests extends ESTestCase {
assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
// Add after we the read flushed the buffer // Add after we the read flushed the buffer
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5);
} }
writer.sync(); writer.sync();
// Sequence numbers are marked as persisted after sync // Sequence numbers are marked as persisted after sync
assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L)); assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L, 5L));
} }
} }