Introduce sequence-number-aware translog

Today, the relationship between Lucene and the translog is rather
simple: every document not in Lucene is guaranteed to be in the
translog. We need a stronger guarantee from the translog though, namely
that it can replay all operations after a certain sequence number. For
this to be possible, the translog has to be made sequence-number aware. As
a first step, we introduce the min and max sequence numbers into the
translog so that each generation knows the possible range of operations
contained in the generation. This will enable future work to keep around
all generations containing operations after a certain sequence number
(e.g., the global checkpoint).

Relates #22822
This commit is contained in:
Jason Tedor 2017-02-20 15:05:24 -05:00 committed by GitHub
parent 15f5810774
commit 4c2bd5feab
11 changed files with 359 additions and 152 deletions

View File

@ -57,4 +57,52 @@ public class SequenceNumbers {
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}
/**
 * Fold the specified sequence number into a running minimum. The running minimum may still hold one of the sentinel values
 * {@link SequenceNumbersService#NO_OPS_PERFORMED} or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}; in that case the specified
 * sequence number is returned as-is. Once the running minimum is a real value, the specified sequence number must not be
 * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
 *
 * @param minSeqNo the current minimum sequence number
 * @param seqNo    the specified sequence number
 * @return the new minimum sequence number
 * @throws IllegalArgumentException if the current minimum is not a sentinel and the specified sequence number is unassigned
 */
public static long min(final long minSeqNo, final long seqNo) {
    final boolean noMinimumYet =
        minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED || minSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO;
    if (noMinimumYet) {
        // nothing to compare against; the specified sequence number becomes the minimum unchanged
        return seqNo;
    }
    if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
        throw new IllegalArgumentException("sequence number must be assigned");
    }
    return Math.min(minSeqNo, seqNo);
}
/**
 * Fold the specified sequence number into a running maximum. The running maximum may still hold one of the sentinel values
 * {@link SequenceNumbersService#NO_OPS_PERFORMED} or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}; in that case the specified
 * sequence number is returned as-is. Once the running maximum is a real value, the specified sequence number must not be
 * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
 *
 * @param maxSeqNo the current maximum sequence number
 * @param seqNo    the specified sequence number
 * @return the new maximum sequence number
 * @throws IllegalArgumentException if the current maximum is not a sentinel and the specified sequence number is unassigned
 */
public static long max(final long maxSeqNo, final long seqNo) {
    final boolean noMaximumYet =
        maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED || maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO;
    if (noMaximumYet) {
        // nothing to compare against; the specified sequence number becomes the maximum unchanged
        return seqNo;
    }
    if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
        throw new IllegalArgumentException("sequence number must be assigned");
    }
    return Math.max(maxSeqNo, seqNo);
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.translog;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import java.io.IOException;
@ -52,7 +51,9 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
public abstract long sizeInBytes();
public abstract int totalOperations();
public abstract int totalOperations();
abstract Checkpoint getCheckpoint();
public final long getFirstOperationOffset() {
return firstOperationOffset;
@ -76,7 +77,7 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
}
public Translog.Snapshot newSnapshot() {
return new TranslogSnapshot(generation, channel, path, firstOperationOffset, sizeInBytes(), totalOperations());
return new TranslogSnapshot(this, sizeInBytes());
}
/**

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.apache.lucene.codecs.CodecUtil;
@ -35,11 +36,13 @@ import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
class Checkpoint {
final class Checkpoint {
final long offset;
final int numOps;
final long generation;
final long minSeqNo;
final long maxSeqNo;
final long globalCheckpoint;
private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
@ -52,6 +55,8 @@ class Checkpoint {
+ Integer.BYTES // ops
+ Long.BYTES // offset
+ Long.BYTES // generation
+ Long.BYTES // minimum sequence number, introduced in 6.0.0
+ Long.BYTES // maximum sequence number, introduced in 6.0.0
+ Long.BYTES // global checkpoint, introduced in 6.0.0
+ CodecUtil.footerLength();
@ -62,14 +67,23 @@ class Checkpoint {
+ Long.BYTES // generation
+ CodecUtil.footerLength();
static final int LEGACY_NON_CHECKSUMMED_FILE_LENGTH = Integer.BYTES // ops
+ Long.BYTES // offset
+ Long.BYTES; // generation
Checkpoint(long offset, int numOps, long generation, long globalCheckpoint) {
/**
 * Create a new translog checkpoint.
 *
 * @param offset           the current offset in the translog
 * @param numOps           the current number of operations in the translog
 * @param generation       the current translog generation
 * @param minSeqNo         the current minimum sequence number of all operations in the translog
 * @param maxSeqNo         the current maximum sequence number of all operations in the translog
 * @param globalCheckpoint the last-known global checkpoint
 */
Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint) {
    // the sequence number range must be well-formed; report both ends so a violation can be diagnosed from the failure alone
    assert minSeqNo <= maxSeqNo : "minSeqNo [" + minSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
    this.offset = offset;
    this.numOps = numOps;
    this.generation = generation;
    this.minSeqNo = minSeqNo;
    this.maxSeqNo = maxSeqNo;
    this.globalCheckpoint = globalCheckpoint;
}
@ -77,21 +91,27 @@ class Checkpoint {
out.writeLong(offset);
out.writeInt(numOps);
out.writeLong(generation);
out.writeLong(minSeqNo);
out.writeLong(maxSeqNo);
out.writeLong(globalCheckpoint);
}
static Checkpoint readChecksummedV2(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong());
/**
 * Create a checkpoint describing a translog that contains no operations: the operation count is zero and both the minimum and
 * maximum sequence numbers are {@link SequenceNumbersService#NO_OPS_PERFORMED}.
 *
 * @param offset           the current offset in the translog
 * @param generation       the translog generation
 * @param globalCheckpoint the last-known global checkpoint
 * @return a checkpoint for an empty translog
 */
