Keep checkpoint file channel open across fsyncs (#61744)

Currently we open and close the checkpoint file channel for every fsync.
This file channel can be kept open for the lifecycle of a translog
writer. This avoids the overhead of opening the file, checking file
permissions, and closing the file on every fsync.
Committed by Tim Brooks on 2020-09-01 11:35:16 -06:00
parent 8f27e9fa28
commit 075271758e
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
6 changed files with 122 additions and 30 deletions

View File

@ -172,7 +172,6 @@ public final class Channels {
writeToChannel(source, 0, source.length, channel);
}
/**
* Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel}
*
@ -195,6 +194,39 @@ public final class Channels {
assert length == 0 : "wrote more then expected bytes (length=" + length + ")";
}
/**
 * Writes an entire byte array to a {@link FileChannel} starting at the provided
 * absolute position; the channel's own position is not consulted or advanced.
 *
 * @param source byte array to copy from (written in full)
 * @param channel target FileChannel to write to
 * @param channelPosition absolute position in the channel at which to start writing
 * @throws IOException if writing to the channel fails
 */
public static void writeToChannel(byte[] source, FileChannel channel, long channelPosition) throws IOException {
writeToChannel(source, 0, source.length, channel, channelPosition);
}
/**
 * Writes part of a byte array to a {@link FileChannel} at the provided absolute
 * position, retrying until all {@code length} bytes have been written. The
 * channel's own position is not consulted or advanced.
 *
 * @param source byte array to copy from
 * @param offset start copying from this offset
 * @param length how many bytes to copy
 * @param channel target FileChannel to write to
 * @param channelPosition absolute position in the channel at which to start writing
 * @throws IOException if writing to the channel fails
 */
public static void writeToChannel(byte[] source, int offset, int length, FileChannel channel, long channelPosition) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(source, offset, length);
int written = channel.write(buffer, channelPosition);
length -= written;
while (length > 0) {
    // resume at channelPosition plus the bytes written so far. Note that after
    // wrap(source, offset, length) the buffer position starts at "offset", so
    // "channelPosition + buffer.position()" alone would skip "offset" bytes ahead
    // whenever offset > 0; subtracting offset yields the correct resume position.
    written = channel.write(buffer, channelPosition + (buffer.position() - offset));
    length -= written;
}
assert length == 0 : "wrote more than expected bytes (length=" + length + ")";
}
/**
* Writes a {@link java.nio.ByteBuffer} to a {@link java.nio.channels.WritableByteChannel}
*

View File

@ -198,6 +198,26 @@ final class Checkpoint {
}
/**
 * Serializes {@code checkpoint} and writes it to {@code checkpointFile} via a
 * short-lived channel obtained from the supplied factory, fsyncing the data
 * before the channel is closed.
 */
public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
final byte[] serialized = createCheckpointBytes(checkpointFile, checkpoint);
// open a fresh channel and push the whole checkpoint out in a single write
try (FileChannel fileChannel = factory.open(checkpointFile, options)) {
    Channels.writeToChannel(serialized, fileChannel);
    // forcing metadata is unnecessary: the file size stays constant and the
    // directory entry was already fully fsynced when the file was first created
    fileChannel.force(false);
}
}
/**
 * Serializes {@code checkpoint} and writes it at offset zero of an already-open
 * channel, then fsyncs the data. Reusing a long-lived channel avoids the
 * open/permission-check/close cost of creating a channel per sync.
 */
