From 075271758e3daff37f0ad16217553067f8eb57aa Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 1 Sep 2020 11:35:16 -0600 Subject: [PATCH] Keep checkpoint file channel open across fsyncs (#61744) Currently we open and close the checkpoint file channel for every fsync. This file channel can be kept open for the lifecycle of a translog writer. This avoids the overhead of opening the file, checking file permissions, and closing the file on every fsync. --- .../org/elasticsearch/common/io/Channels.java | 34 +++++++++++++++- .../index/translog/Checkpoint.java | 29 ++++++++++---- .../index/translog/Translog.java | 6 +-- .../index/translog/TranslogWriter.java | 39 ++++++++++++------- .../elasticsearch/common/ChannelsTests.java | 24 ++++++++++-- .../index/translog/TranslogTests.java | 20 +++++++++- 6 files changed, 122 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/io/Channels.java b/server/src/main/java/org/elasticsearch/common/io/Channels.java index 1d76be43ca9..f80b4bd9607 100644 --- a/server/src/main/java/org/elasticsearch/common/io/Channels.java +++ b/server/src/main/java/org/elasticsearch/common/io/Channels.java @@ -172,7 +172,6 @@ public final class Channels { writeToChannel(source, 0, source.length, channel); } - /** * Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel} * @@ -195,6 +194,39 @@ public final class Channels { assert length == 0 : "wrote more then expected bytes (length=" + length + ")"; } + /** + * Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel} at the provided + * position. + * + * @param source byte array to copy from + * @param channel target WritableByteChannel + * @param channelPosition position to write at + */ + public static void writeToChannel(byte[] source, FileChannel channel, long channelPosition) throws IOException { + writeToChannel(source, 0, source.length, channel, channelPosition); + } + + /** + * Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel} at the provided + * position. + * + * @param source byte array to copy from + * @param offset start copying from this offset + * @param length how many bytes to copy + * @param channel target WritableByteChannel + * @param channelPosition position to write at + */ + public static void writeToChannel(byte[] source, int offset, int length, FileChannel channel, long channelPosition) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(source, offset, length); + int written = channel.write(buffer, channelPosition); + length -= written; + while (length > 0) { + written = channel.write(buffer, channelPosition + buffer.position()); + length -= written; + } + assert length == 0 : "wrote more then expected bytes (length=" + length + ")"; + } + /** * Writes a {@link java.nio.ByteBuffer} to a {@link java.nio.channels.WritableByteChannel} * diff --git a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java index 7170a5fe586..b0a378c9268 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java @@ -198,6 +198,26 @@ final class Checkpoint { } public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException { + byte[] bytes = createCheckpointBytes(checkpointFile, checkpoint); + + // now go and write to the channel, in one go. + try (FileChannel channel = factory.open(checkpointFile, options)) { + Channels.writeToChannel(bytes, channel); + // no need to force metadata, file size stays the same and we did the full fsync + // when we first created the file, so the directory entry doesn't change as well + channel.force(false); + } + } + + public static void write(FileChannel fileChannel, Path checkpointFile, Checkpoint checkpoint) throws IOException { + byte[] bytes = createCheckpointBytes(checkpointFile, checkpoint); + Channels.writeToChannel(bytes, fileChannel, 0); + // no need to force metadata, file size stays the same and we did the full fsync + // when we first created the file, so the directory entry doesn't change as well + fileChannel.force(false); + } + + private static byte[] createCheckpointBytes(Path checkpointFile, Checkpoint checkpoint) throws IOException { final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(V3_FILE_SIZE) { @Override public synchronized byte[] toByteArray() { @@ -216,15 +236,8 @@ final class Checkpoint { "get you numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + V3_FILE_SIZE; assert indexOutput.getFilePointer() < 512 : "checkpoint files have to be smaller than 512 bytes for atomic writes; size: " + indexOutput.getFilePointer(); - - } - // now go and write to the channel, in one go. - try (FileChannel channel = factory.open(checkpointFile, options)) { - Channels.writeToChannel(byteOutputStream.toByteArray(), channel); - // no need to force metadata, file size stays the same and we did the full fsync - // when we first created the file, so the directory entry doesn't change as well - channel.force(false); } + return byteOutputStream.toByteArray(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index f670623b429..8da8fb6b8f6 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -494,9 +494,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC */ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint, LongConsumer persistedSequenceNumberConsumer) throws IOException { - final TranslogWriter newFile; + final TranslogWriter newWriter; try { - newFile = TranslogWriter.create( + newWriter = TranslogWriter.create( shardId, translogUUID, fileGeneration, @@ -509,7 +509,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } catch (final IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } - return newFile; + return newWriter; } /** diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index e2240977c94..8007d51abb4 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -22,13 +22,13 @@ package org.elasticsearch.index.translog; import com.carrotsearch.hppc.LongArrayList; import com.carrotsearch.hppc.procedures.LongProcedure; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Assertions; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; @@ -49,7 +49,8 @@ import java.util.function.LongSupplier; public class TranslogWriter extends BaseTranslogReader implements Closeable { private final ShardId shardId; - private final ChannelFactory channelFactory; + private final FileChannel checkpointChannel; + private final Path checkpointPath; // the last checkpoint that was written when the translog was last synced private volatile Checkpoint lastSyncedCheckpoint; /* the number of translog operations written to this file */ @@ -79,11 +80,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private final Map> seenSequenceNumbers; private TranslogWriter( - final ChannelFactory channelFactory, final ShardId shardId, final Checkpoint initialCheckpoint, final FileChannel channel, + final FileChannel checkpointChannel, final Path path, + final Path checkpointPath, final ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header, TragicExceptionHolder tragedy, @@ -95,7 +97,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position [" + channel.position() + "]"; this.shardId = shardId; - this.channelFactory = channelFactory; + this.checkpointChannel = checkpointChannel; + this.checkpointPath = checkpointPath; this.minTranslogGenerationSupplier = minTranslogGenerationSupplier; this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt()); this.lastSyncedCheckpoint = initialCheckpoint; @@ -117,13 +120,17 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier, final long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer) throws IOException { + final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME); + final FileChannel channel = channelFactory.open(file); + FileChannel checkpointChannel = null; try { + checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE); final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm); header.write(channel); final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(header.sizeInBytes(), fileGeneration, initialGlobalCheckpoint, initialMinTranslogGen); - writeCheckpoint(channelFactory, file.getParent(), checkpoint); + writeCheckpoint(checkpointChannel, checkpointFile, checkpoint); final LongSupplier writerGlobalCheckpointSupplier; if (Assertions.ENABLED) { writerGlobalCheckpointSupplier = () -> { @@ -135,13 +142,13 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { } else { writerGlobalCheckpointSupplier = globalCheckpointSupplier; } - return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, + return new TranslogWriter(shardId, checkpoint, channel, checkpointChannel, file, checkpointFile, bufferSize, writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation // is an error condition - IOUtils.closeWhileHandlingException(channel); + IOUtils.closeWhileHandlingException(channel, checkpointChannel); throw exception; } } @@ -314,6 +321,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { throw ex; } if (closed.compareAndSet(false, true)) { + try { + checkpointChannel.close(); + } catch (final Exception ex) { + closeWithTragicEvent(ex); + throw ex; + } return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header); } else { throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", @@ -374,7 +387,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { // we can continue writing to the buffer etc. try { channel.force(false); - writeCheckpoint(channelFactory, path.getParent(), checkpointToSync); + writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync); } catch (final Exception ex) { closeWithTragicEvent(ex); throw ex; @@ -414,10 +427,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { } private static void writeCheckpoint( - final ChannelFactory channelFactory, - final Path translogFile, - final Checkpoint checkpoint) throws IOException { - Checkpoint.write(channelFactory, translogFile.resolve(Translog.CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE); + final FileChannel fileChannel, + final Path checkpointFile, + final Checkpoint checkpoint) throws IOException { + Checkpoint.write(fileChannel, checkpointFile, checkpoint); } /** @@ -438,7 +451,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { @Override public final void close() throws IOException { if (closed.compareAndSet(false, true)) { - channel.close(); + IOUtils.close(checkpointChannel, channel); } } diff --git a/server/src/test/java/org/elasticsearch/common/ChannelsTests.java b/server/src/test/java/org/elasticsearch/common/ChannelsTests.java index 41a6553305d..fd44c76a3a6 100644 --- a/server/src/test/java/org/elasticsearch/common/ChannelsTests.java +++ b/server/src/test/java/org/elasticsearch/common/ChannelsTests.java @@ -63,7 +63,7 @@ public class ChannelsTests extends ESTestCase { } public void testReadWriteThoughArrays() throws Exception { - Channels.writeToChannel(randomBytes, fileChannel); + writeToChannel(randomBytes, fileChannel); byte[] readBytes = Channels.readFromFileChannel(fileChannel, 0, randomBytes.length); assertThat("read bytes didn't match written bytes", randomBytes, Matchers.equalTo(readBytes)); } @@ -72,7 +72,7 @@ public class ChannelsTests extends ESTestCase { public void testPartialReadWriteThroughArrays() throws Exception { int length = randomIntBetween(1, randomBytes.length / 2); int offset = randomIntBetween(0, randomBytes.length - length); - Channels.writeToChannel(randomBytes, offset, length, fileChannel); + writeToChannel(randomBytes, offset, length, fileChannel); int lengthToRead = randomIntBetween(1, length); int offsetToRead = randomIntBetween(0, length - lengthToRead); @@ -87,7 +87,7 @@ public class ChannelsTests extends ESTestCase { public void testBufferReadPastEOFWithException() throws Exception { int bytesToWrite = randomIntBetween(0, randomBytes.length - 1); - Channels.writeToChannel(randomBytes, 0, bytesToWrite, fileChannel); + writeToChannel(randomBytes, 0, bytesToWrite, fileChannel); try { Channels.readFromFileChannel(fileChannel, 0, bytesToWrite + 1 + randomInt(1000)); fail("Expected an EOFException"); @@ -98,7 +98,7 @@ public class ChannelsTests extends ESTestCase { public void testBufferReadPastEOFWithoutException() throws Exception { int bytesToWrite = randomIntBetween(0, randomBytes.length - 1); - Channels.writeToChannel(randomBytes, 0, bytesToWrite, fileChannel); + writeToChannel(randomBytes, 0, bytesToWrite, fileChannel); byte[] bytes = new byte[bytesToWrite + 1 + randomInt(1000)]; int read = Channels.readFromFileChannel(fileChannel, 0, bytes, 0, bytes.length); assertThat(read, Matchers.lessThan(0)); @@ -161,6 +161,22 @@ public class ChannelsTests extends ESTestCase { assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef)); } + private static void writeToChannel(byte[] source, int offset, int length, FileChannel channel) throws IOException { + if (randomBoolean()) { + Channels.writeToChannel(source, offset, length, channel, channel.position()); + } else { + Channels.writeToChannel(source, offset, length, channel); + } + } + + private static void writeToChannel(byte[] source, FileChannel channel) throws IOException { + if (randomBoolean()) { + Channels.writeToChannel(source, channel, channel.position()); + } else { + Channels.writeToChannel(source, channel); + } + } + class MockFileChannel extends FileChannel { FileChannel delegate; diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 26965860f42..05e99fb9f95 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2434,7 +2434,25 @@ public class TranslogTests extends ESTestCase { @Override public int write(ByteBuffer src, long position) throws IOException { - throw new UnsupportedOperationException(); + if (fail.fail()) { + if (partialWrite) { + if (src.hasRemaining()) { + final int pos = src.position(); + final int limit = src.limit(); + src.limit(randomIntBetween(pos, limit)); + super.write(src, position); + src.limit(limit); + src.position(pos); + throw new IOException("__FAKE__ no space left on device"); + } + } + if (throwUnknownException) { + throw new UnknownException(); + } else { + throw new MockDirectoryWrapper.FakeIOException(); + } + } + return super.write(src, position); }