static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint) {
    // with no operations written, both ends of the sequence number range carry the same sentinel
    final long noOps = SequenceNumbersService.NO_OPS_PERFORMED;
    return new Checkpoint(offset, 0, generation, noOps, noOps, globalCheckpoint);
}
/**
 * Read a checkpoint in the format that includes the minimum sequence number, maximum sequence number, and global checkpoint.
 * Fields are consumed in this order: offset, number of operations, generation, minimum sequence number, maximum sequence
 * number, global checkpoint.
 *
 * @param in the input to read the checkpoint fields from
 * @return the checkpoint that was read
 * @throws IOException if reading from the input resulted in an I/O exception
 */
static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
    final long offset = in.readLong();
    final int numOps = in.readInt();
    final long generation = in.readLong();
    final long minSeqNo = in.readLong();
    final long maxSeqNo = in.readLong();
    final long globalCheckpoint = in.readLong();
    return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint);
}
// reads a checksummed checkpoint introduced in ES 5.0.0
static Checkpoint readChecksummedV1(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
}
// reads checkpoint from ES < 5.0.0
static Checkpoint readNonChecksummed(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
/**
 * Read a checkpoint in the older format that carries only the offset, the number of operations, and the generation. The
 * minimum and maximum sequence numbers are set to {@link SequenceNumbersService#NO_OPS_PERFORMED} and the global checkpoint is
 * set to {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
 *
 * @param in the input to read the checkpoint fields from
 * @return the checkpoint that was read
 * @throws IOException if reading from the input resulted in an I/O exception
 */
static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException {
    final long offset = in.readLong();
    final int numOps = in.readInt();
    final long generation = in.readLong();
    // this format has no sequence number fields, so fill them with the sentinel values
    return new Checkpoint(
        offset,
        numOps,
        generation,
        SequenceNumbersService.NO_OPS_PERFORMED,
        SequenceNumbersService.NO_OPS_PERFORMED,
        SequenceNumbersService.UNASSIGNED_SEQ_NO);
}
@Override
@ -99,7 +119,9 @@ class Checkpoint {
return "Checkpoint{" +
"offset=" + offset +
", numOps=" + numOps +
", translogFileGeneration=" + generation +
", generation=" + generation +
", minSeqNo=" + minSeqNo +
", maxSeqNo=" + maxSeqNo +
", globalCheckpoint=" + globalCheckpoint +
'}';
}
@ -107,21 +129,16 @@ class Checkpoint {
public static Checkpoint read(Path path) throws IOException {
try (Directory dir = new SimpleFSDirectory(path.getParent())) {
try (IndexInput indexInput = dir.openInput(path.getFileName().toString(), IOContext.DEFAULT)) {
if (indexInput.length() == LEGACY_NON_CHECKSUMMED_FILE_LENGTH) {
// OLD unchecksummed file that was written < ES 5.0.0
return Checkpoint.readNonChecksummed(indexInput);
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
CodecUtil.checksumEntireFile(indexInput);
final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, CURRENT_VERSION);
if (fileVersion == INITIAL_VERSION) {
assert indexInput.length() == V1_FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV5_0_0(indexInput);
} else {
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
CodecUtil.checksumEntireFile(indexInput);
final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, CURRENT_VERSION);
if (fileVersion == INITIAL_VERSION) {
assert indexInput.length() == V1_FILE_SIZE;
return Checkpoint.readChecksummedV1(indexInput);
} else {
assert fileVersion == CURRENT_VERSION;
assert indexInput.length() == FILE_SIZE;
return Checkpoint.readChecksummedV2(indexInput);
}
assert fileVersion == CURRENT_VERSION : fileVersion;
assert indexInput.length() == FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV6_0_0(indexInput);
}
}
}
@ -159,23 +176,17 @@ class Checkpoint {
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Checkpoint that = (Checkpoint) o;
if (offset != that.offset) {
return false;
}
if (numOps != that.numOps) {
return false;
}
return generation == that.generation;
if (offset != that.offset) return false;
if (numOps != that.numOps) return false;
if (generation != that.generation) return false;
if (minSeqNo != that.minSeqNo) return false;
if (maxSeqNo != that.maxSeqNo) return false;
return globalCheckpoint == that.globalCheckpoint;
}
@Override
@ -183,6 +194,10 @@ class Checkpoint {
int result = Long.hashCode(offset);
result = 31 * result + numOps;
result = 31 * result + Long.hashCode(generation);
result = 31 * result + Long.hashCode(minSeqNo);
result = 31 * result + Long.hashCode(maxSeqNo);
result = 31 * result + Long.hashCode(globalCheckpoint);
return result;
}
}

View File

@ -198,7 +198,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
logger.debug("wipe translog location - creating new translog");
Files.createDirectories(location);
final long generation = 1;
Checkpoint checkpoint = new Checkpoint(0, 0, generation, globalCheckpointSupplier.getAsLong());
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong());
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
IOUtils.fsync(checkpointFile, false);
@ -400,13 +400,13 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
/**
* Adds a delete / index operations to the transaction log.
* Adds an operation to the transaction log.
*
* @see org.elasticsearch.index.translog.Translog.Operation
* @see Index
* @see org.elasticsearch.index.translog.Translog.Delete
* @param operation the operation to add
* @return the location of the operation in the translog
* @throws IOException if adding the operation to the translog resulted in an I/O exception
*/
public Location add(Operation operation) throws IOException {
public Location add(final Operation operation) throws IOException {
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
try {
final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
@ -419,22 +419,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
out.writeInt(operationSize);
out.seek(end);
final ReleasablePagedBytesReference bytes = out.bytes();
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
Location location = current.add(bytes);
return location;
return current.add(bytes, operation.seqNo());
}
} catch (AlreadyClosedException | IOException ex) {
} catch (final AlreadyClosedException | IOException ex) {
try {
closeOnTragicEvent(ex);
} catch (Exception inner) {
} catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
} catch (Exception e) {
} catch (final Exception e) {
try {
closeOnTragicEvent(e);
} catch (Exception inner) {
} catch (final Exception inner) {
e.addSuppressed(inner);
}
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
@ -1222,6 +1221,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public enum Durability {
/**
* Async durability - translogs are synced based on a time interval.
*/
@ -1229,7 +1229,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/**
* Request durability - translogs are synced for each high level request (bulk, index, delete)
*/
REQUEST;
REQUEST
}

View File

@ -41,28 +41,42 @@ import java.util.concurrent.atomic.AtomicBoolean;
* an immutable translog filereader
*/
public class TranslogReader extends BaseTranslogReader implements Closeable {
private static final byte LUCENE_CODEC_HEADER_BYTE = 0x3f;
private static final byte UNVERSIONED_TRANSLOG_HEADER_BYTE = 0x00;
private final int totalOperations;
protected final long length;
private final int totalOperations;
private final Checkpoint checkpoint;
protected final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Create a reader of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot.
* Create a translog reader against the specified translog file channel.
*
* @param checkpoint the translog checkpoint
* @param channel the translog file channel to open a translog reader against
* @param path the path to the translog
* @param firstOperationOffset the offset to the first operation
*/
public TranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset, long length, int totalOperations) {
super(generation, channel, path, firstOperationOffset);
this.length = length;
this.totalOperations = totalOperations;
TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final long firstOperationOffset) {
super(checkpoint.generation, channel, path, firstOperationOffset);
this.length = checkpoint.offset;
this.totalOperations = checkpoint.numOps;
this.checkpoint = checkpoint;
}
/**
* Given a file, opens an {@link TranslogReader}, taking of checking and validating the file header.
* Given a file channel, opens a {@link TranslogReader}, taking care of checking and validating the file header.
*
* @param channel the translog file channel
* @param path the path to the translog
* @param checkpoint the translog checkpoint
* @param translogUUID the translog UUID
* @return a new TranslogReader
* @throws IOException if any of the file operations resulted in an I/O exception
*/
public static TranslogReader open(FileChannel channel, Path path, Checkpoint checkpoint, String translogUUID) throws IOException {
public static TranslogReader open(
final FileChannel channel, final Path path, final Checkpoint checkpoint, final String translogUUID) throws IOException {
try {
InputStreamStreamInput headerStream = new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel)); // don't close
@ -116,7 +130,10 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref +
" this translog file belongs to a different translog. path:" + path);
}
return new TranslogReader(checkpoint.generation, channel, path, ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES, checkpoint.offset, checkpoint.numOps);
final long firstOperationOffset =
ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES;
return new TranslogReader(checkpoint, channel, path, firstOperationOffset);
default:
throw new TranslogCorruptedException("No known translog stream version: " + version + " path:" + path);
}
@ -138,6 +155,11 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
return totalOperations;
}
/**
* Returns the checkpoint this reader was constructed with.
*
* @return the translog checkpoint backing this reader
*/
@Override
final Checkpoint getCheckpoint() {
return checkpoint;
}
/**
* reads an operation at the given position into the given buffer.
*/

