From dd4b0d85fe015f85e5c7ae0a73f72fd87a4eb354 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 6 Oct 2020 10:49:45 -0600 Subject: [PATCH] Write translog operation bytes to byte stream (#63298) Currently we add translog operation bytes to an array list and flush them on the next write. Unfortunately, this does not currently play well with our byte pooling which means each operation is backed, at minimum, by a 16KB array. This commit improves memory efficiency for small operations by serializing the operations to an output stream. --- .../index/translog/Translog.java | 22 +++-- .../index/translog/TranslogConfig.java | 1 + .../index/translog/TranslogWriter.java | 89 +++++++++---------- .../translog/TranslogDeletionPolicyTests.java | 5 +- .../index/translog/TranslogTests.java | 2 +- 5 files changed, 59 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 6cbcfbe7208..eba91ab21de 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -72,6 +71,8 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.index.translog.TranslogConfig.EMPTY_TRANSLOG_BUFFER_SIZE; + /** * A Translog is a per index shard component that records all non-committed index operations in a durable manner. * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. @@ -116,7 +117,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // the list of translog readers is guaranteed to be in order of translog generation private final List readers = new ArrayList<>(); - private BigArrays bigArrays; + private final BigArrays bigArrays; protected final ReleasableLock readLock; protected final ReleasableLock writeLock; private final Path location; @@ -505,7 +506,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC config.getBufferSize(), initialMinTranslogGen, initialGlobalCheckpoint, globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy, - persistedSequenceNumberConsumer); + persistedSequenceNumberConsumer, + bigArrays); } catch (final IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -521,7 +523,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC */ public Location add(final Operation operation) throws IOException { final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); - boolean successfullySerialized = false; try { final long start = out.position(); out.skip(Integer.BYTES); @@ -531,9 +532,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC out.seek(start); out.writeInt(operationSize); out.seek(end); - successfullySerialized = true; - try (ReleasableBytesReference bytes = new ReleasableBytesReference(out.bytes(), out); - ReleasableLock ignored = readLock.acquire()) { + final BytesReference bytes = out.bytes(); + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); if (operation.primaryTerm() > current.getPrimaryTerm()) { assert false : @@ -551,9 +551,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC closeOnTragicEvent(ex); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex); } finally { - if (successfullySerialized == false) { - Releasables.close(out); - } + Releasables.close(out); } } @@ -1911,7 +1909,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); IOUtils.fsync(checkpointFile, false); final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory, - TranslogConfig.DEFAULT_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint, + EMPTY_TRANSLOG_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint, () -> { throw new UnsupportedOperationException(); }, () -> { @@ -1921,7 +1919,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC new TragicExceptionHolder(), seqNo -> { throw new UnsupportedOperationException(); - }); + }, BigArrays.NON_RECYCLING_INSTANCE); writer.close(); return uuid; } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java index 23c43c3d923..ba9547ec6e3 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java @@ -35,6 +35,7 @@ import java.nio.file.Path; public final class TranslogConfig { public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB); + public static final ByteSizeValue EMPTY_TRANSLOG_BUFFER_SIZE = new ByteSizeValue(10, ByteSizeUnit.BYTES); private final BigArrays bigArrays; private final IndexSettings indexSettings; private final ShardId shardId; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 0aa35712e1b..299829e8472 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -32,9 +32,10 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.DiskIoBufferPool; -import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -46,8 +47,6 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -61,6 +60,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private final ShardId shardId; private final FileChannel checkpointChannel; private final Path checkpointPath; + private final BigArrays bigArrays; // the last checkpoint that was written when the translog was last synced private volatile Checkpoint lastSyncedCheckpoint; /* the number of translog operations written to this file */ @@ -87,8 +87,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64); private final int forceWriteThreshold; - private final ArrayList bufferedOps = new ArrayList<>(); - private long bufferedBytes = 0L; + private ReleasableBytesStreamOutput buffer; private final Map> seenSequenceNumbers; @@ -101,8 +100,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { final Path checkpointPath, final ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header, - TragicExceptionHolder tragedy, - final LongConsumer persistedSequenceNumberConsumer) + final TragicExceptionHolder tragedy, + final LongConsumer persistedSequenceNumberConsumer, + final BigArrays bigArrays) throws IOException { super(initialCheckpoint.generation, channel, path, header); @@ -123,6 +123,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer; + this.bigArrays = bigArrays; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; this.tragedy = tragedy; } @@ -130,7 +131,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint, final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier, - final long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer) + final long primaryTerm, TragicExceptionHolder tragedy, + final LongConsumer persistedSequenceNumberConsumer, final BigArrays bigArrays) throws IOException { final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME); @@ -155,7 +157,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { writerGlobalCheckpointSupplier = globalCheckpointSupplier; } return new TranslogWriter(shardId, checkpoint, channel, checkpointChannel, file, checkpointFile, bufferSize, - writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer); + writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer, bigArrays); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation @@ -182,15 +184,17 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { * @return the location the bytes were written to * @throws IOException if writing to the translog resulted in an I/O exception */ - public Translog.Location add(final ReleasableBytesReference data, final long seqNo) throws IOException { + public Translog.Location add(final BytesReference data, final long seqNo) throws IOException { final Translog.Location location; final long bytesBufferedAfterAdd; synchronized (this) { ensureOpen(); + if (buffer == null) { + buffer = new ReleasableBytesStreamOutput(bigArrays); + } final long offset = totalOffset; totalOffset += data.length(); - bufferedBytes += data.length(); - bufferedOps.add(data.retain()); + data.writeTo(buffer); assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; @@ -205,7 +209,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { assert assertNoSeqNumberConflict(seqNo, data); location = new Translog.Location(generation, offset, data.length()); - bytesBufferedAfterAdd = bufferedBytes; + bytesBufferedAfterAdd = buffer.size(); } if (bytesBufferedAfterAdd >= forceWriteThreshold) { @@ -335,7 +339,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { throw ex; } // If we reached this point, all of the buffered ops should have been flushed successfully. - assert bufferedOps.size() == 0; + assert buffer == null; assert checkChannelPositionWhileHandlingException(totalOffset); assert totalOffset == lastSyncedCheckpoint.offset; if (closed.compareAndSet(false, true)) { @@ -372,7 +376,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e); } // If we reached this point, all of the buffered ops should have been flushed successfully. - assert bufferedOps.size() == 0; + assert buffer == null; assert checkChannelPositionWhileHandlingException(totalOffset); assert totalOffset == lastSyncedCheckpoint.offset; return super.newSnapshot(); @@ -398,7 +402,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { // the lock we should check again since if this code is busy we might have fsynced enough already final Checkpoint checkpointToSync; final LongArrayList flushedSequenceNumbers; - final ArrayDeque toWrite; + final ReleasableBytesReference toWrite; try (ReleasableLock toClose = writeLock.acquire()) { synchronized (this) { ensureOpen(); @@ -449,44 +453,39 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { } } - private synchronized ArrayDeque pollOpsToWrite() { + private synchronized ReleasableBytesReference pollOpsToWrite() { ensureOpen(); - final ArrayDeque operationsToWrite = new ArrayDeque<>(bufferedOps.size()); - operationsToWrite.addAll(bufferedOps); - bufferedOps.clear(); - bufferedBytes = 0; - return operationsToWrite; + if (this.buffer != null) { + ReleasableBytesStreamOutput toWrite = this.buffer; + this.buffer = null; + return new ReleasableBytesReference(toWrite.bytes(), toWrite); + } else { + return ReleasableBytesReference.wrap(BytesArray.EMPTY); + } } - private void writeAndReleaseOps(final ArrayDeque operationsToWrite) throws IOException { - try { + private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException { + try (ReleasableBytesReference toClose = toWrite) { assert writeLock.isHeldByCurrentThread(); ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer(); - ReleasableBytesReference operation; - while ((operation = operationsToWrite.pollFirst()) != null) { - try (Releasable toClose = operation) { - BytesRefIterator iterator = operation.iterator(); - BytesRef current; - while ((current = iterator.next()) != null) { - int currentBytesConsumed = 0; - while (currentBytesConsumed != current.length) { - int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining()); - ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite); - currentBytesConsumed += nBytesToWrite; - if (ioBuffer.hasRemaining() == false) { - ioBuffer.flip(); - writeToFile(ioBuffer); - ioBuffer.clear(); - } - } + BytesRefIterator iterator = toWrite.iterator(); + BytesRef current; + while ((current = iterator.next()) != null) { + int currentBytesConsumed = 0; + while (currentBytesConsumed != current.length) { + int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining()); + ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite); + currentBytesConsumed += nBytesToWrite; + if (ioBuffer.hasRemaining() == false) { + ioBuffer.flip(); + writeToFile(ioBuffer); + ioBuffer.clear(); } } } ioBuffer.flip(); writeToFile(ioBuffer); - } finally { - Releasables.close(operationsToWrite); } } @@ -550,8 +549,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { public final void close() throws IOException { if (closed.compareAndSet(false, true)) { synchronized (this) { - Releasables.closeWhileHandlingException(bufferedOps); - bufferedOps.clear(); + Releasables.closeWhileHandlingException(buffer); + buffer = null; } IOUtils.close(checkpointChannel, channel); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index c765c08afb3..c3f2b5dc772 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -20,12 +20,13 @@ package org.elasticsearch.index.translog; import org.apache.lucene.store.ByteArrayDataOutput; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.mockito.Mockito; @@ -190,7 +191,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase { } writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen, tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L, - () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}); + () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, BigArrays.NON_RECYCLING_INSTANCE); writer = Mockito.spy(writer); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 47a0e07e562..b200cf13dc3 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -1267,7 +1267,7 @@ public class TranslogTests extends ESTestCase { final TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1); final Set persistedSeqNos = new HashSet<>(); persistedSeqNoConsumer.set(persistedSeqNos::add); - final int numOps = randomIntBetween(8, 128); + final int numOps = scaledRandomIntBetween(8, 250000); final Set seenSeqNos = new HashSet<>(); boolean opsHaveValidSequenceNumbers = randomBoolean(); for (int i = 0; i < numOps; i++) {