Do not block Translog add on file write (#63374)
Currently, a TranslogWriter add operation is synchronized. The operation appends the bytes to the file output stream's buffer and issues a write system call whenever the buffer fills. The buffer is 8KB, so we routinely block other add calls on system writes. This commit changes the add operation to simply place the operation in an array list. The array list is flushed when a sync call occurs or when 1MB is buffered.
parent f2ba62b894
commit 64bbbaeef1
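In outline, the new write path looks like the following sketch (a simplified, hypothetical rendering of the scheme; the real TranslogWriter below adds checkpointing, reference counting, and tragic-event handling on top of this):

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class BufferingSketch {
    private final OutputStream channel;          // stands in for the translog FileChannel
    private final List<byte[]> bufferedOps = new ArrayList<>();
    private final ReentrantLock writeLock = new ReentrantLock();
    private long bufferedBytes;
    private static final long FORCE_WRITE_THRESHOLD = 1024 * 1024; // 1MB

    BufferingSketch(OutputStream channel) {
        this.channel = channel;
    }

    void add(byte[] op) throws IOException {
        final long buffered;
        synchronized (this) {             // cheap: only mutates in-memory state
            bufferedOps.add(op);
            bufferedBytes += op.length;
            buffered = bufferedBytes;
        }
        if (buffered >= FORCE_WRITE_THRESHOLD) {
            writeBuffered();              // file I/O happens outside the add mutex
        }
    }

    void writeBuffered() throws IOException {
        writeLock.lock();
        try {
            final ArrayDeque<byte[]> toWrite;
            synchronized (this) {         // drain the buffer under the mutex...
                toWrite = new ArrayDeque<>(bufferedOps);
                bufferedOps.clear();
                bufferedBytes = 0;
            }
            for (byte[] op : toWrite) {   // ...but write without holding it
                channel.write(op);
            }
        } finally {
            writeLock.unlock();
        }
    }
}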
DiskIoBufferPool.java (new file)
@@ -0,0 +1,60 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.io;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.nio.ByteBuffer;
import java.util.Arrays;

public class DiskIoBufferPool {

    public static final int BUFFER_SIZE = StrictMath.toIntExact(ByteSizeValue.parseBytesSizeValue(
        System.getProperty("es.disk_io.direct.buffer.size", "64KB"), "es.disk_io.direct.buffer.size").getBytes());
    public static final int HEAP_BUFFER_SIZE = 8 * 1024;

    private static final ThreadLocal<ByteBuffer> ioBufferPool = ThreadLocal.withInitial(() -> {
        if (isWriteOrFlushThread()) {
            return ByteBuffer.allocateDirect(BUFFER_SIZE);
        } else {
            return ByteBuffer.allocate(HEAP_BUFFER_SIZE);
        }
    });

    public static ByteBuffer getIoBuffer() {
        ByteBuffer ioBuffer = ioBufferPool.get();
        ioBuffer.clear();
        return ioBuffer;
    }

    private static boolean isWriteOrFlushThread() {
        String threadName = Thread.currentThread().getName();
        for (String s : Arrays.asList(
            "[" + ThreadPool.Names.WRITE + "]",
            "[" + ThreadPool.Names.FLUSH + "]",
            "[" + ThreadPool.Names.SYSTEM_WRITE + "]")) {
            if (threadName.contains(s)) {
                return true;
            }
        }
        return false;
    }
}
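The pool hands each thread one cached buffer: a 64KB direct buffer on write/flush threads, an 8KB heap buffer elsewhere, and getIoBuffer() returns it already cleared. A hypothetical caller would drain through it like this (the real consumer is TranslogWriter#writeAndReleaseOps further down; the helper name here is illustrative):

import org.elasticsearch.common.io.DiskIoBufferPool;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

class PoolUser {
    // Hypothetical usage: reuse the calling thread's pooled buffer instead of
    // allocating a fresh buffer per write.
    static void writeAll(FileChannel channel, byte[] data) throws IOException {
        ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer(); // thread-local, already cleared
        int written = 0;
        while (written < data.length) {
            int chunk = Math.min(data.length - written, ioBuffer.remaining());
            ioBuffer.put(data, written, chunk);
            written += chunk;
            if (ioBuffer.hasRemaining() == false) { // buffer full: drain to the channel
                ioBuffer.flip();
                while (ioBuffer.hasRemaining()) {
                    channel.write(ioBuffer);
                }
                ioBuffer.clear();
            }
        }
        ioBuffer.flip(); // drain the tail
        while (ioBuffer.hasRemaining()) {
            channel.write(ioBuffer);
        }
    }
}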
ReleasableLock.java
@@ -58,6 +58,19 @@ public class ReleasableLock implements Releasable {
        return this;
    }

+   /**
+    * Try acquiring lock, returning null if unable.
+    */
+   public ReleasableLock tryAcquire() {
+       boolean locked = lock.tryLock();
+       if (locked) {
+           assert addCurrentThread();
+           return this;
+       } else {
+           return null;
+       }
+   }
+
    /**
     * Try acquiring lock, returning null if unable to acquire lock within timeout.
     */
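The new tryAcquire pairs with try-with-resources: a null resource is legal there and close() is simply skipped, so a failed acquisition falls through without unlocking anything. A sketch of the pattern (hypothetical wrapper; TranslogWriter#writeBufferedOps below uses it for real):

import org.elasticsearch.common.util.concurrent.ReleasableLock;

import java.util.concurrent.locks.ReentrantLock;

class TryAcquireSketch {
    private final ReleasableLock writeLock = new ReleasableLock(new ReentrantLock());

    void flushIfIdle(Runnable work) {
        // If tryAcquire() fails we get null; try-with-resources skips close()
        // on a null resource, so nothing is ever unlocked that wasn't locked.
        try (ReleasableLock locked = writeLock.tryAcquire()) {
            if (locked != null) {
                work.run();   // we hold the lock: do the work
            }
            // else: another thread is writing; skip instead of blocking
        }
    }
}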
Translog.java
@@ -28,13 +28,13 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;

@@ -521,6 +521,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
     */
    public Location add(final Operation operation) throws IOException {
        final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
+       boolean successfullySerialized = false;
        try {
            final long start = out.position();
            out.skip(Integer.BYTES);

@@ -530,8 +531,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
            out.seek(start);
            out.writeInt(operationSize);
            out.seek(end);
-           final BytesReference bytes = out.bytes();
-           try (ReleasableLock ignored = readLock.acquire()) {
+           successfullySerialized = true;
+           try (ReleasableBytesReference bytes = new ReleasableBytesReference(out.bytes(), out);
+                ReleasableLock ignored = readLock.acquire()) {
                ensureOpen();
                if (operation.primaryTerm() > current.getPrimaryTerm()) {
                    assert false :

@@ -549,7 +551,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
            closeOnTragicEvent(ex);
            throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex);
        } finally {
-           Releasables.close(out);
+           if (successfullySerialized == false) {
+               Releasables.close(out);
+           }
        }
    }

@@ -1907,7 +1911,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
        Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
        IOUtils.fsync(checkpointFile, false);
        final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory,
-           new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint,
+           TranslogConfig.DEFAULT_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint,
            () -> {
                throw new UnsupportedOperationException();
            }, () -> {
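The hand-off above works because ReleasableBytesReference is reference counted. Roughly (ownership flow inferred from the diff; the call shapes here are illustrative, not the exact API surface):

// Translog.add opens the reference (count 1) and closes it when the try block exits.
// TranslogWriter.add retains it (count 2) while the bytes sit in bufferedOps, and
// writeAndReleaseOps closes that retained reference after copying the bytes out,
// so the pooled serialization pages live exactly until the operation hits the channel.
try (ReleasableBytesReference bytes = new ReleasableBytesReference(out.bytes(), out);
     ReleasableLock ignored = readLock.acquire()) {
    Translog.Location location = current.add(bytes, operation.seqNo()); // retains internally
    return location;
}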
TranslogConfig.java
@@ -34,7 +34,7 @@ import java.nio.file.Path;
 */
public final class TranslogConfig {

-   public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
+   public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
    private final BigArrays bigArrays;
    private final IndexSettings indexSettings;
    private final ShardId shardId;
TranslogWriter.java
@@ -22,32 +22,42 @@ package org.elasticsearch.index.translog;
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.procedures.LongProcedure;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.DiskIoBufferPool;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

public class TranslogWriter extends BaseTranslogReader implements Closeable {

    private final ShardId shardId;
    private final FileChannel checkpointChannel;
    private final Path checkpointPath;
@@ -57,8 +67,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    private volatile int operationCounter;
    /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
    private final TragicExceptionHolder tragedy;
-   /* A buffered outputstream what writes to the writers channel */
-   private final OutputStream outputStream;
    /* the total offset of this file including the bytes written to the file as well as into the buffer */
    private volatile long totalOffset;

@@ -72,10 +80,15 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    private final LongConsumer persistedSequenceNumberConsumer;

    protected final AtomicBoolean closed = new AtomicBoolean(false);
-   // lock order synchronized(syncLock) -> synchronized(this)
+   // lock order try(Releasable lock = writeLock.acquire()) -> synchronized(this)
+   private final ReleasableLock writeLock = new ReleasableLock(new ReentrantLock());
+   // lock order synchronized(syncLock) -> try(Releasable lock = writeLock.acquire()) -> synchronized(this)
    private final Object syncLock = new Object();

-   private LongArrayList nonFsyncedSequenceNumbers;
+   private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64);
+   private final int forceWriteThreshold;
+   private final ArrayList<ReleasableBytesReference> bufferedOps = new ArrayList<>();
+   private long bufferedBytes = 0L;

    private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;

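Read together, the lock-order comments give the hierarchy this change introduces (a summary inferred from the comments above; an outer lock may be held while taking an inner one, never the reverse):

syncLock                -- serializes fsync + checkpoint writing (syncUpTo)
  -> writeLock          -- serializes draining bufferedOps to the channel
    -> this (monitor)   -- guards the in-memory buffer and checkpoint state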
@@ -96,11 +109,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        assert initialCheckpoint.offset == channel.position() :
            "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position ["
                + channel.position() + "]";
+       this.forceWriteThreshold = Math.toIntExact(bufferSize.getBytes());
        this.shardId = shardId;
        this.checkpointChannel = checkpointChannel;
        this.checkpointPath = checkpointPath;
        this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
-       this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
        this.lastSyncedCheckpoint = initialCheckpoint;
        this.totalOffset = initialCheckpoint.offset;
        assert initialCheckpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED : initialCheckpoint.minSeqNo;
@@ -109,7 +122,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        this.maxSeqNo = initialCheckpoint.maxSeqNo;
        assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
        this.globalCheckpointSupplier = globalCheckpointSupplier;
-       this.nonFsyncedSequenceNumbers = new LongArrayList(64);
        this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
        this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
        this.tragedy = tragedy;
@@ -162,10 +174,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        }
    }

-   /**
-    * add the given bytes to the translog and return the location they were written at
-    */
-
    /**
     * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to.
     *
@@ -174,34 +182,37 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     * @return the location the bytes were written to
     * @throws IOException if writing to the translog resulted in an I/O exception
     */
-   public synchronized Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
-       ensureOpen();
-       final long offset = totalOffset;
-       try {
-           data.writeTo(outputStream);
-       } catch (final Exception ex) {
-           closeWithTragicEvent(ex);
-           throw ex;
-       }
-       totalOffset += data.length();
-
-       if (minSeqNo == SequenceNumbers.NO_OPS_PERFORMED) {
-           assert operationCounter == 0;
-       }
-       if (maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED) {
-           assert operationCounter == 0;
-       }
-
-       minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
-       maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
-
-       nonFsyncedSequenceNumbers.add(seqNo);
-
-       operationCounter++;
-
-       assert assertNoSeqNumberConflict(seqNo, data);
-
-       return new Translog.Location(generation, offset, data.length());
+   public Translog.Location add(final ReleasableBytesReference data, final long seqNo) throws IOException {
+       final Translog.Location location;
+       final long bytesBufferedAfterAdd;
+       synchronized (this) {
+           ensureOpen();
+           final long offset = totalOffset;
+           totalOffset += data.length();
+           bufferedBytes += data.length();
+           bufferedOps.add(data.retain());
+
+           assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
+           assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
+
+           minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
+           maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
+
+           nonFsyncedSequenceNumbers.add(seqNo);
+
+           operationCounter++;
+
+           assert assertNoSeqNumberConflict(seqNo, data);
+
+           location = new Translog.Location(generation, offset, data.length());
+           bytesBufferedAfterAdd = bufferedBytes;
+       }
+
+       if (bytesBufferedAfterAdd >= forceWriteThreshold) {
+           writeBufferedOps(Long.MAX_VALUE, bytesBufferedAfterAdd >= forceWriteThreshold * 4);
+       }
+
+       return location;
    }

    private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReference data) throws IOException {
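Concretely, with the new 1MB default buffer the flush policy in add() works out as follows (a worked example; the 1MB figure comes from TranslogConfig.DEFAULT_BUFFER_SIZE above):

// Worked example of the flush decision, assuming the 1MB default buffer size:
long forceWriteThreshold = 1024 * 1024;        // bufferSize = DEFAULT_BUFFER_SIZE (1MB)
long bytesBufferedAfterAdd = 2 * 1024 * 1024;  // say this add brought the buffer to 2MB

// 2MB >= 1MB, so this add attempts a flush...
boolean flush = bytesBufferedAfterAdd >= forceWriteThreshold;                      // true
// ...but non-blockingly: below 4MB, writeBufferedOps uses tryAcquire and simply
// returns if another thread already holds the write lock. Only past 4x the
// threshold does the adding thread block until it can drain the buffer itself.
boolean blockOnExistingWriter = bytesBufferedAfterAdd >= forceWriteThreshold * 4;  // false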
@@ -211,9 +222,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        final Tuple<BytesReference, Exception> previous = seenSequenceNumbers.get(seqNo);
        if (previous.v1().equals(data) == false) {
            Translog.Operation newOp = Translog.readOperation(
-                   new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
+               new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
            Translog.Operation prvOp = Translog.readOperation(
-                   new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion"));
+               new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion"));
            // TODO: We haven't had timestamp for Index operations in Lucene yet, we need to loosen this check without timestamp.
            final boolean sameOp;
            if (newOp instanceof Translog.Index && prvOp instanceof Translog.Index) {
@@ -250,7 +261,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
            final Translog.Operation op;
            try {
                op = Translog.readOperation(
-                       new BufferedChecksumStreamInput(e.getValue().v1().streamInput(), "assertion"));
+                   new BufferedChecksumStreamInput(e.getValue().v1().streamInput(), "assertion"));
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
@@ -309,28 +320,36 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    public TranslogReader closeIntoReader() throws IOException {
        // make sure to acquire the sync lock first, to prevent deadlocks with threads calling
        // syncUpTo(), where the sync lock is acquired first, followed by the synchronize(this)
+       // After the sync lock we acquire the write lock to avoid deadlocks with threads writing where
+       // the write lock is acquired first followed by synchronize(this).
        //
        // Note: While this is not strictly needed as this method is called while blocking all ops on the translog,
        // we do this for correctness and preventing future issues.
        synchronized (syncLock) {
-           synchronized (this) {
-               try {
-                   sync(); // sync before we close..
-               } catch (final Exception ex) {
-                   closeWithTragicEvent(ex);
-                   throw ex;
-               }
-               if (closed.compareAndSet(false, true)) {
-                   try {
-                       checkpointChannel.close();
+           try (ReleasableLock toClose = writeLock.acquire()) {
+               synchronized (this) {
+                   try {
+                       sync(); // sync before we close..
                    } catch (final Exception ex) {
                        closeWithTragicEvent(ex);
                        throw ex;
                    }
-                   return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
-               } else {
-                   throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
-                       tragedy.get());
+                   // If we reached this point, all of the buffered ops should have been flushed successfully.
+                   assert bufferedOps.size() == 0;
+                   assert checkChannelPositionWhileHandlingException(totalOffset);
+                   assert totalOffset == lastSyncedCheckpoint.offset;
+                   if (closed.compareAndSet(false, true)) {
+                       try {
+                           checkpointChannel.close();
+                       } catch (final Exception ex) {
+                           closeWithTragicEvent(ex);
+                           throw ex;
+                       }
+                       return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
+                   } else {
+                       throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
+                           tragedy.get());
+                   }
                }
            }
        }
    }
@@ -341,15 +360,23 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    public TranslogSnapshot newSnapshot() {
        // make sure to acquire the sync lock first, to prevent deadlocks with threads calling
        // syncUpTo(), where the sync lock is acquired first, followed by the synchronize(this)
+       // After the sync lock we acquire the write lock to avoid deadlocks with threads writing where
+       // the write lock is acquired first followed by synchronize(this).
        synchronized (syncLock) {
-           synchronized (this) {
-               ensureOpen();
-               try {
-                   sync();
-               } catch (IOException e) {
-                   throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
+           try (ReleasableLock toClose = writeLock.acquire()) {
+               synchronized (this) {
+                   ensureOpen();
+                   try {
+                       sync();
+                   } catch (IOException e) {
+                       throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
+                   }
+                   // If we reached this point, all of the buffered ops should have been flushed successfully.
+                   assert bufferedOps.size() == 0;
+                   assert checkChannelPositionWhileHandlingException(totalOffset);
+                   assert totalOffset == lastSyncedCheckpoint.offset;
+                   return super.newSnapshot();
                }
-               return super.newSnapshot();
            }
        }
    }
@@ -371,13 +398,19 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        // the lock we should check again since if this code is busy we might have fsynced enough already
        final Checkpoint checkpointToSync;
        final LongArrayList flushedSequenceNumbers;
-       synchronized (this) {
-           ensureOpen();
-           try {
-               outputStream.flush();
-               checkpointToSync = getCheckpoint();
-               flushedSequenceNumbers = nonFsyncedSequenceNumbers;
-               nonFsyncedSequenceNumbers = new LongArrayList(64);
+       final ArrayDeque<ReleasableBytesReference> toWrite;
+       try (ReleasableLock toClose = writeLock.acquire()) {
+           synchronized (this) {
+               ensureOpen();
+               checkpointToSync = getCheckpoint();
+               toWrite = pollOpsToWrite();
+               flushedSequenceNumbers = nonFsyncedSequenceNumbers;
+               nonFsyncedSequenceNumbers = new LongArrayList(64);
+           }
+
+           try {
+               // Write ops will release operations.
+               writeAndReleaseOps(toWrite);
            } catch (final Exception ex) {
                closeWithTragicEvent(ex);
                throw ex;
@@ -403,19 +436,76 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
            return false;
        }

+   private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws IOException {
+       try (ReleasableLock locked = blockOnExistingWriter ? writeLock.acquire() : writeLock.tryAcquire()) {
+           try {
+               if (locked != null && offset > getWrittenOffset()) {
+                   writeAndReleaseOps(pollOpsToWrite());
+               }
+           } catch (Exception e) {
+               closeWithTragicEvent(e);
+               throw e;
+           }
+       }
+   }
+
+   private synchronized ArrayDeque<ReleasableBytesReference> pollOpsToWrite() {
+       ensureOpen();
+       final ArrayDeque<ReleasableBytesReference> operationsToWrite = new ArrayDeque<>(bufferedOps.size());
+       operationsToWrite.addAll(bufferedOps);
+       bufferedOps.clear();
+       bufferedBytes = 0;
+       return operationsToWrite;
+   }
+
+   private void writeAndReleaseOps(final ArrayDeque<ReleasableBytesReference> operationsToWrite) throws IOException {
+       try {
+           assert writeLock.isHeldByCurrentThread();
+           ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer();
+
+           ReleasableBytesReference operation;
+           while ((operation = operationsToWrite.pollFirst()) != null) {
+               try (Releasable toClose = operation) {
+                   BytesRefIterator iterator = operation.iterator();
+                   BytesRef current;
+                   while ((current = iterator.next()) != null) {
+                       int currentBytesConsumed = 0;
+                       while (currentBytesConsumed != current.length) {
+                           int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());
+                           ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);
+                           currentBytesConsumed += nBytesToWrite;
+                           if (ioBuffer.hasRemaining() == false) {
+                               ioBuffer.flip();
+                               writeToFile(ioBuffer);
+                               ioBuffer.clear();
+                           }
+                       }
+                   }
+               }
+           }
+           ioBuffer.flip();
+           writeToFile(ioBuffer);
+       } finally {
+           Releasables.close(operationsToWrite);
+       }
+   }
+
+   @SuppressForbidden(reason = "Channel#write")
+   private void writeToFile(ByteBuffer ioBuffer) throws IOException {
+       while (ioBuffer.remaining() > 0) {
+           channel.write(ioBuffer);
+       }
+   }
+
    @Override
    protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
        try {
            if (position + targetBuffer.remaining() > getWrittenOffset()) {
-               synchronized (this) {
-                   // we only flush here if it's really really needed - try to minimize the impact of the read operation
-                   // in some cases ie. a tragic event we might still be able to read the relevant value
-                   // which is not really important in production but some test can make most strict assumptions
-                   // if we don't fail in this call unless absolutely necessary.
-                   if (position + targetBuffer.remaining() > getWrittenOffset()) {
-                       outputStream.flush();
-                   }
-               }
+               // we only flush here if it's really really needed - try to minimize the impact of the read operation
+               // in some cases ie. a tragic event we might still be able to read the relevant value
+               // which is not really important in production but some test can make most strict assumptions
+               // if we don't fail in this call unless absolutely necessary.
+               writeBufferedOps(position + targetBuffer.remaining(), true);
            }
        } catch (final Exception ex) {
            closeWithTragicEvent(ex);
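One detail worth noting in writeToFile above: FileChannel#write may consume fewer bytes than buffer.remaining(), so the loop is required for correctness, not style. The same idiom in standalone form (hypothetical helper name):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class WriteFullySketch {
    // Writes the whole buffer, looping because a single write() call
    // is permitted to consume only part of the remaining bytes.
    static void writeFully(FileChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }
}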
@@ -448,9 +538,21 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        }
    }

+   private boolean checkChannelPositionWhileHandlingException(long expectedOffset) {
+       try {
+           return expectedOffset == channel.position();
+       } catch (IOException e) {
+           return true;
+       }
+   }
+
    @Override
    public final void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
+           synchronized (this) {
+               Releasables.closeWhileHandlingException(bufferedOps);
+               bufferedOps.clear();
+           }
            IOUtils.close(checkpointChannel, channel);
        }
    }
@@ -458,33 +560,4 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    protected final boolean isClosed() {
        return closed.get();
    }
-
-   private final class BufferedChannelOutputStream extends BufferedOutputStream {
-
-       BufferedChannelOutputStream(OutputStream out, int size) throws IOException {
-           super(out, size);
-       }
-
-       @Override
-       public synchronized void flush() throws IOException {
-           if (count > 0) {
-               try {
-                   ensureOpen();
-                   super.flush();
-               } catch (final Exception ex) {
-                   closeWithTragicEvent(ex);
-                   throw ex;
-               }
-           }
-       }
-
-       @Override
-       public void close() throws IOException {
-           // the stream is intentionally not closed because
-           // closing it will close the FileChannel
-           throw new IllegalStateException("never close this stream");
-       }
-   }

}
TranslogDeletionPolicyTests.java
@@ -23,6 +23,7 @@ import org.apache.lucene.store.ByteArrayDataOutput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.shard.ShardId;
@@ -199,7 +200,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
            for (int ops = randomIntBetween(0, 20); ops > 0; ops--) {
                out.reset(bytes);
                out.writeInt(ops);
-               writer.add(new BytesArray(bytes), ops);
+               writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops);
            }
        }
        return new Tuple<>(readers, writer);
TranslogTests.java
@@ -42,6 +42,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -129,6 +130,7 @@ import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
@@ -246,12 +248,10 @@ public class TranslogTests extends ESTestCase {
    }

    private TranslogConfig getTranslogConfig(final Path path, final Settings settings) {
-       final ByteSizeValue bufferSize;
-       if (randomBoolean()) {
-           bufferSize = TranslogConfig.DEFAULT_BUFFER_SIZE;
-       } else {
-           bufferSize = new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
-       }
+       final ByteSizeValue bufferSize = randomFrom(
+           TranslogConfig.DEFAULT_BUFFER_SIZE,
+           new ByteSizeValue(8, ByteSizeUnit.KB),
+           new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES));

        final IndexSettings indexSettings =
            IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
@@ -1268,12 +1268,11 @@ public class TranslogTests extends ESTestCase {
        final Set<Long> persistedSeqNos = new HashSet<>();
        persistedSeqNoConsumer.set(persistedSeqNos::add);
        final int numOps = randomIntBetween(8, 128);
-       byte[] bytes = new byte[4];
-       ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
        final Set<Long> seenSeqNos = new HashSet<>();
        boolean opsHaveValidSequenceNumbers = randomBoolean();
        for (int i = 0; i < numOps; i++) {
-           out.reset(bytes);
+           byte[] bytes = new byte[4];
+           ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
            out.writeInt(i);
            long seqNo;
            do {
@@ -1283,7 +1282,7 @@ public class TranslogTests extends ESTestCase {
            if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                seenSeqNos.add(seqNo);
            }
-           writer.add(new BytesArray(bytes), seqNo);
+           writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo);
        }
        assertThat(persistedSeqNos, empty());
        writer.sync();
@@ -1304,9 +1303,10 @@ public class TranslogTests extends ESTestCase {
        assertThat(reader.getCheckpoint().minSeqNo, equalTo(minSeqNo));
        assertThat(reader.getCheckpoint().maxSeqNo, equalTo(maxSeqNo));

-       out.reset(bytes);
+       byte[] bytes = new byte[4];
+       ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
        out.writeInt(2048);
-       writer.add(new BytesArray(bytes), randomNonNegativeLong());
+       writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());

        if (reader instanceof TranslogReader) {
            ByteBuffer buffer = ByteBuffer.allocate(4);
@@ -1329,15 +1329,194 @@ public class TranslogTests extends ESTestCase {
        IOUtils.close(writer);
    }

+   public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
+       Path tempDir = createTempDir();
+       final TranslogConfig temp = getTranslogConfig(tempDir);
+       final TranslogConfig config = new TranslogConfig(temp.getShardId(), temp.getTranslogPath(), temp.getIndexSettings(),
+           temp.getBigArrays(), new ByteSizeValue(1, ByteSizeUnit.KB));
+
+       final Set<Long> persistedSeqNos = new HashSet<>();
+       final AtomicInteger writeCalls = new AtomicInteger();
+
+       final ChannelFactory channelFactory = (file, openOption) -> {
+           FileChannel delegate = FileChannel.open(file, openOption);
+           boolean success = false;
+           try {
+               // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
+               final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");
+
+               final FileChannel channel;
+               if (isCkpFile) {
+                   channel = delegate;
+               } else {
+                   channel = new FilterFileChannel(delegate) {
+
+                       @Override
+                       public int write(ByteBuffer src) throws IOException {
+                           writeCalls.incrementAndGet();
+                           return super.write(src);
+                       }
+                   };
+               }
+               success = true;
+               return channel;
+           } finally {
+               if (success == false) {
+                   IOUtils.closeWhileHandlingException(delegate);
+               }
+           }
+       };
+
+       String translogUUID = Translog.createEmptyTranslog(
+           config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get());
+
+       try (Translog translog = new Translog(config, translogUUID, new TranslogDeletionPolicy(-1, -1, 0),
+           () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, persistedSeqNos::add) {
+           @Override
+           ChannelFactory getChannelFactory() {
+               return channelFactory;
+           }
+       }) {
+           TranslogWriter writer = translog.getCurrent();
+           int initialWriteCalls = writeCalls.get();
+           byte[] bytes = new byte[256];
+           writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
+           writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
+           writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3);
+           assertThat(persistedSeqNos, empty());
+           assertEquals(initialWriteCalls, writeCalls.get());
+
+           if (randomBoolean()) {
+               // This will fill the buffer and force a flush
+               writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4);
+               assertThat(persistedSeqNos, empty());
+               assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
+           } else {
+               // Will flush on read
+               writer.readBytes(ByteBuffer.allocate(256), 0);
+               assertThat(persistedSeqNos, empty());
+               assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
+
+               // Add after the read flushed the buffer
+               writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4);
+           }
+
+           writer.sync();
+
+           // Sequence numbers are marked as persisted after sync
+           assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L));
+       }
+   }
+
+   public void testTranslogWriterDoesNotBlockAddsOnWrite() throws IOException, InterruptedException {
+       Path tempDir = createTempDir();
+       final TranslogConfig config = getTranslogConfig(tempDir);
+       final AtomicBoolean startBlocking = new AtomicBoolean(false);
+       final CountDownLatch writeStarted = new CountDownLatch(1);
+       final CountDownLatch blocker = new CountDownLatch(1);
+       final Set<Long> persistedSeqNos = new HashSet<>();
+
+       final ChannelFactory channelFactory = (file, openOption) -> {
+           FileChannel delegate = FileChannel.open(file, openOption);
+           boolean success = false;
+           try {
+               // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
+               final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");
+
+               final FileChannel channel;
+               if (isCkpFile) {
+                   channel = delegate;
+               } else {
+                   channel = new FilterFileChannel(delegate) {
+
+                       @Override
+                       public int write(ByteBuffer src) throws IOException {
+                           if (startBlocking.get()) {
+                               if (writeStarted.getCount() > 0) {
+                                   writeStarted.countDown();
+                               }
+                               try {
+                                   blocker.await();
+                               } catch (InterruptedException e) {
+                                   // Ignore
+                               }
+                           }
+                           return super.write(src);
+                       }
+
+                       @Override
+                       public void force(boolean metaData) throws IOException {
+                           if (startBlocking.get()) {
+                               if (writeStarted.getCount() > 0) {
+                                   writeStarted.countDown();
+                               }
+                               try {
+                                   blocker.await();
+                               } catch (InterruptedException e) {
+                                   // Ignore
+                               }
+                           }
+                           super.force(metaData);
+                       }
+                   };
+               }
+               success = true;
+               return channel;
+           } finally {
+               if (success == false) {
+                   IOUtils.closeWhileHandlingException(delegate);
+               }
+           }
+       };
+       String translogUUID = Translog.createEmptyTranslog(
+           config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get());
+
+       try (Translog translog = new Translog(config, translogUUID, new TranslogDeletionPolicy(-1, -1, 0),
+           () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, persistedSeqNos::add) {
+           @Override
+           ChannelFactory getChannelFactory() {
+               return channelFactory;
+           }
+       }) {
+           TranslogWriter writer = translog.getCurrent();
+
+           byte[] bytes = new byte[4];
+           ByteArrayDataOutput out = new ByteArrayDataOutput(new byte[4]);
+           out.writeInt(1);
+           writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
+           assertThat(persistedSeqNos, empty());
+           startBlocking.set(true);
+           Thread thread = new Thread(() -> {
+               try {
+                   writer.sync();
+               } catch (IOException e) {
+                   throw new AssertionError(e);
+               }
+           });
+           thread.start();
+           writeStarted.await();
+
+           // Add will not block even though we are currently writing/syncing
+           writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
+
+           blocker.countDown();
+           // Sync again so that both operations are written
+           writer.sync();
+
+           assertThat(persistedSeqNos, contains(1L, 2L));
+           thread.join();
+       }
+   }
+
    public void testCloseIntoReader() throws IOException {
        try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) {
            final int numOps = randomIntBetween(8, 128);
-           final byte[] bytes = new byte[4];
-           final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
            for (int i = 0; i < numOps; i++) {
+               final byte[] bytes = new byte[4];
+               final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
                out.reset(bytes);
                out.writeInt(i);
-               writer.add(new BytesArray(bytes), randomNonNegativeLong());
+               writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
            }
            writer.sync();
            final Checkpoint writerCheckpoint = writer.getCheckpoint();
@@ -2482,7 +2661,11 @@ public class TranslogTests extends ESTestCase {
            @Override
            public void force(boolean metadata) throws IOException {
                if (fail.fail()) {
-                   throw new MockDirectoryWrapper.FakeIOException();
+                   if (throwUnknownException) {
+                       throw new UnknownException();
+                   } else {
+                       throw new MockDirectoryWrapper.FakeIOException();
+                   }
                }
                super.force(metadata);
            }
@@ -2490,7 +2673,11 @@ public class TranslogTests extends ESTestCase {
            @Override
            public long position() throws IOException {
                if (fail.fail()) {
-                   throw new MockDirectoryWrapper.FakeIOException();
+                   if (throwUnknownException) {
+                       throw new UnknownException();
+                   } else {
+                       throw new MockDirectoryWrapper.FakeIOException();
+                   }
                }
                return super.position();
            }