View File

@ -23,12 +23,11 @@ import org.elasticsearch.common.io.Channels;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot {
private final int totalOperations;
private final Checkpoint checkpoint;
protected final long length;
private final ByteBuffer reusableBuffer;
@ -37,13 +36,13 @@ final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snap
private BufferedChecksumStreamInput reuse;
/**
* Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point
* at the end of the last operation in this snapshot.
* Create a snapshot of translog file channel.
*/
TranslogSnapshot(long generation, FileChannel channel, Path path, long firstOperationOffset, long length, int totalOperations) {
super(generation, channel, path, firstOperationOffset);
TranslogSnapshot(final BaseTranslogReader reader, final long length) {
super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset);
this.length = length;
this.totalOperations = totalOperations;
this.totalOperations = reader.totalOperations();
this.checkpoint = reader.getCheckpoint();
this.reusableBuffer = ByteBuffer.allocate(1024);
readOperations = 0;
position = firstOperationOffset;
@ -55,6 +54,11 @@ final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snap
return totalOperations;
}
/**
* Returns the checkpoint captured from the reader this snapshot was created from.
*
* @return the translog checkpoint associated with this snapshot
*/
@Override
Checkpoint getCheckpoint() {
return checkpoint;
}
@Override
public Translog.Operation next() throws IOException {
if (readOperations < totalOperations) {

View File

@ -27,6 +27,8 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import java.io.BufferedOutputStream;
@ -60,13 +62,16 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
/* the total offset of this file including the bytes written to the file as well as into the buffer */
private volatile long totalOffset;
private volatile long minSeqNo;
private volatile long maxSeqNo;
private final LongSupplier globalCheckpointSupplier;
protected final AtomicBoolean closed = new AtomicBoolean(false);
// lock order synchronized(syncLock) -> synchronized(this)
private final Object syncLock = new Object();
public TranslogWriter(
private TranslogWriter(
final ChannelFactory channelFactory,
final ShardId shardId,
final Checkpoint initialCheckpoint,
@ -80,6 +85,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
this.lastSyncedCheckpoint = initialCheckpoint;
this.totalOffset = initialCheckpoint.offset;
assert initialCheckpoint.minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.minSeqNo;
this.minSeqNo = initialCheckpoint.minSeqNo;
assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo;
this.maxSeqNo = initialCheckpoint.maxSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier;
}
@ -115,10 +124,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
writeHeader(out, ref);
channel.force(true);
final Checkpoint checkpoint =
writeCheckpoint(channelFactory, headerLength, 0, globalCheckpointSupplier.getAsLong(), file.getParent(), fileGeneration);
final TranslogWriter writer =
new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier);
return writer;
Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong());
writeCheckpoint(channelFactory, file.getParent(), checkpoint);
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
@ -151,21 +159,42 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
/**
* add the given bytes to the translog and return the location they were written at
*/
public synchronized Translog.Location add(BytesReference data) throws IOException {
/**
* Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to.
*
* @param data the bytes to write
* @param seqNo the sequence number associated with the operation
* @return the location the bytes were written to
* @throws IOException if writing to the translog resulted in an I/O exception
*/
public synchronized Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
ensureOpen();
final long offset = totalOffset;
try {
data.writeTo(outputStream);
} catch (Exception ex) {
} catch (final Exception ex) {
try {
closeWithTragicEvent(ex);
} catch (Exception inner) {
} catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
}
totalOffset += data.length();
if (minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) {
assert operationCounter == 0;
}
if (maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) {
assert operationCounter == 0;
}
minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
operationCounter++;
return new Translog.Location(generation, offset, data.length());
}
@ -191,13 +220,20 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
return operationCounter;
}
/**
* Returns the checkpoint for this writer. This delegates to {@link #getLastSyncedCheckpoint()}, so the result describes the
* state as of the last synced checkpoint rather than the writer's in-memory counters.
*
* @return the last synced checkpoint
*/
@Override
Checkpoint getCheckpoint() {
return getLastSyncedCheckpoint();
}
@Override
public long sizeInBytes() {
return totalOffset;
}
/**
* closes this writer and transfers it's underlying file channel to a new immutable reader
* Closes this writer and transfers its underlying file channel to a new immutable {@link TranslogReader}
* @return a new {@link TranslogReader}
* @throws IOException if any of the file operations resulted in an I/O exception
*/
public TranslogReader closeIntoReader() throws IOException {
// make sure to acquire the sync lock first, to prevent dead locks with threads calling
@ -218,18 +254,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
throw e;
}
if (closed.compareAndSet(false, true)) {
boolean success = false;
try {
final TranslogReader reader =
new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter);
success = true;
return reader;
} finally {
if (success == false) {
// close the channel, as we are closed and failed to create a new reader
IOUtils.closeWhileHandlingException(channel);
}
}
return new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset());
} else {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy);
}
@ -272,14 +297,18 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// the lock we should check again since if this code is busy we might have fsynced enough already
final long offsetToSync;
final int opsCounter;
final long globalCheckpoint;
final long currentMinSeqNo;
final long currentMaxSeqNo;
final long currentGlobalCheckpoint;
synchronized (this) {
ensureOpen();
try {
outputStream.flush();
offsetToSync = totalOffset;
opsCounter = operationCounter;
globalCheckpoint = globalCheckpointSupplier.getAsLong();
currentMinSeqNo = minSeqNo;
currentMaxSeqNo = maxSeqNo;
currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
} catch (Exception ex) {
try {
closeWithTragicEvent(ex);
@ -295,7 +324,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
try {
channel.force(false);
checkpoint =
writeCheckpoint(channelFactory, offsetToSync, opsCounter, globalCheckpoint, path.getParent(), generation);
writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo, currentGlobalCheckpoint, path.getParent(), generation);
} catch (Exception ex) {
try {
closeWithTragicEvent(ex);
@ -333,24 +362,32 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
}
private static Checkpoint writeCheckpoint(
ChannelFactory channelFactory,
long syncPosition,
int numOperations,
long globalCheckpoint,
Path translogFile,
long generation) throws IOException {
final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME);
final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, globalCheckpoint);
Checkpoint.write(channelFactory::open, checkpointFile, checkpoint, StandardOpenOption.WRITE);
ChannelFactory channelFactory,
long syncPosition,
int numOperations,
long minSeqNo,
long maxSeqNo,
long globalCheckpoint,
Path translogFile,
long generation) throws IOException {
final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, minSeqNo, maxSeqNo, globalCheckpoint);
writeCheckpoint(channelFactory, translogFile, checkpoint);
return checkpoint;
}
/**
 * Write the given checkpoint to the checkpoint file, resolved as {@link Translog#CHECKPOINT_FILE_NAME} against the specified
 * path. The file is opened with {@link StandardOpenOption#WRITE}.
 *
 * @param channelFactory the channel factory used to open the checkpoint file
 * @param translogFile   the path that the checkpoint file name is resolved against
 * @param checkpoint     the checkpoint to write
 * @throws IOException if writing the checkpoint resulted in an I/O exception
 */
