diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java
index 6cbcfbe7208..eba91ab21de 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -28,7 +28,6 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -72,6 +71,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.elasticsearch.index.translog.TranslogConfig.EMPTY_TRANSLOG_BUFFER_SIZE;
+
 /**
  * A Translog is a per index shard component that records all non-committed index operations in a durable manner.
  * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}.
@@ -116,7 +117,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
     // the list of translog readers is guaranteed to be in order of translog generation
     private final List<TranslogReader> readers = new ArrayList<>();
-    private BigArrays bigArrays;
+    private final BigArrays bigArrays;
     protected final ReleasableLock readLock;
     protected final ReleasableLock writeLock;
     private final Path location;
@@ -505,7 +506,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 config.getBufferSize(),
                 initialMinTranslogGen, initialGlobalCheckpoint,
                 globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy,
-                persistedSequenceNumberConsumer);
+                persistedSequenceNumberConsumer,
+                bigArrays);
         } catch (final IOException e) {
             throw new TranslogException(shardId, "failed to create new translog file", e);
         }
@@ -521,7 +523,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      */
    public Location add(final Operation operation) throws IOException {
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
-        boolean successfullySerialized = false;
         try {
             final long start = out.position();
             out.skip(Integer.BYTES);
@@ -531,9 +532,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             out.seek(start);
             out.writeInt(operationSize);
             out.seek(end);
-            successfullySerialized = true;
-            try (ReleasableBytesReference bytes = new ReleasableBytesReference(out.bytes(), out);
-                 ReleasableLock ignored = readLock.acquire()) {
+            final BytesReference bytes = out.bytes();
+            try (ReleasableLock ignored = readLock.acquire()) {
                 ensureOpen();
                 if (operation.primaryTerm() > current.getPrimaryTerm()) {
                     assert false :
@@ -551,9 +551,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             closeOnTragicEvent(ex);
             throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex);
         } finally {
-            if (successfullySerialized == false) {
-                Releasables.close(out);
-            }
+            Releasables.close(out);
         }
     }
 
@@ -1911,7 +1909,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
         IOUtils.fsync(checkpointFile, false);
         final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory,
-            TranslogConfig.DEFAULT_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint,
+            EMPTY_TRANSLOG_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint,
             () -> {
                 throw new UnsupportedOperationException();
             }, () -> {
@@ -1921,7 +1919,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             new TragicExceptionHolder(),
             seqNo -> {
                 throw new UnsupportedOperationException();
-            });
+            }, BigArrays.NON_RECYCLING_INSTANCE);
         writer.close();
         return uuid;
     }
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
index 23c43c3d923..ba9547ec6e3 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
@@ -35,6 +35,7 @@ import java.nio.file.Path;
 public final class TranslogConfig {
 
     public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
+    public static final ByteSizeValue EMPTY_TRANSLOG_BUFFER_SIZE = new ByteSizeValue(10, ByteSizeUnit.BYTES);
     private final BigArrays bigArrays;
     private final IndexSettings indexSettings;
     private final ShardId shardId;
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
index 0aa35712e1b..299829e8472 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
@@ -32,9 +32,10 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Channels;
 import org.elasticsearch.common.io.DiskIoBufferPool;
-import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -46,8 +47,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -61,6 +60,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     private final ShardId shardId;
     private final FileChannel checkpointChannel;
     private final Path checkpointPath;
+    private final BigArrays bigArrays;
     // the last checkpoint that was written when the translog was last synced
     private volatile Checkpoint lastSyncedCheckpoint;
     /* the number of translog operations written to this file */
@@ -87,8 +87,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64);
 
     private final int forceWriteThreshold;
-    private final ArrayList<ReleasableBytesReference> bufferedOps = new ArrayList<>();
-    private long bufferedBytes = 0L;
+    private ReleasableBytesStreamOutput buffer;
 
     private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
 
@@ -101,8 +100,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        final Path checkpointPath,
        final ByteSizeValue bufferSize,
        final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header,
-        TragicExceptionHolder tragedy,
-        final LongConsumer persistedSequenceNumberConsumer)
+        final TragicExceptionHolder tragedy,
+        final LongConsumer persistedSequenceNumberConsumer,
+        final BigArrays bigArrays)
             throws IOException {
         super(initialCheckpoint.generation, channel, path, header);
@@ -123,6 +123,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
         assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
         this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
+        this.bigArrays = bigArrays;
         this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
         this.tragedy = tragedy;
     }
@@ -130,7 +131,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file,
                                         ChannelFactory channelFactory, ByteSizeValue bufferSize, final long initialMinTranslogGen,
                                         long initialGlobalCheckpoint,
                                         final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier,
-                                        final long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer)
+                                        final long primaryTerm, TragicExceptionHolder tragedy,
+                                        final LongConsumer persistedSequenceNumberConsumer, final BigArrays bigArrays)
         throws IOException {
         final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME);
@@ -155,7 +157,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                 writerGlobalCheckpointSupplier = globalCheckpointSupplier;
             }
             return new TranslogWriter(shardId, checkpoint, channel, checkpointChannel, file, checkpointFile, bufferSize,
-                writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer);
+                writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer, bigArrays);
         } catch (Exception exception) {
             // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
             // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation
@@ -182,15 +184,17 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
      * @return the location the bytes were written to
      * @throws IOException if writing to the translog resulted in an I/O exception
      */
-    public Translog.Location add(final ReleasableBytesReference data, final long seqNo) throws IOException {
+    public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
         final Translog.Location location;
         final long bytesBufferedAfterAdd;
         synchronized (this) {
             ensureOpen();
+            if (buffer == null) {
+                buffer = new ReleasableBytesStreamOutput(bigArrays);
+            }
             final long offset = totalOffset;
             totalOffset += data.length();
-            bufferedBytes += data.length();
-            bufferedOps.add(data.retain());
+            data.writeTo(buffer);
 
             assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
             assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
@@ -205,7 +209,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             assert assertNoSeqNumberConflict(seqNo, data);
 
             location = new Translog.Location(generation, offset, data.length());
-            bytesBufferedAfterAdd = bufferedBytes;
+            bytesBufferedAfterAdd = buffer.size();
         }
 
         if (bytesBufferedAfterAdd >= forceWriteThreshold) {
@@ -335,7 +339,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             throw ex;
         }
         // If we reached this point, all of the buffered ops should have been flushed successfully.
-        assert bufferedOps.size() == 0;
+        assert buffer == null;
         assert checkChannelPositionWhileHandlingException(totalOffset);
         assert totalOffset == lastSyncedCheckpoint.offset;
         if (closed.compareAndSet(false, true)) {
@@ -372,7 +376,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                 throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
             }
             // If we reached this point, all of the buffered ops should have been flushed successfully.
-            assert bufferedOps.size() == 0;
+            assert buffer == null;
             assert checkChannelPositionWhileHandlingException(totalOffset);
             assert totalOffset == lastSyncedCheckpoint.offset;
             return super.newSnapshot();
@@ -398,7 +402,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             // the lock we should check again since if this code is busy we might have fsynced enough already
             final Checkpoint checkpointToSync;
             final LongArrayList flushedSequenceNumbers;
-            final ArrayDeque<ReleasableBytesReference> toWrite;
+            final ReleasableBytesReference toWrite;
             try (ReleasableLock toClose = writeLock.acquire()) {
                 synchronized (this) {
                     ensureOpen();
@@ -449,44 +453,39 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
         }
     }
 
-    private synchronized ArrayDeque<ReleasableBytesReference> pollOpsToWrite() {
+    private synchronized ReleasableBytesReference pollOpsToWrite() {
         ensureOpen();
-        final ArrayDeque<ReleasableBytesReference> operationsToWrite = new ArrayDeque<>(bufferedOps.size());
-        operationsToWrite.addAll(bufferedOps);
-        bufferedOps.clear();
-        bufferedBytes = 0;
-        return operationsToWrite;
+        if (this.buffer != null) {
+            ReleasableBytesStreamOutput toWrite = this.buffer;
+            this.buffer = null;
+            return new ReleasableBytesReference(toWrite.bytes(), toWrite);
+        } else {
+            return ReleasableBytesReference.wrap(BytesArray.EMPTY);
+        }
     }
 
-    private void writeAndReleaseOps(final ArrayDeque<ReleasableBytesReference> operationsToWrite) throws IOException {
-        try {
+    private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {
+        try (ReleasableBytesReference toClose = toWrite) {
             assert writeLock.isHeldByCurrentThread();
             ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer();
 
-            ReleasableBytesReference operation;
-            while ((operation = operationsToWrite.pollFirst()) != null) {
-                try (Releasable toClose = operation) {
-                    BytesRefIterator iterator = operation.iterator();
-                    BytesRef current;
-                    while ((current = iterator.next()) != null) {
-                        int currentBytesConsumed = 0;
-                        while (currentBytesConsumed != current.length) {
-                            int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());
-                            ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);
-                            currentBytesConsumed += nBytesToWrite;
-                            if (ioBuffer.hasRemaining() == false) {
-                                ioBuffer.flip();
-                                writeToFile(ioBuffer);
-                                ioBuffer.clear();
-                            }
-                        }
+            BytesRefIterator iterator = toWrite.iterator();
+            BytesRef current;
+            while ((current = iterator.next()) != null) {
+                int currentBytesConsumed = 0;
+                while (currentBytesConsumed != current.length) {
+                    int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());
+                    ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);
+                    currentBytesConsumed += nBytesToWrite;
+                    if (ioBuffer.hasRemaining() == false) {
+                        ioBuffer.flip();
+                        writeToFile(ioBuffer);
+                        ioBuffer.clear();
                     }
                 }
             }
             ioBuffer.flip();
             writeToFile(ioBuffer);
-        } finally {
-            Releasables.close(operationsToWrite);
         }
     }
 
@@ -550,8 +549,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     public final void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
             synchronized (this) {
-                Releasables.closeWhileHandlingException(bufferedOps);
-                bufferedOps.clear();
+                Releasables.closeWhileHandlingException(buffer);
+                buffer = null;
             }
             IOUtils.close(checkpointChannel, channel);
         }
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
index c765c08afb3..c3f2b5dc772 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
@@ -20,12 +20,13 @@ package org.elasticsearch.index.translog;
 
 import org.apache.lucene.store.ByteArrayDataOutput;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 import org.mockito.Mockito;
@@ -190,7 +191,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
             }
             writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen,
                 tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L,
-                () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {});
+                () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, BigArrays.NON_RECYCLING_INSTANCE);
             writer = Mockito.spy(writer);
             Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime();
 
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 47a0e07e562..b200cf13dc3 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -1267,7 +1267,7 @@ public class TranslogTests extends ESTestCase {
         final TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1);
         final Set<Long> persistedSeqNos = new HashSet<>();
         persistedSeqNoConsumer.set(persistedSeqNos::add);
-        final int numOps = randomIntBetween(8, 128);
+        final int numOps = scaledRandomIntBetween(8, 250000);
         final Set<Long> seenSeqNos = new HashSet<>();
         boolean opsHaveValidSequenceNumbers = randomBoolean();
         for (int i = 0; i < numOps; i++) {
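
For readers skimming the hunks above: the core of this change is that TranslogWriter stops retaining a per-operation list (ArrayList of ReleasableBytesReference plus a separate bufferedBytes counter) and instead copies each serialized operation into a single, lazily allocated ReleasableBytesStreamOutput, which the sync path later takes over wholesale. The sketch below is not code from this patch; it condenses the add/poll handshake using only classes and calls that appear in the diff, and the class name SingleBufferSketch is hypothetical.

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;

import java.io.IOException;

// Hypothetical condensation of the buffering pattern introduced by this diff.
final class SingleBufferSketch {

    // Tests in the diff pass the non-recycling instance; production code wires in the shard's BigArrays.
    private final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
    private ReleasableBytesStreamOutput buffer; // lazily created, handed off (nulled out) on poll

    // Mirrors TranslogWriter#add after the change: append into one shared buffer
    // instead of retaining a ReleasableBytesReference per operation.
    synchronized void add(BytesReference serializedOp) throws IOException {
        if (buffer == null) {
            buffer = new ReleasableBytesStreamOutput(bigArrays); // pages come from BigArrays
        }
        serializedOp.writeTo(buffer); // copy the bytes; the caller may release its own buffer immediately
    }

    // Mirrors TranslogWriter#pollOpsToWrite: transfer ownership of everything buffered so far.
    synchronized ReleasableBytesReference poll() {
        if (buffer == null) {
            return ReleasableBytesReference.wrap(BytesArray.EMPTY); // nothing buffered since the last sync
        }
        final ReleasableBytesStreamOutput taken = buffer;
        buffer = null; // the next add() starts a fresh buffer
        // Closing the returned reference releases the pooled pages backing `taken`.
        return new ReleasableBytesReference(taken.bytes(), taken);
    }
}

Because poll() moves ownership of the stream into the returned ReleasableBytesReference, the write path can release everything with a single try-with-resources (as writeAndReleaseOps now does), which is also why the per-operation retain()/close bookkeeping and the successfullySerialized flag in Translog#add could be deleted: Translog#add always closes its scratch output once the bytes have been copied into the writer's buffer.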