Allow to trim all ops above a certain seq# with a term lower than X (#30176)


Relates to #10708
Vladimir Dolzhenko 2018-06-08 09:16:38 -07:00 committed by GitHub
parent 01140a3ad8
commit a86c0f8c25
20 changed files with 579 additions and 69 deletions
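
At its core, the change lets a newly promoted primary tell its replicas to discard every translog operation that a stale primary wrote beyond the new primary's highest seq#. A standalone sketch of the trimming predicate (illustrative code, not part of this commit; the real logic lives in Translog#trimOperations below):

final class TrimPredicate {
    // an op is trimmed iff it was written under an older primary term AND
    // sits above the trim point (the new primary's max seq# at promotion)
    static boolean isTrimmed(long opPrimaryTerm, long opSeqNo, long belowTerm, long aboveSeqNo) {
        return opPrimaryTerm < belowTerm && opSeqNo > aboveSeqNo;
    }

    public static void main(String[] args) {
        // a new primary promoted at term 3 whose max seq# was 5 at promotion
        System.out.println(isTrimmed(2, 6, 3, 5)); // true  - stale op from the old term
        System.out.println(isTrimmed(3, 6, 3, 5)); // false - op of the current term survives
        System.out.println(isTrimmed(2, 4, 3, 5)); // false - at or below the trim point, kept
    }
}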

ResyncReplicationRequest.java

@ -22,6 +22,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@ -33,17 +34,24 @@ import java.util.Arrays;
*/
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
private long trimAboveSeqNo;
private Translog.Operation[] operations;
ResyncReplicationRequest() {
super();
}
public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) {
public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo,
final Translog.Operation[] operations) {
super(shardId);
this.trimAboveSeqNo = trimAboveSeqNo;
this.operations = operations;
}
public long getTrimAboveSeqNo() {
return trimAboveSeqNo;
}
public Translog.Operation[] getOperations() {
return operations;
}
@ -60,12 +68,20 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
}
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
trimAboveSeqNo = in.readZLong();
} else {
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(trimAboveSeqNo);
}
out.writeArray(Translog.Operation::writeOperation, operations);
}
@ -74,12 +90,13 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return Arrays.equals(operations, that.operations);
return trimAboveSeqNo == that.trimAboveSeqNo
&& Arrays.equals(operations, that.operations);
}
@Override
public int hashCode() {
return Arrays.hashCode(operations);
return Long.hashCode(trimAboveSeqNo) + 31 * Arrays.hashCode(operations);
}
@Override
@ -88,6 +105,7 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", trimAboveSeqNo=" + trimAboveSeqNo +
", ops=" + operations.length +
"}";
}
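
The version gate in readFrom/writeTo keeps the wire format compatible in both directions: a pre-7.0.0-alpha1 sender never emits the field, and the receiver substitutes the "no trimming" sentinel. A minimal standalone sketch of that pattern (plain DataInput/DataOutput stand in for ES's StreamInput/StreamOutput; class and method names are invented):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

final class VersionGatedField {
    static final long UNASSIGNED_SEQ_NO = -2L; // SequenceNumbers.UNASSIGNED_SEQ_NO

    // writer side: only emit the new field when the remote node understands it
    static void writeTrimAboveSeqNo(DataOutput out, boolean remoteIsNewEnough, long trimAboveSeqNo) throws IOException {
        if (remoteIsNewEnough) {
            out.writeLong(trimAboveSeqNo);
        }
    }

    // reader side: default the field when the sender predates it, so a resync
    // from an old primary simply performs no trimming on the replica
    static long readTrimAboveSeqNo(DataInput in, boolean senderIsNewEnough) throws IOException {
        return senderIsNewEnough ? in.readLong() : UNASSIGNED_SEQ_NO;
    }
}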

TransportResyncReplicationAction.java

@ -135,6 +135,9 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
}
}
}
if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
replica.trimOperationOfPreviousPrimaryTerms(request.getTrimAboveSeqNo());
}
return location;
}

Engine.java

@ -236,6 +236,12 @@ public abstract class Engine implements Closeable {
*/
public abstract boolean isThrottled();
/**
* Trims translog operations with a primary term below <code>belowTerm</code> and a seq# above <code>aboveSeqNo</code>
* @see Translog#trimOperations(long, long)
*/
public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;
/** A Lock implementation that always allows the lock to be acquired */
protected static final class NoOpLock implements Lock {
@ -904,7 +910,7 @@ public abstract class Engine implements Closeable {
* checks and removes translog files that no longer need to be retained. See
* {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
*/
public abstract void trimTranslog() throws EngineException;
public abstract void trimUnreferencedTranslogFiles() throws EngineException;
/**
* Tests whether or not the translog generation should be rolled to a new generation.

InternalEngine.java

@ -1552,7 +1552,7 @@ public class InternalEngine extends Engine {
}
@Override
public void trimTranslog() throws EngineException {
public void trimUnreferencedTranslogFiles() throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
@ -1569,6 +1569,24 @@ public class InternalEngine extends Engine {
}
}
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("translog operations trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog operations", e);
}
}
private void pruneDeletedTombstones() {
/*
* We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary

SequenceNumbers.java

@ -37,7 +37,7 @@ public class SequenceNumbers {
*/
public static final long UNASSIGNED_SEQ_NO = -2L;
/**
* Represents no operations have been performed on the shard.
* Represents that no operations have been performed on the shard. Also the initial value of a sequence number.
*/
public static final long NO_OPS_PERFORMED = -1L;
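
For quick reference, the two sentinels this change leans on (a comment-only summary of their semantics as used across the diff, not new code):

// UNASSIGNED_SEQ_NO (-2): "no trim point" - a resync request carrying it
//                         performs no trimming on the replica, and a checkpoint
//                         carrying it hides nothing from snapshots.
// NO_OPS_PERFORMED  (-1): "empty shard" - also the lowest legal trim point,
//                         hence Translog#trimOperations asserts
//                         aboveSeqNo >= NO_OPS_PERFORMED.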

IndexShard.java

@ -992,7 +992,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void trimTranslog() {
verifyNotClosed();
final Engine engine = getEngine();
engine.trimTranslog();
engine.trimUnreferencedTranslogFiles();
}
/**
@ -1194,6 +1194,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert currentEngineReference.get() == null;
}
public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(primaryTerm, aboveSeqNo);
}
public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
final Engine.Result result;
switch (operation.opType()) {

PrimaryReplicaSyncer.java

@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
@ -84,6 +85,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
try {
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
resyncListener = new ActionListener<ResyncTask>() {
@Override
public void onResponse(final ResyncTask resyncTask) {
@ -135,7 +137,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
}
};
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
startingSeqNo, resyncListener);
startingSeqNo, maxSeqNo, resyncListener);
} catch (Exception e) {
if (resyncListener != null) {
resyncListener.onFailure(e);
@ -146,7 +148,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
}
private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
long startingSeqNo, ActionListener<ResyncTask> listener) {
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@ -166,7 +168,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
};
try {
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, wrappedListener).run();
startingSeqNo, maxSeqNo, wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
@ -186,14 +188,16 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
private final ShardId shardId;
private final Translog.Snapshot snapshot;
private final long startingSeqNo;
private final long maxSeqNo;
private final int chunkSizeInBytes;
private final ActionListener<Void> listener;
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
private final AtomicInteger totalSentOps = new AtomicInteger();
private final AtomicInteger totalSkippedOps = new AtomicInteger();
private AtomicBoolean closed = new AtomicBoolean();
SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
this.logger = logger;
this.syncAction = syncAction;
this.task = task;
@ -203,6 +207,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
this.snapshot = snapshot;
this.chunkSizeInBytes = chunkSizeInBytes;
this.startingSeqNo = startingSeqNo;
this.maxSeqNo = maxSeqNo;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}
@ -248,11 +253,15 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
}
}
if (!operations.isEmpty()) {
final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
// a sync request has to be sent even if there are no operations to sync - at the very least trimmedAboveSeqNo has to be synced
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
task.setPhase("sending_ops");
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
ResyncReplicationRequest request =
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
firstMessage.set(false);
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
} else if (closed.compareAndSet(false, true)) {
logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());

Checkpoint.java

@ -45,14 +45,29 @@ final class Checkpoint {
final long maxSeqNo;
final long globalCheckpoint;
final long minTranslogGeneration;
final long trimmedAboveSeqNo;
private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
private static final int CURRENT_VERSION = 2; // introduction of global checkpoints
private static final int VERSION_6_0_0 = 2; // introduction of global checkpoints
private static final int CURRENT_VERSION = 3; // introduction of trimmed above seq#
private static final String CHECKPOINT_CODEC = "ckp";
// size of 6.4.0 checkpoint
static final int V3_FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
+ Integer.BYTES // ops
+ Long.BYTES // offset
+ Long.BYTES // generation
+ Long.BYTES // minimum sequence number, introduced in 6.0.0
+ Long.BYTES // maximum sequence number, introduced in 6.0.0
+ Long.BYTES // global checkpoint, introduced in 6.0.0
+ Long.BYTES // minimum translog generation in the translog - introduced in 6.0.0
+ Long.BYTES // maximum reachable (trimmed) sequence number, introduced in 6.4.0
+ CodecUtil.footerLength();
// size of 6.0.0 checkpoint
static final int FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
static final int V2_FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
+ Integer.BYTES // ops
+ Long.BYTES // offset
+ Long.BYTES // generation
@ -72,16 +87,20 @@ final class Checkpoint {
/**
* Create a new translog checkpoint.
*
* @param offset the current offset in the translog
* @param numOps the current number of operations in the translog
* @param generation the current translog generation
* @param minSeqNo the current minimum sequence number of all operations in the translog
* @param maxSeqNo the current maximum sequence number of all operations in the translog
* @param globalCheckpoint the last-known global checkpoint
* @param offset the current offset in the translog
* @param numOps the current number of operations in the translog
* @param generation the current translog generation
* @param minSeqNo the current minimum sequence number of all operations in the translog
* @param maxSeqNo the current maximum sequence number of all operations in the translog
* @param globalCheckpoint the last-known global checkpoint
* @param minTranslogGeneration the minimum generation referenced by the translog at this moment.
* @param trimmedAboveSeqNo all operations with seq# above trimmedAboveSeqNo should be ignored and not read from the
* corresponding translog file. {@link SequenceNumbers#UNASSIGNED_SEQ_NO} is used to disable trimming.
*/
Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint, long minTranslogGeneration) {
Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint,
long minTranslogGeneration, long trimmedAboveSeqNo) {
assert minSeqNo <= maxSeqNo : "minSeqNo [" + minSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
assert trimmedAboveSeqNo <= maxSeqNo : "trimmedAboveSeqNo [" + trimmedAboveSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
assert minTranslogGeneration <= generation :
"minTranslogGen [" + minTranslogGeneration + "] is higher than generation [" + generation + "]";
this.offset = offset;
@ -91,6 +110,7 @@ final class Checkpoint {
this.maxSeqNo = maxSeqNo;
this.globalCheckpoint = globalCheckpoint;
this.minTranslogGeneration = minTranslogGeneration;
this.trimmedAboveSeqNo = trimmedAboveSeqNo;
}
private void write(DataOutput out) throws IOException {
@ -101,26 +121,52 @@ final class Checkpoint {
out.writeLong(maxSeqNo);
out.writeLong(globalCheckpoint);
out.writeLong(minTranslogGeneration);
out.writeLong(trimmedAboveSeqNo);
}
static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
long minTranslogGeneration) {
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
final long trimmedAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
}
static Checkpoint readCheckpointV6_4_0(final DataInput in) throws IOException {
final long offset = in.readLong();
final int numOps = in.readInt();
final long generation = in.readLong();
final long minSeqNo = in.readLong();
final long maxSeqNo = in.readLong();
final long globalCheckpoint = in.readLong();
final long minTranslogGeneration = in.readLong();
final long trimmedAboveSeqNo = in.readLong();
return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
}
static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
final long offset = in.readLong();
final int numOps = in.readInt();
final long generation = in.readLong();
final long minSeqNo = in.readLong();
final long maxSeqNo = in.readLong();
final long globalCheckpoint = in.readLong();
final long minTranslogGeneration = in.readLong();
final long trimmedAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
}
// reads a checksummed checkpoint introduced in ES 5.0.0
static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException {
final long offset = in.readLong();
final int numOps = in.readInt();
final long generation = in.readLong();
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final long globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long minTranslogGeneration = -1L;
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
final long minTranslogGeneration = -1;
final long trimmedAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
}
@Override
@ -133,6 +179,7 @@ final class Checkpoint {
", maxSeqNo=" + maxSeqNo +
", globalCheckpoint=" + globalCheckpoint +
", minTranslogGeneration=" + minTranslogGeneration +
", trimmedAboveSeqNo=" + trimmedAboveSeqNo +
'}';
}
@ -145,17 +192,20 @@ final class Checkpoint {
if (fileVersion == INITIAL_VERSION) {
assert indexInput.length() == V1_FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV5_0_0(indexInput);
} else if (fileVersion == VERSION_6_0_0) {
assert indexInput.length() == V2_FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV6_0_0(indexInput);
} else {
assert fileVersion == CURRENT_VERSION : fileVersion;
assert indexInput.length() == FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV6_0_0(indexInput);
assert indexInput.length() == V3_FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV6_4_0(indexInput);
}
}
}
}
public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(FILE_SIZE) {
final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(V3_FILE_SIZE) {
@Override
public synchronized byte[] toByteArray() {
// don't clone
@ -164,13 +214,13 @@ final class Checkpoint {
};
final String resourceDesc = "checkpoint(path=\"" + checkpointFile + "\", gen=" + checkpoint + ")";
try (OutputStreamIndexOutput indexOutput =
new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, FILE_SIZE)) {
new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, V3_FILE_SIZE)) {
CodecUtil.writeHeader(indexOutput, CHECKPOINT_CODEC, CURRENT_VERSION);
checkpoint.write(indexOutput);
CodecUtil.writeFooter(indexOutput);
assert indexOutput.getFilePointer() == FILE_SIZE :
"get you numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + FILE_SIZE;
assert indexOutput.getFilePointer() == V3_FILE_SIZE :
"get you numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + V3_FILE_SIZE;
assert indexOutput.getFilePointer() < 512 :
"checkpoint files have to be smaller than 512 bytes for atomic writes; size: " + indexOutput.getFilePointer();
@ -196,7 +246,8 @@ final class Checkpoint {
if (generation != that.generation) return false;
if (minSeqNo != that.minSeqNo) return false;
if (maxSeqNo != that.maxSeqNo) return false;
return globalCheckpoint == that.globalCheckpoint;
if (globalCheckpoint != that.globalCheckpoint) return false;
return trimmedAboveSeqNo == that.trimmedAboveSeqNo;
}
@Override
@ -207,6 +258,7 @@ final class Checkpoint {
result = 31 * result + Long.hashCode(minSeqNo);
result = 31 * result + Long.hashCode(maxSeqNo);
result = 31 * result + Long.hashCode(globalCheckpoint);
result = 31 * result + Long.hashCode(trimmedAboveSeqNo);
return result;
}
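
As a sanity check on the layout above (illustrative arithmetic only, assuming Lucene's CodecUtil.headerLength("ckp") is 12 bytes - 4-byte magic, 1-byte string length, 3 codec bytes, 4-byte version - and footerLength() is 16):

// V2 (6.0.0): 12 (header) + 4 (ops) + 6 * 8 (longs) + 16 (footer) = 80 bytes
// V3 (6.4.0): 12 (header) + 4 (ops) + 7 * 8 (longs) + 16 (footer) = 88 bytes
// both fit comfortably under the 512-byte bound asserted for atomic writes.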

MultiSnapshot.java

@ -56,6 +56,15 @@ final class MultiSnapshot implements Translog.Snapshot {
return totalOperations;
}
@Override
public int skippedOperations() {
int skippedOperations = overriddenOperations;
for (TranslogSnapshot translog : translogs) {
skippedOperations += translog.skippedOperations();
}
return skippedOperations;
}
@Override
public int overriddenOperations() {
return overriddenOperations;

Translog.java

@ -696,6 +696,41 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return TRANSLOG_FILE_PREFIX + generation + CHECKPOINT_SUFFIX;
}
/**
* Trims translog operations in completed files with a primary term below <code>belowTerm</code> and a seq# above <code>aboveSeqNo</code>.
* Effectively it advances the max visible seq# {@link Checkpoint#trimmedAboveSeqNo} so that {@link TranslogSnapshot} skips those operations.
*/
public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException {
assert aboveSeqNo >= SequenceNumbers.NO_OPS_PERFORMED : "aboveSeqNo has to be a valid sequence number";
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (current.getPrimaryTerm() < belowTerm) {
throw new IllegalArgumentException("Trimming the translog can only be done for terms lower than the current one. " +
"Trim requested for term [ " + belowTerm + " ] , current is [ " + current.getPrimaryTerm() + " ]");
}
// we assume that the current translog generation doesn't have trimmable ops. Verify that.
assert current.assertNoSeqAbove(belowTerm, aboveSeqNo);
// update all existing readers (if necessary) as checkpoint and reader are immutable
final List<TranslogReader> newReaders = new ArrayList<>(readers.size());
try {
for (TranslogReader reader : readers) {
final TranslogReader newReader =
reader.getPrimaryTerm() < belowTerm
? reader.closeIntoTrimmedReader(aboveSeqNo, getChannelFactory())
: reader;
newReaders.add(newReader);
}
} catch (IOException e) {
IOUtils.closeWhileHandlingException(newReaders);
close();
throw e;
}
this.readers.clear();
this.readers.addAll(newReaders);
}
}
/**
* Ensures that the given location has be synced / written to the underlying storage.
@ -845,6 +880,13 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
int totalOperations();
/**
* The number of operations that have been skipped (overridden or trimmed) in the snapshot so far.
*/
default int skippedOperations() {
return 0;
}
/**
* The number of operations that have been overridden (i.e. superseded) in the snapshot so far.
* If two operations have the same sequence number, the operation with the lower term is overridden by the operation with the higher term

TranslogReader.java

@ -21,6 +21,8 @@ package org.elasticsearch.index.translog;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.Closeable;
import java.io.EOFException;
@ -28,8 +30,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.index.translog.Translog.getCommitCheckpointFileName;
/**
* An immutable translog file reader.
*/
@ -70,6 +75,39 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
return new TranslogReader(checkpoint, channel, path, header);
}
/**
* Closes the current reader and creates a new one with a new checkpoint and the same file channel
*/
TranslogReader closeIntoTrimmedReader(long aboveSeqNo, ChannelFactory channelFactory) throws IOException {
if (closed.compareAndSet(false, true)) {
Closeable toCloseOnFailure = channel;
final TranslogReader newReader;
try {
if (aboveSeqNo < checkpoint.trimmedAboveSeqNo
|| aboveSeqNo < checkpoint.maxSeqNo && checkpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
final Path checkpointFile = path.getParent().resolve(getCommitCheckpointFileName(checkpoint.generation));
final Checkpoint newCheckpoint = new Checkpoint(checkpoint.offset, checkpoint.numOps,
checkpoint.generation, checkpoint.minSeqNo, checkpoint.maxSeqNo,
checkpoint.globalCheckpoint, checkpoint.minTranslogGeneration, aboveSeqNo);
Checkpoint.write(channelFactory, checkpointFile, newCheckpoint, StandardOpenOption.WRITE);
IOUtils.fsync(checkpointFile, false);
IOUtils.fsync(checkpointFile.getParent(), true);
newReader = new TranslogReader(newCheckpoint, channel, path, header);
} else {
newReader = new TranslogReader(checkpoint, channel, path, header);
}
toCloseOnFailure = null;
return newReader;
} finally {
IOUtils.close(toCloseOnFailure);
}
} else {
throw new AlreadyClosedException(toString() + " is already closed");
}
}
public long sizeInBytes() {
return length;
}
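
The guard in closeIntoTrimmedReader only rewrites the checkpoint when the requested trim point would actually hide more operations than the current checkpoint already does; otherwise the reader is recreated as-is. The condition in isolation (a sketch with an invented name):

final class TrimCheck {
    static final long UNASSIGNED_SEQ_NO = -2L;

    // true iff trimming to aboveSeqNo hides more ops than the reader's current
    // checkpoint: either it lowers an existing trim point, or the reader was
    // never trimmed and aboveSeqNo falls below its max seq#
    static boolean needsNewCheckpoint(long aboveSeqNo, long trimmedAboveSeqNo, long maxSeqNo) {
        return aboveSeqNo < trimmedAboveSeqNo
            || (trimmedAboveSeqNo == UNASSIGNED_SEQ_NO && aboveSeqNo < maxSeqNo);
    }
}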

TranslogSnapshot.java

@ -19,6 +19,7 @@
package org.elasticsearch.index.translog;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.EOFException;
import java.io.IOException;
@ -32,6 +33,7 @@ final class TranslogSnapshot extends BaseTranslogReader {
private final ByteBuffer reusableBuffer;
private long position;
private int skippedOperations;
private int readOperations;
private BufferedChecksumStreamInput reuse;
@ -54,17 +56,24 @@ final class TranslogSnapshot extends BaseTranslogReader {
return totalOperations;
}
int skippedOperations() {
return skippedOperations;
}
@Override
Checkpoint getCheckpoint() {
return checkpoint;
}
public Translog.Operation next() throws IOException {
if (readOperations < totalOperations) {
return readOperation();
} else {
return null;
while (readOperations < totalOperations) {
final Translog.Operation operation = readOperation();
if (operation.seqNo() <= checkpoint.trimmedAboveSeqNo || checkpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
return operation;
}
skippedOperations++;
}
return null;
}
protected Translog.Operation readOperation() throws IOException {
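
The rewritten next() above skips, rather than returns, any operation above the checkpoint's trim point, counting it in skippedOperations. A runnable model of the visibility filter (illustrative, not ES code):

import java.util.Arrays;
import java.util.List;

final class VisibleOps {
    static final long UNASSIGNED_SEQ_NO = -2L;

    // mirrors the filter in TranslogSnapshot#next(): an op is visible iff
    // trimming is disabled or its seq# is at or below the trim point
    static long countVisible(List<Long> seqNos, long trimmedAboveSeqNo) {
        return seqNos.stream()
            .filter(seqNo -> trimmedAboveSeqNo == UNASSIGNED_SEQ_NO || seqNo <= trimmedAboveSeqNo)
            .count();
    }

    public static void main(String[] args) {
        List<Long> ops = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
        System.out.println(countVisible(ops, 5L));                // 6 visible, 4 skipped
        System.out.println(countVisible(ops, UNASSIGNED_SEQ_NO)); // 10 - nothing skipped
    }
}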

TranslogWriter.java

@ -20,7 +20,6 @@
package org.elasticsearch.index.translog;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.bytes.BytesArray;
@ -92,6 +91,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
this.minSeqNo = initialCheckpoint.minSeqNo;
assert initialCheckpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo;
this.maxSeqNo = initialCheckpoint.maxSeqNo;
assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
}
@ -213,6 +213,25 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
return true;
}
synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
seenSequenceNumbers.entrySet().stream().filter(e -> e.getKey().longValue() > aboveSeqNo)
.forEach(e -> {
final Translog.Operation op;
try {
op = Translog.readOperation(new BufferedChecksumStreamInput(e.getValue().v1().streamInput()));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
long seqNo = op.seqNo();
long primaryTerm = op.primaryTerm();
if (primaryTerm < belowTerm) {
throw new AssertionError("current should not have any operations with seq#:primaryTerm ["
+ seqNo + ":" + primaryTerm + "] > " + aboveSeqNo + ":" + belowTerm);
}
});
return true;
}
/**
* write all buffered ops to disk and fsync file.
*
@ -241,7 +260,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
@Override
synchronized Checkpoint getCheckpoint() {
return new Checkpoint(totalOffset, operationCounter, generation, minSeqNo, maxSeqNo,
globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong());
globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong(),
SequenceNumbers.UNASSIGNED_SEQ_NO);
}
@Override

RecoverySourceHandler.java

@ -615,9 +615,9 @@ public class RecoverySourceHandler {
cancellableThreads.executeIO(sendBatch);
}
assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
assert expectedTotalOps == snapshot.skippedOperations() + skippedOps + totalSentOps
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
expectedTotalOps, snapshot.skippedOperations(), skippedOps, totalSentOps);
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +

ResyncReplicationRequestTests.java

@ -40,7 +40,7 @@ public class ResyncReplicationRequestTests extends ESTestCase {
final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(),
Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index});
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index});
final BytesStreamOutput out = new BytesStreamOutput();
before.writeTo(out);

IndexLevelReplicationTests.java

@ -340,9 +340,10 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
op1 = snapshot.next();
assertThat(op1, notNullValue());
assertThat(snapshot.next(), nullValue());
assertThat(snapshot.overriddenOperations(), equalTo(0));
assertThat(snapshot.skippedOperations(), equalTo(0));
}
// Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1).
// Make sure that replica2 receives translog ops (eg. op2) from replica1
// and does not overwrite its stale operation (op1) as it is trimmed.
logger.info("--> Promote replica1 as the primary");
shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
@ -353,7 +354,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
assertThat(snapshot.overriddenOperations(), equalTo(1));
assertThat(snapshot.overriddenOperations(), equalTo(0));
assertThat(snapshot.skippedOperations(), equalTo(1));
}
// Make sure that peer-recovery transfers all but non-overridden operations.
@ -366,7 +368,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
assertThat(snapshot.next(), equalTo(op2));
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0));
assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
}
// TODO: We should assert the content of shards in the ReplicationGroup.
// Without rollback replicas(current implementation), we don't have the same content across shards:

RecoveryDuringReplicationTests.java

@ -53,8 +53,12 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@ -65,6 +69,7 @@ import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -353,10 +358,19 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.action.resync:TRACE")
public void testResyncAfterPrimaryPromotion() throws Exception {
// TODO: check translog trimming functionality once it's implemented
try (ReplicationGroup shards = createGroup(2)) {
// TODO: check translog trimming functionality once rollback is implemented in Lucene (ES trimming is done)
Map<String, String> mappings =
Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) {
shards.startAll();
int initialDocs = shards.indexDocs(randomInt(10));
int initialDocs = randomInt(10);
for (int i = 0; i < initialDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "initial_doc_" + i)
.source("{ \"f\": \"normal\"}", XContentType.JSON);
shards.index(indexRequest);
}
boolean syncedGlobalCheckPoint = randomBoolean();
if (syncedGlobalCheckPoint) {
shards.syncGlobalCheckpoint();
@ -364,16 +378,30 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
final IndexShard oldPrimary = shards.getPrimary();
final IndexShard newPrimary = shards.getReplicas().get(0);
final IndexShard justReplica = shards.getReplicas().get(1);
// simulate docs that were inflight when primary failed
final int extraDocs = randomIntBetween(0, 5);
final int extraDocs = randomInt(5);
logger.info("--> indexing {} extra docs", extraDocs);
for (int i = 0; i < extraDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i)
.source("{}", XContentType.JSON);
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_doc_" + i)
.source("{ \"f\": \"normal\"}", XContentType.JSON);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
indexOnReplica(bulkShardRequest, shards, newPrimary);
}
final int extraDocsToBeTrimmed = randomIntBetween(0, 10);
logger.info("--> indexing {} extra docs to be trimmed", extraDocsToBeTrimmed);
for (int i = 0; i < extraDocsToBeTrimmed; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_trimmed_" + i)
.source("{ \"f\": \"trimmed\"}", XContentType.JSON);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
// have to replicate to a replica other than newPrimary - these ops are the subject of trimming
indexOnReplica(bulkShardRequest, shards, justReplica);
}
logger.info("--> seqNo primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats());
logger.info("--> resyncing replicas");
PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();
if (syncedGlobalCheckPoint) {
@ -381,7 +409,36 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
} else {
assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs));
}
shards.assertAllEqual(initialDocs + extraDocs);
List<IndexShard> replicas = shards.getReplicas();
// check all docs on primary are available on replica
Set<String> primaryIds = getShardDocUIDs(newPrimary);
assertThat(primaryIds.size(), equalTo(initialDocs + extraDocs));
for (IndexShard replica : replicas) {
Set<String> replicaIds = getShardDocUIDs(replica);
Set<String> temp = new HashSet<>(primaryIds);
temp.removeAll(replicaIds);
assertThat(replica.routingEntry() + " is missing docs", temp, empty());
temp = new HashSet<>(replicaIds);
temp.removeAll(primaryIds);
// the replica is expected to have more docs as there is no Lucene rollback on it
assertThat(replica.routingEntry() + " has to have extra docs", temp,
extraDocsToBeTrimmed > 0 ? not(empty()) : empty());
}
// check translog on replica is trimmed
int translogOperations = 0;
try (Translog.Snapshot snapshot = getTranslog(justReplica).newSnapshot()) {
Translog.Operation next;
while ((next = snapshot.next()) != null) {
translogOperations++;
assertThat("unexpected op: " + next, (int)next.seqNo(), lessThan(initialDocs + extraDocs));
assertThat("unexpected primaryTerm: " + next.primaryTerm(), next.primaryTerm(), is(oldPrimary.getPrimaryTerm()));
final Translog.Source source = next.getSource();
assertThat(source.source.utf8ToString(), is("{ \"f\": \"normal\"}"));
}
}
assertThat(translogOperations, is(initialDocs + extraDocs));
}
}

PrimaryReplicaSyncerTests.java

@ -20,6 +20,7 @@ package org.elasticsearch.index.shard;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.resync.ResyncReplicationRequest;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -36,15 +37,20 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.tasks.TaskManager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
@ -53,15 +59,17 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
IndexShard shard = newStartedShard(true);
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
AtomicBoolean syncActionCalled = new AtomicBoolean();
List<ResyncReplicationRequest> resyncRequests = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, primaryTerm, listener) -> {
logger.info("Sending off {} operations", request.getOperations().length);
syncActionCalled.set(true);
resyncRequests.add(request);
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
listener.onResponse(new ResyncReplicationResponse());
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, taskManager, syncAction);
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 100)));
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));
int numDocs = randomInt(10);
for (int i = 0; i < numDocs; i++) {
@ -72,7 +80,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
}
long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1;
boolean syncNeeded = numDocs > 0;
String allocationId = shard.routingEntry().allocationId().getId();
shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
@ -84,19 +92,29 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
syncer.resync(shard, fut);
fut.get();
PrimaryReplicaSyncer.ResyncTask resyncTask = fut.get();
if (syncNeeded) {
assertTrue("Sync action was not called", syncActionCalled.get());
ResyncReplicationRequest resyncRequest = resyncRequests.remove(0);
assertThat(resyncRequest.getTrimAboveSeqNo(), equalTo(numDocs - 1L));
assertThat("trimAboveSeqNo has to be specified in request #0 only", resyncRequests.stream()
.mapToLong(ResyncReplicationRequest::getTrimAboveSeqNo)
.filter(seqNo -> seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO)
.findFirst()
.isPresent(),
is(false));
}
assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, fut.get().getTotalOperations());
if (syncNeeded) {
assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, resyncTask.getTotalOperations());
if (syncNeeded && globalCheckPoint < numDocs - 1) {
long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included
assertEquals(skippedOps, fut.get().getSkippedOperations());
assertEquals(numDocs - skippedOps, fut.get().getResyncedOperations());
assertEquals(skippedOps, resyncTask.getSkippedOperations());
assertEquals(numDocs - skippedOps, resyncTask.getResyncedOperations());
} else {
assertEquals(0, fut.get().getSkippedOperations());
assertEquals(0, fut.get().getResyncedOperations());
assertEquals(0, resyncTask.getSkippedOperations());
assertEquals(0, resyncTask.getResyncedOperations());
}
closeShards(shard);

TranslogHeaderTests.java

@ -118,7 +118,8 @@ public class TranslogHeaderTests extends ESTestCase {
assertThat("test file [" + translogFile + "] should exist", Files.exists(translogFile), equalTo(true));
final E error = expectThrows(expectedErrorType, () -> {
final Checkpoint checkpoint = new Checkpoint(Files.size(translogFile), 1, 1,
SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, 1);
SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED, 1, SequenceNumbers.NO_OPS_PERFORMED);
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
TranslogReader.open(channel, translogFile, checkpoint, null);
}

TranslogTests.java

@ -107,7 +107,9 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@ -120,8 +122,11 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.stub;
@ -1474,8 +1479,8 @@ public class TranslogTests extends ESTestCase {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3080, " +
"numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " +
"generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0}", ex.getMessage());
"numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} but got: Checkpoint{offset=0, numOps=0, " +
"generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage());
}
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
@ -1507,6 +1512,191 @@ public class TranslogTests extends ESTestCase {
assertEquals(ops, readOperations);
}
public void testSnapshotCurrentHasUnexpectedOperationsForTrimmedOperations() throws Exception {
int extraDocs = randomIntBetween(10, 15);
// increment primaryTerm to avoid potential negative numbers
primaryTerm.addAndGet(extraDocs);
translog.rollGeneration();
for (int op = 0; op < extraDocs; op++) {
String ascii = randomAlphaOfLengthBetween(1, 50);
Translog.Index operation = new Translog.Index("test", "" + op, op, primaryTerm.get() - op,
ascii.getBytes("UTF-8"));
translog.add(operation);
}
AssertionError error = expectThrows(AssertionError.class, () -> translog.trimOperations(primaryTerm.get(), 0));
assertThat(error.getMessage(), is("current should not have any operations with seq#:primaryTerm "
+ "[1:" + (primaryTerm.get() - 1) + "] > 0:" + primaryTerm.get()));
primaryTerm.incrementAndGet();
translog.rollGeneration();
// add a single operation to current with seq# > trimmed seq# but higher primary term
Translog.Index operation = new Translog.Index("test", "" + 1, 1L, primaryTerm.get(),
randomAlphaOfLengthBetween(1, 50).getBytes("UTF-8"));
translog.add(operation);
// it is possible to trim after generation rollover
translog.trimOperations(primaryTerm.get(), 0);
}
public void testSnapshotTrimmedOperations() throws Exception {
final InMemoryTranslog inMemoryTranslog = new InMemoryTranslog();
final List<Translog.Operation> allOperations = new ArrayList<>();
for (int attempt = 0, maxAttempts = randomIntBetween(3, 10); attempt < maxAttempts; attempt++) {
List<Long> ops = LongStream.range(0, allOperations.size() + randomIntBetween(10, 15))
.boxed().collect(Collectors.toList());
Randomness.shuffle(ops);
AtomicReference<String> source = new AtomicReference<>();
for (final long op : ops) {
source.set(randomAlphaOfLengthBetween(1, 50));
// have to use exactly the same source for the same seq# if the primaryTerm has not changed
if (primaryTerm.get() == translog.getCurrent().getPrimaryTerm()) {
// use the latest source of an op with the same seq# - hence reduce to the last match instead of breaking early
allOperations
.stream()
.filter(allOp -> allOp instanceof Translog.Index && allOp.seqNo() == op)
.map(allOp -> ((Translog.Index)allOp).source().utf8ToString())
.reduce((a, b) -> b)
.ifPresent(source::set);
}
// use ongoing primaryTerms - or the same as it was
Translog.Index operation = new Translog.Index("test", "" + op, op, primaryTerm.get(),
source.get().getBytes("UTF-8"));
translog.add(operation);
inMemoryTranslog.add(operation);
allOperations.add(operation);
}
if (randomBoolean()) {
primaryTerm.incrementAndGet();
translog.rollGeneration();
}
long maxTrimmedSeqNo = randomInt(allOperations.size());
translog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
inMemoryTranslog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
translog.sync();
Collection<Translog.Operation> effectiveOperations = inMemoryTranslog.operations();
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertThat(snapshot, containsOperationsInAnyOrder(effectiveOperations));
assertThat(snapshot.totalOperations(), is(allOperations.size()));
assertThat(snapshot.skippedOperations(), is(allOperations.size() - effectiveOperations.size()));
}
}
}
/**
* this class mimics the trimming behaviour of the original {@link Translog}
*/
static class InMemoryTranslog {
private final Map<Long, Translog.Operation> operations = new HashMap<>();
void add(Translog.Operation operation) {
final Translog.Operation old = operations.put(operation.seqNo(), operation);
assert old == null || old.primaryTerm() <= operation.primaryTerm();
}
void trimOperations(long belowTerm, long aboveSeqNo) {
for (final Iterator<Map.Entry<Long, Translog.Operation>> it = operations.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<Long, Translog.Operation> next = it.next();
Translog.Operation op = next.getValue();
boolean drop = op.primaryTerm() < belowTerm && op.seqNo() > aboveSeqNo;
if (drop) {
it.remove();
}
}
}
Collection<Translog.Operation> operations() {
return operations.values();
}
}
public void testRandomExceptionsOnTrimOperations() throws Exception {
Path tempDir = createTempDir();
final FailSwitch fail = new FailSwitch();
fail.failNever();
TranslogConfig config = getTranslogConfig(tempDir);
List<FileChannel> fileChannels = new ArrayList<>();
final Translog failableTLog =
getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy(), fileChannels);
IOException expectedException = null;
int translogOperations = 0;
final int maxAttempts = 10;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
int maxTrimmedSeqNo;
fail.failNever();
int extraTranslogOperations = randomIntBetween(10, 100);
List<Integer> ops = IntStream.range(translogOperations, translogOperations + extraTranslogOperations)
.boxed().collect(Collectors.toList());
Randomness.shuffle(ops);
for (int op : ops) {
String ascii = randomAlphaOfLengthBetween(1, 50);
Translog.Index operation = new Translog.Index("test", "" + op, op,
primaryTerm.get(), ascii.getBytes("UTF-8"));
failableTLog.add(operation);
}
translogOperations += extraTranslogOperations;
// at least one generation roll + primary term increment has to happen - otherwise trimming would not take place at all
// on the last attempt we have to roll as well - otherwise trimming could be skipped as everything has already been trimmed
boolean rollover = attempt == 0 || attempt == maxAttempts - 1 || randomBoolean();
if (rollover) {
primaryTerm.incrementAndGet();
failableTLog.rollGeneration();
}
maxTrimmedSeqNo = rollover ? translogOperations - randomIntBetween(4, 8) : translogOperations + 1;
// if we manage to reach the max attempts - always fail
fail.failRate(attempt < maxAttempts - 1 ? 25 : 100);
try {
failableTLog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
} catch (IOException e) {
expectedException = e;
break;
}
}
assertThat(expectedException, is(not(nullValue())));
assertThat(fileChannels, is(not(empty())));
assertThat("all file channels have to be closed",
fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false));
assertThat(failableTLog.isOpen(), is(false));
final AlreadyClosedException alreadyClosedException = expectThrows(AlreadyClosedException.class, () -> failableTLog.newSnapshot());
assertThat(alreadyClosedException.getMessage(),
is("translog is already closed"));
fail.failNever();
// check that despite the IO exception the translog is not corrupted
try (Translog reopenedTranslog = openTranslog(config, failableTLog.getTranslogUUID())) {
try (Translog.Snapshot snapshot = reopenedTranslog.newSnapshot()) {
assertThat(snapshot.totalOperations(), greaterThan(0));
Translog.Operation operation;
for (int i = 0; (operation = snapshot.next()) != null; i++) {
assertNotNull("operation " + i + " must be non-null", operation);
}
}
}
}
public void testLocationHashCodeEquals() throws IOException {
List<Translog.Location> locations = new ArrayList<>();
List<Translog.Location> locations2 = new ArrayList<>();
@ -2007,7 +2197,8 @@ public class TranslogTests extends ESTestCase {
private volatile boolean onceFailedFailAlways = false;
public boolean fail() {
boolean fail = randomIntBetween(1, 100) <= failRate;
final int rnd = randomIntBetween(1, 100);
boolean fail = rnd <= failRate;
if (fail && onceFailedFailAlways) {
failAlways();
}
@ -2026,17 +2217,30 @@ public class TranslogTests extends ESTestCase {
failRate = randomIntBetween(1, 100);
}
public void failRate(int rate) {
failRate = rate;
}
public void onceFailedFailAlways() {
onceFailedFailAlways = true;
}
}
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites,
final boolean throwUnknownException, String translogUUID,
final TranslogDeletionPolicy deletionPolicy) throws IOException {
return getFailableTranslog(fail, config, partialWrites, throwUnknownException, translogUUID, deletionPolicy, null);
}
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites,
final boolean throwUnknownException, String translogUUID,
final TranslogDeletionPolicy deletionPolicy,
final List<FileChannel> fileChannels) throws IOException {
final ChannelFactory channelFactory = (file, openOption) -> {
FileChannel channel = FileChannel.open(file, openOption);
if (fileChannels != null) {
fileChannels.add(channel);
}
boolean success = false;
try {
final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
@ -2393,7 +2597,7 @@ public class TranslogTests extends ESTestCase {
}
final long generation = randomNonNegativeLong();
return new Checkpoint(randomLong(), randomInt(), generation, minSeqNo, maxSeqNo, randomNonNegativeLong(),
randomLongBetween(1, generation));
randomLongBetween(1, generation), maxSeqNo);
}
public void testCheckpointOnDiskFull() throws IOException {
@ -2617,7 +2821,7 @@ public class TranslogTests extends ESTestCase {
assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos));
readFromSnapshot++;
}
readFromSnapshot += snapshot.overriddenOperations();
readFromSnapshot += snapshot.skippedOperations();
}
assertThat(readFromSnapshot, equalTo(expectedSnapshotOps));
final long seqNoLowerBound = seqNo;