private static void writeCheckpoint(
        final ChannelFactory channelFactory,
        final Path translogFile,
        final Checkpoint checkpoint) throws IOException {
    final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME);
    Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE);
}
/**
* The last synced checkpoint for this translog.
*
* @return the last synced checkpoint
*/
public Checkpoint getLastSyncedCheckpoint() {
Checkpoint getLastSyncedCheckpoint() {
return lastSyncedCheckpoint;
}
@ -402,4 +439,5 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
throw new IllegalStateException("never close this stream");
}
}
}

View File

@ -168,7 +168,8 @@ public class TruncateTranslogCommand extends EnvironmentAwareCommand {
/** Write a checkpoint file to the given location with the given generation */
public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO);
Checkpoint emptyCheckpoint =
Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
// fsync with metadata here to make sure.

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
/**
 * Tests for the sentinel-aware {@link SequenceNumbers#min(long, long)} and {@link SequenceNumbers#max(long, long)} helpers.
 */
public class SequenceNumbersTests extends ESTestCase {

    /**
     * Checks that {@code min} returns the specified sequence number when the current minimum is a sentinel, agrees with
     * {@link Math#min(long, long)} for real values, and rejects an unassigned sequence number once a real minimum exists.
     */
    public void testMin() {
        final long seqNo = randomNonNegativeLong();
        assertThat(SequenceNumbers.min(SequenceNumbersService.NO_OPS_PERFORMED, seqNo), equalTo(seqNo));
        assertThat(
            SequenceNumbers.min(SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.UNASSIGNED_SEQ_NO),
            equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
        assertThat(SequenceNumbers.min(SequenceNumbersService.UNASSIGNED_SEQ_NO, seqNo), equalTo(seqNo));
        final long minSeqNo = randomNonNegativeLong();
        assertThat(SequenceNumbers.min(minSeqNo, seqNo), equalTo(Math.min(minSeqNo, seqNo)));

        final IllegalArgumentException e =
            expectThrows(IllegalArgumentException.class, () -> SequenceNumbers.min(minSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO));
        assertThat(e, hasToString(containsString("sequence number must be assigned")));
    }

    /**
     * Checks that {@code max} returns the specified sequence number when the current maximum is a sentinel, agrees with
     * {@link Math#max(long, long)} for real values, and rejects an unassigned sequence number once a real maximum exists.
     */
    public void testMax() {
        final long seqNo = randomNonNegativeLong();
        assertThat(SequenceNumbers.max(SequenceNumbersService.NO_OPS_PERFORMED, seqNo), equalTo(seqNo));
        assertThat(
            SequenceNumbers.max(SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.UNASSIGNED_SEQ_NO),
            equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
        assertThat(SequenceNumbers.max(SequenceNumbersService.UNASSIGNED_SEQ_NO, seqNo), equalTo(seqNo));
        final long maxSeqNo = randomNonNegativeLong();
        // exercise max here (not min) so the non-sentinel path of max is actually covered
        assertThat(SequenceNumbers.max(maxSeqNo, seqNo), equalTo(Math.max(maxSeqNo, seqNo)));

        final IllegalArgumentException e =
            expectThrows(IllegalArgumentException.class, () -> SequenceNumbers.max(maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO));
        assertThat(e, hasToString(containsString("sequence number must be assigned")));
    }

}

View File

@ -964,13 +964,23 @@ public class TranslogTests extends ESTestCase {
public void testTranslogWriter() throws IOException {
final TranslogWriter writer = translog.createWriter(0);
final int numOps = randomIntBetween(10, 100);
final int numOps = randomIntBetween(8, 128);
byte[] bytes = new byte[4];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
final Set<Long> seenSeqNos = new HashSet<>();
boolean opsHaveValidSequenceNumbers = randomBoolean();
for (int i = 0; i < numOps; i++) {
out.reset(bytes);
out.writeInt(i);
writer.add(new BytesArray(bytes));
long seqNo;
do {
seqNo = opsHaveValidSequenceNumbers ? randomNonNegativeLong() : SequenceNumbersService.UNASSIGNED_SEQ_NO;
opsHaveValidSequenceNumbers = opsHaveValidSequenceNumbers || !rarely();
} while (seenSeqNos.contains(seqNo));
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seenSeqNos.add(seqNo);
}
writer.add(new BytesArray(bytes), seqNo);
}
writer.sync();
@ -982,10 +992,14 @@ public class TranslogTests extends ESTestCase {
final int value = buffer.getInt();
assertEquals(i, value);
}
final long minSeqNo = seenSeqNos.stream().min(Long::compareTo).orElse(SequenceNumbersService.NO_OPS_PERFORMED);
final long maxSeqNo = seenSeqNos.stream().max(Long::compareTo).orElse(SequenceNumbersService.NO_OPS_PERFORMED);
assertThat(reader.getCheckpoint().minSeqNo, equalTo(minSeqNo));
assertThat(reader.getCheckpoint().maxSeqNo, equalTo(maxSeqNo));
out.reset(bytes);
out.writeInt(2048);
writer.add(new BytesArray(bytes));
writer.add(new BytesArray(bytes), randomNonNegativeLong());
if (reader instanceof TranslogReader) {
ByteBuffer buffer = ByteBuffer.allocate(4);
@ -1008,40 +1022,30 @@ public class TranslogTests extends ESTestCase {
IOUtils.close(writer);
}
// NOTE(review): the +/- diff markers are missing in this hunk, so the lines below appear to interleave two versions of one
// test: testFailWriterWhileClosing (fail-switch based) and testCloseIntoReader (checkpoint comparison) -- confirm against
// the commit before relying on this reading.
//
// testFailWriterWhileClosing: adds numOps 4-byte integers to a writer created from a failable translog, syncs, then switches
// the FailSwitch to fail always and expects closeIntoReader() to throw MockDirectoryWrapper.FakeIOException.
public void testFailWriterWhileClosing() throws IOException {
Path tempDir = createTempDir();
final FailSwitch fail = new FailSwitch();
fail.failNever();
TranslogConfig config = getTranslogConfig(tempDir);
try (Translog translog = getFailableTranslog(fail, config)) {
final TranslogWriter writer = translog.createWriter(0);
final int numOps = randomIntBetween(10, 100);
byte[] bytes = new byte[4];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
// testCloseIntoReader: adds numOps 4-byte integers, each with a random non-negative sequence number, syncs the writer,
// closes it into a reader, reads every integer back in order, and checks that the reader's checkpoint equals the writer's.
public void testCloseIntoReader() throws IOException {
try (TranslogWriter writer = translog.createWriter(0)) {
final int numOps = randomIntBetween(8, 128);
final byte[] bytes = new byte[4];
final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
for (int i = 0; i < numOps; i++) {
// reuse the same 4-byte buffer for every operation; the payload is simply the loop index
out.reset(bytes);
out.writeInt(i);
writer.add(new BytesArray(bytes));
writer.add(new BytesArray(bytes), randomNonNegativeLong());
}
writer.sync();
try {
fail.failAlways();
writer.closeIntoReader();
fail();
} catch (MockDirectoryWrapper.FakeIOException ex) {
}
// this variant reopens the writer's file using the checkpoint read from the translog's checkpoint file
try (TranslogReader reader = translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)))) {
// capture the writer's checkpoint before closing so it can be compared with the reader's afterwards
final Checkpoint writerCheckpoint = writer.getCheckpoint();
try (TranslogReader reader = writer.closeIntoReader()) {
for (int i = 0; i < numOps; i++) {
ByteBuffer buffer = ByteBuffer.allocate(4);
final ByteBuffer buffer = ByteBuffer.allocate(4);
// every operation is exactly 4 bytes, so operation i starts at firstOperationOffset + 4 * i
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
buffer.flip();
final int value = buffer.getInt();
assertEquals(i, value);
}
final Checkpoint readerCheckpoint = reader.getCheckpoint();
assertThat(readerCheckpoint, equalTo(writerCheckpoint));
}
}
}
public void testBasicRecovery() throws IOException {
@ -1209,12 +1213,12 @@ public class TranslogTests extends ESTestCase {
TranslogConfig config = translog.getConfig();
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Checkpoint corrupted = new Checkpoint(0, 0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO);
Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO);
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, translogFileGeneration=2, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration=0, globalCheckpoint=-2}");
assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, generation=2, minSeqNo=0, maxSeqNo=0, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage());
}
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
@ -1663,7 +1667,7 @@ public class TranslogTests extends ESTestCase {
FileChannel channel = factory.open(file, openOption);
boolean success = false;
try {
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the 20bytes are written as an atomic operation
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : paritalWrites, throwUnknownException, channel);
success = true;
return throwingFileChannel;
@ -1962,11 +1966,26 @@ public class TranslogTests extends ESTestCase {
}
}
/**
 * Builds a {@link Checkpoint} populated with random values.
 *
 * Two random non-negative longs are drawn; the smaller becomes the minimum sequence number and the larger the maximum, so
 * {@code minSeqNo <= maxSeqNo} always holds. The remaining constructor arguments are drawn afterwards, in argument order.
 *
 * @return a randomly populated checkpoint
 */
private Checkpoint randomCheckpoint() {
    final long first = randomNonNegativeLong();
    final long second = randomNonNegativeLong();
    // order the pair without branching; equal values yield minSeqNo == maxSeqNo
    final long minSeqNo = Math.min(first, second);
    final long maxSeqNo = Math.max(first, second);
    return new Checkpoint(randomLong(), randomInt(), randomLong(), minSeqNo, maxSeqNo, randomNonNegativeLong());
}
public void testCheckpointOnDiskFull() throws IOException {
Checkpoint checkpoint = new Checkpoint(randomLong(), randomInt(), randomLong(), randomLong());
final Checkpoint checkpoint = randomCheckpoint();
Path tempDir = createTempDir();
Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Checkpoint checkpoint2 = new Checkpoint(randomLong(), randomInt(), randomLong(), randomLong());
final Checkpoint checkpoint2 = randomCheckpoint();
try {
Checkpoint.write((p, o) -> {
if (randomBoolean()) {

View File

@ -19,7 +19,7 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
@ -84,14 +84,13 @@ public class TranslogVersionTests extends ESTestCase {
checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-truncated.binary", "pre-2.0 translog");
}
public TranslogReader openReader(Path path, long id) throws IOException {
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
try {
TranslogReader reader = TranslogReader.open(channel, path, new Checkpoint(Files.size(path), 1, id, 0), null);
channel = null;
return reader;
} finally {
IOUtils.close(channel);
/**
 * Opens a {@link TranslogReader} over the translog file at the given path, using a synthetic checkpoint that covers the
 * whole file ({@code Files.size(path)}), reports a single operation, and carries no sequence-number information.
 *
 * The channel is deliberately left open on success because the returned reader is opened over it; closing it here (as a
 * try-with-resources block would) hands the caller a reader backed by an already-closed channel. The channel is closed by
 * this method only when opening the reader fails.
 *
 * @param path the translog file to open
 * @param id   passed as the third argument of the synthetic checkpoint (presumably the translog generation -- confirm
 *             against the {@code Checkpoint} constructor)
 * @return a reader over the file
 * @throws IOException if the file cannot be opened or read
 */
public TranslogReader openReader(final Path path, final long id) throws IOException {
    final FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
    try {
        final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
        final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
        final Checkpoint checkpoint =
            new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO);
        return TranslogReader.open(channel, path, checkpoint, null);
    } catch (final Exception e) {
        // opening failed, so nothing else will ever close the channel; release it without masking the original failure
        try {
            channel.close();
        } catch (final IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
}