public static void write(FileChannel fileChannel, Path checkpointFile, Checkpoint checkpoint) throws IOException {
final byte[] serialized = createCheckpointBytes(checkpointFile, checkpoint);
Channels.writeToChannel(serialized, fileChannel, 0);
// no metadata force needed: the file size is unchanged and we did the full
// fsync when the file was first created, so the directory entry is stable
fileChannel.force(false);
}
private static byte[] createCheckpointBytes(Path checkpointFile, Checkpoint checkpoint) throws IOException {
final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(V3_FILE_SIZE) {
@Override
public synchronized byte[] toByteArray() {
@ -216,15 +236,8 @@ final class Checkpoint {
"get you numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + V3_FILE_SIZE;
assert indexOutput.getFilePointer() < 512 :
"checkpoint files have to be smaller than 512 bytes for atomic writes; size: " + indexOutput.getFilePointer();
}
// now go and write to the channel, in one go.
try (FileChannel channel = factory.open(checkpointFile, options)) {
Channels.writeToChannel(byteOutputStream.toByteArray(), channel);
// no need to force metadata, file size stays the same and we did the full fsync
// when we first created the file, so the directory entry doesn't change as well
channel.force(false);
}
return byteOutputStream.toByteArray();
}
@Override

View File

@ -494,9 +494,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint,
LongConsumer persistedSequenceNumberConsumer) throws IOException {
final TranslogWriter newFile;
final TranslogWriter newWriter;
try {
newFile = TranslogWriter.create(
newWriter = TranslogWriter.create(
shardId,
translogUUID,
fileGeneration,
@ -509,7 +509,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
return newFile;
return newWriter;
}
/**

View File

@ -22,13 +22,13 @@ package org.elasticsearch.index.translog;
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.procedures.LongProcedure;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
@ -49,7 +49,8 @@ import java.util.function.LongSupplier;
public class TranslogWriter extends BaseTranslogReader implements Closeable {
private final ShardId shardId;
private final ChannelFactory channelFactory;
private final FileChannel checkpointChannel;
private final Path checkpointPath;
// the last checkpoint that was written when the translog was last synced
private volatile Checkpoint lastSyncedCheckpoint;
/* the number of translog operations written to this file */
@ -79,11 +80,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
private TranslogWriter(
final ChannelFactory channelFactory,
final ShardId shardId,
final Checkpoint initialCheckpoint,
final FileChannel channel,
final FileChannel checkpointChannel,
final Path path,
final Path checkpointPath,
final ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header,
TragicExceptionHolder tragedy,
@ -95,7 +97,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
"initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position ["
+ channel.position() + "]";
this.shardId = shardId;
this.channelFactory = channelFactory;
this.checkpointChannel = checkpointChannel;
this.checkpointPath = checkpointPath;
this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
this.lastSyncedCheckpoint = initialCheckpoint;
@ -117,13 +120,17 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier,
final long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer)
throws IOException {
final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME);
final FileChannel channel = channelFactory.open(file);
FileChannel checkpointChannel = null;
try {
checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE);
final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm);
header.write(channel);
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(header.sizeInBytes(), fileGeneration,
initialGlobalCheckpoint, initialMinTranslogGen);
writeCheckpoint(channelFactory, file.getParent(), checkpoint);
writeCheckpoint(checkpointChannel, checkpointFile, checkpoint);
final LongSupplier writerGlobalCheckpointSupplier;
if (Assertions.ENABLED) {
writerGlobalCheckpointSupplier = () -> {
@ -135,13 +142,13 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
} else {
writerGlobalCheckpointSupplier = globalCheckpointSupplier;
}
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize,
return new TranslogWriter(shardId, checkpoint, channel, checkpointChannel, file, checkpointFile, bufferSize,
writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation
// is an error condition
IOUtils.closeWhileHandlingException(channel);
IOUtils.closeWhileHandlingException(channel, checkpointChannel);
throw exception;
}
}
@ -314,6 +321,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
throw ex;
}
if (closed.compareAndSet(false, true)) {
try {
checkpointChannel.close();
} catch (final Exception ex) {
closeWithTragicEvent(ex);
throw ex;
}
return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
} else {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
@ -374,7 +387,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// we can continue writing to the buffer etc.
try {
channel.force(false);
writeCheckpoint(channelFactory, path.getParent(), checkpointToSync);
writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync);
} catch (final Exception ex) {
closeWithTragicEvent(ex);
throw ex;
@ -414,10 +427,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
}
private static void writeCheckpoint(
final ChannelFactory channelFactory,
final Path translogFile,
final Checkpoint checkpoint) throws IOException {
Checkpoint.write(channelFactory, translogFile.resolve(Translog.CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE);
final FileChannel fileChannel,
final Path checkpointFile,
final Checkpoint checkpoint) throws IOException {
Checkpoint.write(fileChannel, checkpointFile, checkpoint);
}
/**
@ -438,7 +451,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
@Override
public final void close() throws IOException {
if (closed.compareAndSet(false, true)) {
channel.close();
IOUtils.close(checkpointChannel, channel);
}
}

View File

@ -63,7 +63,7 @@ public class ChannelsTests extends ESTestCase {
}
public void testReadWriteThoughArrays() throws Exception {
Channels.writeToChannel(randomBytes, fileChannel);
writeToChannel(randomBytes, fileChannel);
byte[] readBytes = Channels.readFromFileChannel(fileChannel, 0, randomBytes.length);
assertThat("read bytes didn't match written bytes", randomBytes, Matchers.equalTo(readBytes));
}
@ -72,7 +72,7 @@ public class ChannelsTests extends ESTestCase {
public void testPartialReadWriteThroughArrays() throws Exception {
int length = randomIntBetween(1, randomBytes.length / 2);
int offset = randomIntBetween(0, randomBytes.length - length);
Channels.writeToChannel(randomBytes, offset, length, fileChannel);
writeToChannel(randomBytes, offset, length, fileChannel);
int lengthToRead = randomIntBetween(1, length);
int offsetToRead = randomIntBetween(0, length - lengthToRead);
@ -87,7 +87,7 @@ public class ChannelsTests extends ESTestCase {
public void testBufferReadPastEOFWithException() throws Exception {
int bytesToWrite = randomIntBetween(0, randomBytes.length - 1);
Channels.writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
try {
Channels.readFromFileChannel(fileChannel, 0, bytesToWrite + 1 + randomInt(1000));
fail("Expected an EOFException");
@ -98,7 +98,7 @@ public class ChannelsTests extends ESTestCase {
public void testBufferReadPastEOFWithoutException() throws Exception {
int bytesToWrite = randomIntBetween(0, randomBytes.length - 1);
Channels.writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
byte[] bytes = new byte[bytesToWrite + 1 + randomInt(1000)];
int read = Channels.readFromFileChannel(fileChannel, 0, bytes, 0, bytes.length);
assertThat(read, Matchers.lessThan(0));
@ -161,6 +161,22 @@ public class ChannelsTests extends ESTestCase {
assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
}
// Randomly exercises either the positional or the sequential write overload so
// both code paths are covered; they must produce identical file contents.
private static void writeToChannel(byte[] source, int offset, int length, FileChannel channel) throws IOException {
final boolean positional = randomBoolean();
if (positional == false) {
    Channels.writeToChannel(source, offset, length, channel);
} else {
    Channels.writeToChannel(source, offset, length, channel, channel.position());
}
}
// Randomly exercises either the positional or the sequential whole-array write
// overload; both code paths must leave the channel with identical contents.
private static void writeToChannel(byte[] source, FileChannel channel) throws IOException {
final boolean positional = randomBoolean();
if (positional == false) {
    Channels.writeToChannel(source, channel);
} else {
    Channels.writeToChannel(source, channel, channel.position());
}
}
class MockFileChannel extends FileChannel {
FileChannel delegate;

View File

@ -2434,7 +2434,25 @@ public class TranslogTests extends ESTestCase {
@Override
public int write(ByteBuffer src, long position) throws IOException {
throw new UnsupportedOperationException();
if (fail.fail()) {
if (partialWrite) {
if (src.hasRemaining()) {
final int pos = src.position();
final int limit = src.limit();
src.limit(randomIntBetween(pos, limit));
super.write(src, position);
src.limit(limit);
src.position(pos);
throw new IOException("__FAKE__ no space left on device");
}
}
if (throwUnknownException) {
throw new UnknownException();
} else {
throw new MockDirectoryWrapper.FakeIOException();
}
}
return super.write(src, position);
}