Write translog operation bytes to byte stream (#63298)

Currently we add translog operation bytes to an array list and flush
them on the next write. Unfortunately, this does not currently play well
with our byte pooling which means each operation is backed, at minimum,
by a 16KB array. This commit improves memory efficiency for small
operations by serializing the operations to an output stream.
This commit is contained in:
Tim Brooks 2020-10-06 10:49:45 -06:00
parent 15edc39d9b
commit dd4b0d85fe
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
5 changed files with 59 additions and 60 deletions

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -72,6 +71,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.elasticsearch.index.translog.TranslogConfig.EMPTY_TRANSLOG_BUFFER_SIZE;
/** /**
* A Translog is a per index shard component that records all non-committed index operations in a durable manner. * A Translog is a per index shard component that records all non-committed index operations in a durable manner.
* In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}.
@ -116,7 +117,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// the list of translog readers is guaranteed to be in order of translog generation // the list of translog readers is guaranteed to be in order of translog generation
private final List<TranslogReader> readers = new ArrayList<>(); private final List<TranslogReader> readers = new ArrayList<>();
private BigArrays bigArrays; private final BigArrays bigArrays;
protected final ReleasableLock readLock; protected final ReleasableLock readLock;
protected final ReleasableLock writeLock; protected final ReleasableLock writeLock;
private final Path location; private final Path location;
@ -505,7 +506,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
config.getBufferSize(), config.getBufferSize(),
initialMinTranslogGen, initialGlobalCheckpoint, initialMinTranslogGen, initialGlobalCheckpoint,
globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy, globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy,
persistedSequenceNumberConsumer); persistedSequenceNumberConsumer,
bigArrays);
} catch (final IOException e) { } catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e); throw new TranslogException(shardId, "failed to create new translog file", e);
} }
@ -521,7 +523,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/ */
public Location add(final Operation operation) throws IOException { public Location add(final Operation operation) throws IOException {
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
boolean successfullySerialized = false;
try { try {
final long start = out.position(); final long start = out.position();
out.skip(Integer.BYTES); out.skip(Integer.BYTES);
@ -531,9 +532,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
out.seek(start); out.seek(start);
out.writeInt(operationSize); out.writeInt(operationSize);
out.seek(end); out.seek(end);
successfullySerialized = true; final BytesReference bytes = out.bytes();
try (ReleasableBytesReference bytes = new ReleasableBytesReference(out.bytes(), out); try (ReleasableLock ignored = readLock.acquire()) {
ReleasableLock ignored = readLock.acquire()) {
ensureOpen(); ensureOpen();
if (operation.primaryTerm() > current.getPrimaryTerm()) { if (operation.primaryTerm() > current.getPrimaryTerm()) {
assert false : assert false :
@ -551,11 +551,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
closeOnTragicEvent(ex); closeOnTragicEvent(ex);
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex);
} finally { } finally {
if (successfullySerialized == false) {
Releasables.close(out); Releasables.close(out);
} }
} }
}
/** /**
* Tests whether or not the translog generation should be rolled to a new generation. This test * Tests whether or not the translog generation should be rolled to a new generation. This test
@ -1911,7 +1909,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
IOUtils.fsync(checkpointFile, false); IOUtils.fsync(checkpointFile, false);
final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory, final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory,
TranslogConfig.DEFAULT_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint, EMPTY_TRANSLOG_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint,
() -> { () -> {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
}, () -> { }, () -> {
@ -1921,7 +1919,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
new TragicExceptionHolder(), new TragicExceptionHolder(),
seqNo -> { seqNo -> {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
}); }, BigArrays.NON_RECYCLING_INSTANCE);
writer.close(); writer.close();
return uuid; return uuid;
} }

View File

@ -35,6 +35,7 @@ import java.nio.file.Path;
public final class TranslogConfig { public final class TranslogConfig {
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB); public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
public static final ByteSizeValue EMPTY_TRANSLOG_BUFFER_SIZE = new ByteSizeValue(10, ByteSizeUnit.BYTES);
private final BigArrays bigArrays; private final BigArrays bigArrays;
private final IndexSettings indexSettings; private final IndexSettings indexSettings;
private final ShardId shardId; private final ShardId shardId;

View File

@ -32,9 +32,10 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.DiskIoBufferPool; import org.elasticsearch.common.io.DiskIoBufferPool;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
@ -46,8 +47,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -61,6 +60,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
private final ShardId shardId; private final ShardId shardId;
private final FileChannel checkpointChannel; private final FileChannel checkpointChannel;
private final Path checkpointPath; private final Path checkpointPath;
private final BigArrays bigArrays;
// the last checkpoint that was written when the translog was last synced // the last checkpoint that was written when the translog was last synced
private volatile Checkpoint lastSyncedCheckpoint; private volatile Checkpoint lastSyncedCheckpoint;
/* the number of translog operations written to this file */ /* the number of translog operations written to this file */
@ -87,8 +87,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64); private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64);
private final int forceWriteThreshold; private final int forceWriteThreshold;
private final ArrayList<ReleasableBytesReference> bufferedOps = new ArrayList<>(); private ReleasableBytesStreamOutput buffer;
private long bufferedBytes = 0L;
private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers; private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
@ -101,8 +100,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
final Path checkpointPath, final Path checkpointPath,
final ByteSizeValue bufferSize, final ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header,
TragicExceptionHolder tragedy, final TragicExceptionHolder tragedy,
final LongConsumer persistedSequenceNumberConsumer) final LongConsumer persistedSequenceNumberConsumer,
final BigArrays bigArrays)
throws throws
IOException { IOException {
super(initialCheckpoint.generation, channel, path, header); super(initialCheckpoint.generation, channel, path, header);
@ -123,6 +123,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo; assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier; this.globalCheckpointSupplier = globalCheckpointSupplier;
this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer; this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
this.bigArrays = bigArrays;
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
this.tragedy = tragedy; this.tragedy = tragedy;
} }
@ -130,7 +131,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory,
ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint, ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint,
final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier, final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier,
final long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer) final long primaryTerm, TragicExceptionHolder tragedy,
final LongConsumer persistedSequenceNumberConsumer, final BigArrays bigArrays)
throws IOException { throws IOException {
final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME); final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME);
@ -155,7 +157,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
writerGlobalCheckpointSupplier = globalCheckpointSupplier; writerGlobalCheckpointSupplier = globalCheckpointSupplier;
} }
return new TranslogWriter(shardId, checkpoint, channel, checkpointChannel, file, checkpointFile, bufferSize, return new TranslogWriter(shardId, checkpoint, channel, checkpointChannel, file, checkpointFile, bufferSize,
writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer); writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer, bigArrays);
} catch (Exception exception) { } catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation
@ -182,15 +184,17 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
* @return the location the bytes were written to * @return the location the bytes were written to
* @throws IOException if writing to the translog resulted in an I/O exception * @throws IOException if writing to the translog resulted in an I/O exception
*/ */
public Translog.Location add(final ReleasableBytesReference data, final long seqNo) throws IOException { public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
final Translog.Location location; final Translog.Location location;
final long bytesBufferedAfterAdd; final long bytesBufferedAfterAdd;
synchronized (this) { synchronized (this) {
ensureOpen(); ensureOpen();
if (buffer == null) {
buffer = new ReleasableBytesStreamOutput(bigArrays);
}
final long offset = totalOffset; final long offset = totalOffset;
totalOffset += data.length(); totalOffset += data.length();
bufferedBytes += data.length(); data.writeTo(buffer);
bufferedOps.add(data.retain());
assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
@ -205,7 +209,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
assert assertNoSeqNumberConflict(seqNo, data); assert assertNoSeqNumberConflict(seqNo, data);
location = new Translog.Location(generation, offset, data.length()); location = new Translog.Location(generation, offset, data.length());
bytesBufferedAfterAdd = bufferedBytes; bytesBufferedAfterAdd = buffer.size();
} }
if (bytesBufferedAfterAdd >= forceWriteThreshold) { if (bytesBufferedAfterAdd >= forceWriteThreshold) {
@ -335,7 +339,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
throw ex; throw ex;
} }
// If we reached this point, all of the buffered ops should have been flushed successfully. // If we reached this point, all of the buffered ops should have been flushed successfully.
assert bufferedOps.size() == 0; assert buffer == null;
assert checkChannelPositionWhileHandlingException(totalOffset); assert checkChannelPositionWhileHandlingException(totalOffset);
assert totalOffset == lastSyncedCheckpoint.offset; assert totalOffset == lastSyncedCheckpoint.offset;
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
@ -372,7 +376,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e); throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
} }
// If we reached this point, all of the buffered ops should have been flushed successfully. // If we reached this point, all of the buffered ops should have been flushed successfully.
assert bufferedOps.size() == 0; assert buffer == null;
assert checkChannelPositionWhileHandlingException(totalOffset); assert checkChannelPositionWhileHandlingException(totalOffset);
assert totalOffset == lastSyncedCheckpoint.offset; assert totalOffset == lastSyncedCheckpoint.offset;
return super.newSnapshot(); return super.newSnapshot();
@ -398,7 +402,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// the lock we should check again since if this code is busy we might have fsynced enough already // the lock we should check again since if this code is busy we might have fsynced enough already
final Checkpoint checkpointToSync; final Checkpoint checkpointToSync;
final LongArrayList flushedSequenceNumbers; final LongArrayList flushedSequenceNumbers;
final ArrayDeque<ReleasableBytesReference> toWrite; final ReleasableBytesReference toWrite;
try (ReleasableLock toClose = writeLock.acquire()) { try (ReleasableLock toClose = writeLock.acquire()) {
synchronized (this) { synchronized (this) {
ensureOpen(); ensureOpen();
@ -449,24 +453,23 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
} }
} }
private synchronized ArrayDeque<ReleasableBytesReference> pollOpsToWrite() { private synchronized ReleasableBytesReference pollOpsToWrite() {
ensureOpen(); ensureOpen();
final ArrayDeque<ReleasableBytesReference> operationsToWrite = new ArrayDeque<>(bufferedOps.size()); if (this.buffer != null) {
operationsToWrite.addAll(bufferedOps); ReleasableBytesStreamOutput toWrite = this.buffer;
bufferedOps.clear(); this.buffer = null;
bufferedBytes = 0; return new ReleasableBytesReference(toWrite.bytes(), toWrite);
return operationsToWrite; } else {
return ReleasableBytesReference.wrap(BytesArray.EMPTY);
}
} }
private void writeAndReleaseOps(final ArrayDeque<ReleasableBytesReference> operationsToWrite) throws IOException { private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {
try { try (ReleasableBytesReference toClose = toWrite) {
assert writeLock.isHeldByCurrentThread(); assert writeLock.isHeldByCurrentThread();
ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer(); ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer();
ReleasableBytesReference operation; BytesRefIterator iterator = toWrite.iterator();
while ((operation = operationsToWrite.pollFirst()) != null) {
try (Releasable toClose = operation) {
BytesRefIterator iterator = operation.iterator();
BytesRef current; BytesRef current;
while ((current = iterator.next()) != null) { while ((current = iterator.next()) != null) {
int currentBytesConsumed = 0; int currentBytesConsumed = 0;
@ -481,12 +484,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
} }
} }
} }
}
}
ioBuffer.flip(); ioBuffer.flip();
writeToFile(ioBuffer); writeToFile(ioBuffer);
} finally {
Releasables.close(operationsToWrite);
} }
} }
@ -550,8 +549,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
public final void close() throws IOException { public final void close() throws IOException {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
synchronized (this) { synchronized (this) {
Releasables.closeWhileHandlingException(bufferedOps); Releasables.closeWhileHandlingException(buffer);
bufferedOps.clear(); buffer = null;
} }
IOUtils.close(checkpointChannel, channel); IOUtils.close(checkpointChannel, channel);
} }

View File

@ -20,12 +20,13 @@
package org.elasticsearch.index.translog; package org.elasticsearch.index.translog;
import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.ByteArrayDataOutput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -190,7 +191,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
} }
writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen, writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen,
tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L, tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L,
() -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}); () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, BigArrays.NON_RECYCLING_INSTANCE);
writer = Mockito.spy(writer); writer = Mockito.spy(writer);
Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime();

View File

@ -1267,7 +1267,7 @@ public class TranslogTests extends ESTestCase {
final TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1); final TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1);
final Set<Long> persistedSeqNos = new HashSet<>(); final Set<Long> persistedSeqNos = new HashSet<>();
persistedSeqNoConsumer.set(persistedSeqNos::add); persistedSeqNoConsumer.set(persistedSeqNos::add);
final int numOps = randomIntBetween(8, 128); final int numOps = scaledRandomIntBetween(8, 250000);
final Set<Long> seenSeqNos = new HashSet<>(); final Set<Long> seenSeqNos = new HashSet<>();
boolean opsHaveValidSequenceNumbers = randomBoolean(); boolean opsHaveValidSequenceNumbers = randomBoolean();
for (int i = 0; i < numOps; i++) { for (int i = 0; i < numOps; i++) {