diff --git a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
index 07dcdf7db11..7be3406659e 100644
--- a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
@@ -22,6 +22,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 
@@ -33,17 +34,24 @@ import java.util.Arrays;
  */
 public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
 
+    private long trimAboveSeqNo;
     private Translog.Operation[] operations;
 
     ResyncReplicationRequest() {
         super();
     }
 
-    public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) {
+    public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo,
+                                    final Translog.Operation[] operations) {
         super(shardId);
+        this.trimAboveSeqNo = trimAboveSeqNo;
         this.operations = operations;
     }
 
+    public long getTrimAboveSeqNo() {
+        return trimAboveSeqNo;
+    }
+
     public Translog.Operation[] getOperations() {
         return operations;
     }
@@ -60,12 +68,20 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+    /**
+     * Trims translog for terms below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>
+     * @see Translog#trimOperations(long, long)
+     */
+    public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;
+
     /** A Lock implementation that always allows the lock to be acquired */
     protected static final class NoOpLock implements Lock {
 
@@ -904,7 +910,7 @@ public abstract class Engine implements Closeable {
      * checks and removes translog files that no longer need to be retained. See
      * {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
      */
-    public abstract void trimTranslog() throws EngineException;
+    public abstract void trimUnreferencedTranslogFiles() throws EngineException;
 
     /**
      * Tests whether or not the translog generation should be rolled to a new generation.
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index bca84f81a29..88e71608452 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -1552,7 +1552,7 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public void trimTranslog() throws EngineException {
+    public void trimUnreferencedTranslogFiles() throws EngineException {
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
             translog.trimUnreferencedReaders();
@@ -1569,6 +1569,24 @@ public class InternalEngine extends Engine {
         }
     }
 
+    @Override
+    public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
+        try (ReleasableLock lock = readLock.acquire()) {
+            ensureOpen();
+            translog.trimOperations(belowTerm, aboveSeqNo);
+        } catch (AlreadyClosedException e) {
+            failOnTragicEvent(e);
+            throw e;
+        } catch (Exception e) {
+            try {
+                failEngine("translog operations trimming failed", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw new EngineException(shardId, "failed to trim translog operations", e);
+        }
+    }
+
     private void pruneDeletedTombstones() {
         /*
          * We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java
index 0c071f4b2d4..7cffc8c1ac9 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java
@@ -37,7 +37,7 @@ public class SequenceNumbers {
      */
     public static final long UNASSIGNED_SEQ_NO = -2L;
     /**
-     * Represents no operations have been performed on the shard.
+     * Represents that no operations have been performed on the shard. Also the initial value of a sequence number.
      */
     public static final long NO_OPS_PERFORMED = -1L;
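The two sentinels above are what make the trimming logic work end to end: a promoted primary calls trimOperationsFromTranslog(primaryTerm, aboveSeqNo), and readers later hide an operation only when a trim point is actually set. A minimal standalone sketch of that visibility rule (toy code, not part of this change; the constants merely mirror SequenceNumbers):

```java
// Toy model (not part of this change) of the visibility rule that
// trimmedAboveSeqNo introduces. The constants mirror SequenceNumbers.
public class TrimVisibilityDemo {
    static final long UNASSIGNED_SEQ_NO = -2L; // no trimming requested
    static final long NO_OPS_PERFORMED = -1L;  // initial value of a sequence number

    // mirrors the check in TranslogSnapshot#next(): an operation stays visible
    // unless a trim point is set and the operation lies above it
    static boolean visible(long opSeqNo, long trimmedAboveSeqNo) {
        return trimmedAboveSeqNo == UNASSIGNED_SEQ_NO || opSeqNo <= trimmedAboveSeqNo;
    }

    public static void main(String[] args) {
        System.out.println(visible(5, UNASSIGNED_SEQ_NO)); // true: trimming disabled
        System.out.println(visible(3, 3));                 // true: at the trim point
        System.out.println(visible(4, 3));                 // false: above it, skipped
    }
}
```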
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index c76ce128763..8583b6b4c9b 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -992,7 +992,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
     public void trimTranslog() {
         verifyNotClosed();
         final Engine engine = getEngine();
-        engine.trimTranslog();
+        engine.trimUnreferencedTranslogFiles();
     }
 
     /**
@@ -1194,6 +1194,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
         assert currentEngineReference.get() == null;
     }
 
+    public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
+        getEngine().trimOperationsFromTranslog(primaryTerm, aboveSeqNo);
+    }
+
     public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
         final Engine.Result result;
         switch (operation.opType()) {
diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
index af8c9bdd027..8e05e7bf08e 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
@@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.tasks.Task;
@@ -84,6 +85,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
         try {
             final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
             Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
+            final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
             resyncListener = new ActionListener<ResyncTask>() {
                 @Override
                 public void onResponse(final ResyncTask resyncTask) {
@@ -135,7 +137,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
                 }
             };
             resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
-                startingSeqNo, resyncListener);
+                startingSeqNo, maxSeqNo, resyncListener);
         } catch (Exception e) {
             if (resyncListener != null) {
                 resyncListener.onFailure(e);
@@ -146,7 +148,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
     }
 
     private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
-                        long startingSeqNo, ActionListener<ResyncTask> listener) {
+                        long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
         ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
         ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
         ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@@ -166,7 +168,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
         };
         try {
             new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
-                startingSeqNo, wrappedListener).run();
+                startingSeqNo, maxSeqNo, wrappedListener).run();
         } catch (Exception e) {
             wrappedListener.onFailure(e);
         }
@@ -186,14 +188,16 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
         private final ShardId shardId;
         private final Translog.Snapshot snapshot;
         private final long startingSeqNo;
+        private final long maxSeqNo;
         private final int chunkSizeInBytes;
         private final ActionListener<Void> listener;
+        private final AtomicBoolean firstMessage = new AtomicBoolean(true);
         private final AtomicInteger totalSentOps = new AtomicInteger();
         private final AtomicInteger totalSkippedOps = new AtomicInteger();
         private AtomicBoolean closed = new AtomicBoolean();
 
         SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
-                       Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
+                       Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
             this.logger = logger;
             this.syncAction = syncAction;
             this.task = task;
@@ -203,6 +207,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
             this.snapshot = snapshot;
             this.chunkSizeInBytes = chunkSizeInBytes;
             this.startingSeqNo = startingSeqNo;
+            this.maxSeqNo = maxSeqNo;
             this.listener = listener;
             task.setTotalOperations(snapshot.totalOperations());
         }
@@ -248,11 +253,15 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
                 }
             }
 
-            if (!operations.isEmpty()) {
+            final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
+            // have to send a sync request even if there are no operations to sync - the trimmedAboveSeqNo still has to be synced
+            if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                 task.setPhase("sending_ops");
-                ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
+                ResyncReplicationRequest request =
+                    new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
                 logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
                     new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
+                firstMessage.set(false);
                 syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
             } else if (closed.compareAndSet(false, true)) {
                 logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());
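The protocol detail worth calling out in the change above: only the first resync batch carries the primary's maxSeqNo as trimAboveSeqNo, every later batch sends the UNASSIGNED sentinel, and a resync with zero operations still sends one batch so the trim point reaches the replicas. A compact sketch of that rule (hypothetical class, not the ES API):

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch (hypothetical class, not the ES API) of the batching rule in
// SnapshotSender: only the first batch carries the real trim point, later
// batches carry the UNASSIGNED sentinel, and an empty resync still sends one
// batch so the trim point always reaches the replicas.
class FirstBatchTrimDemo {
    static final long UNASSIGNED_SEQ_NO = -2L;
    private final AtomicBoolean firstMessage = new AtomicBoolean(true);
    private final long maxSeqNo;

    FirstBatchTrimDemo(long maxSeqNo) {
        this.maxSeqNo = maxSeqNo;
    }

    /** Trim point for the next batch; only the very first call yields maxSeqNo. */
    long nextTrimAboveSeqNo() {
        return firstMessage.getAndSet(false) ? maxSeqNo : UNASSIGNED_SEQ_NO;
    }

    /** An empty batch must still be sent if it carries a real trim point. */
    static boolean mustSend(List<?> operations, long trimAboveSeqNo) {
        return !operations.isEmpty() || trimAboveSeqNo != UNASSIGNED_SEQ_NO;
    }

    public static void main(String[] args) {
        FirstBatchTrimDemo sender = new FirstBatchTrimDemo(41);
        long first = sender.nextTrimAboveSeqNo();   // 41
        long second = sender.nextTrimAboveSeqNo();  // -2
        System.out.println(first + ", " + second + ", sendEmptyFirstBatch="
            + mustSend(Collections.emptyList(), 41)); // 41, -2, sendEmptyFirstBatch=true
    }
}
```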
diff --git a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
index 32aef840b6f..21a400f9f65 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
@@ -45,14 +45,29 @@ final class Checkpoint {
     final long maxSeqNo;
     final long globalCheckpoint;
     final long minTranslogGeneration;
+    final long trimmedAboveSeqNo;
 
     private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
-    private static final int CURRENT_VERSION = 2; // introduction of global checkpoints
+    private static final int VERSION_6_0_0 = 2; // introduction of global checkpoints
+    private static final int CURRENT_VERSION = 3; // introduction of trimmed above seq#
 
     private static final String CHECKPOINT_CODEC = "ckp";
 
+    // size of 6.4.0 checkpoint
+    static final int V3_FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
+        + Integer.BYTES  // ops
+        + Long.BYTES // offset
+        + Long.BYTES // generation
+        + Long.BYTES // minimum sequence number, introduced in 6.0.0
+        + Long.BYTES // maximum sequence number, introduced in 6.0.0
+        + Long.BYTES // global checkpoint, introduced in 6.0.0
+        + Long.BYTES // minimum translog generation in the translog - introduced in 6.0.0
+        + Long.BYTES // maximum reachable (trimmed) sequence number, introduced in 6.4.0
+        + CodecUtil.footerLength();
+
     // size of 6.0.0 checkpoint
-    static final int FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
+    static final int V2_FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
         + Integer.BYTES  // ops
         + Long.BYTES // offset
         + Long.BYTES // generation
@@ -72,16 +87,20 @@ final class Checkpoint {
 
     /**
      * Create a new translog checkpoint.
      *
-     * @param offset                the current offset in the translog
-     * @param numOps                the current number of operations in the translog
-     * @param generation            the current translog generation
-     * @param minSeqNo              the current minimum sequence number of all operations in the translog
-     * @param maxSeqNo              the current maximum sequence number of all operations in the translog
-     * @param globalCheckpoint      the last-known global checkpoint
+     * @param offset                the current offset in the translog
+     * @param numOps                the current number of operations in the translog
+     * @param generation            the current translog generation
+     * @param minSeqNo              the current minimum sequence number of all operations in the translog
+     * @param maxSeqNo              the current maximum sequence number of all operations in the translog
+     * @param globalCheckpoint      the last-known global checkpoint
      * @param minTranslogGeneration the minimum generation referenced by the translog at this moment.
+     * @param trimmedAboveSeqNo     all operations with seq# above trimmedAboveSeqNo should be ignored and not read from the
+     *                              corresponding translog file. {@link SequenceNumbers#UNASSIGNED_SEQ_NO} is used to disable trimming.
      */
-    Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint, long minTranslogGeneration) {
+    Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint,
+               long minTranslogGeneration, long trimmedAboveSeqNo) {
         assert minSeqNo <= maxSeqNo : "minSeqNo [" + minSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
+        assert trimmedAboveSeqNo <= maxSeqNo : "trimmedAboveSeqNo [" + trimmedAboveSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
         assert minTranslogGeneration <= generation :
             "minTranslogGen [" + minTranslogGeneration + "] is higher than generation [" + generation + "]";
         this.offset = offset;
@@ -91,6 +110,7 @@ final class Checkpoint {
         this.maxSeqNo = maxSeqNo;
         this.globalCheckpoint = globalCheckpoint;
         this.minTranslogGeneration = minTranslogGeneration;
+        this.trimmedAboveSeqNo = trimmedAboveSeqNo;
     }
 
     private void write(DataOutput out) throws IOException {
@@ -101,26 +121,52 @@ final class Checkpoint {
         out.writeLong(maxSeqNo);
         out.writeLong(globalCheckpoint);
         out.writeLong(minTranslogGeneration);
+        out.writeLong(trimmedAboveSeqNo);
     }
 
     static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
                                               long minTranslogGeneration) {
         final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
         final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
-        return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
+        final long trimmedAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+        return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
+    }
+
+    static Checkpoint readCheckpointV6_4_0(final DataInput in) throws IOException {
+        final long offset = in.readLong();
+        final int numOps = in.readInt();
+        final long generation = in.readLong();
+        final long minSeqNo = in.readLong();
+        final long maxSeqNo = in.readLong();
+        final long globalCheckpoint = in.readLong();
+        final long minTranslogGeneration = in.readLong();
+        final long trimmedAboveSeqNo = in.readLong();
+        return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
     }
 
     static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
-        return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
+        final long offset = in.readLong();
+        final int numOps = in.readInt();
+        final long generation = in.readLong();
+        final long minSeqNo = in.readLong();
+        final long maxSeqNo = in.readLong();
+        final long globalCheckpoint = in.readLong();
+        final long minTranslogGeneration = in.readLong();
+        final long trimmedAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+        return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
     }
 
     // reads a checksummed checkpoint introduced in ES 5.0.0
     static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException {
+        final long offset = in.readLong();
+        final int numOps = in.readInt();
+        final long generation = in.readLong();
         final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
         final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
         final long globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
-        final long minTranslogGeneration = -1L;
-        return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
+        final long minTranslogGeneration = -1;
+        final long trimmedAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+        return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
     }
 
     @Override
@@ -133,6 +179,7 @@ final class Checkpoint {
             ", maxSeqNo=" + maxSeqNo +
             ", globalCheckpoint=" + globalCheckpoint +
             ", minTranslogGeneration=" + minTranslogGeneration +
+            ", trimmedAboveSeqNo=" + trimmedAboveSeqNo +
             '}';
     }
 
@@ -145,17 +192,20 @@ final class Checkpoint {
             if (fileVersion == INITIAL_VERSION) {
                 assert indexInput.length() == V1_FILE_SIZE : indexInput.length();
                 return Checkpoint.readCheckpointV5_0_0(indexInput);
+            } else if (fileVersion == VERSION_6_0_0) {
+                assert indexInput.length() == V2_FILE_SIZE : indexInput.length();
+                return Checkpoint.readCheckpointV6_0_0(indexInput);
             } else {
                 assert fileVersion == CURRENT_VERSION : fileVersion;
-                assert indexInput.length() == FILE_SIZE : indexInput.length();
-                return Checkpoint.readCheckpointV6_0_0(indexInput);
+                assert indexInput.length() == V3_FILE_SIZE : indexInput.length();
+                return Checkpoint.readCheckpointV6_4_0(indexInput);
             }
         }
     }
 
     public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
-        final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(FILE_SIZE) {
+        final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(V3_FILE_SIZE) {
             @Override
             public synchronized byte[] toByteArray() {
                 // don't clone
@@ -164,13 +214,13 @@ final class Checkpoint {
         };
         final String resourceDesc = "checkpoint(path=\"" + checkpointFile + "\", gen=" + checkpoint + ")";
         try (OutputStreamIndexOutput indexOutput =
-                 new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, FILE_SIZE)) {
+                 new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, V3_FILE_SIZE)) {
             CodecUtil.writeHeader(indexOutput, CHECKPOINT_CODEC, CURRENT_VERSION);
             checkpoint.write(indexOutput);
             CodecUtil.writeFooter(indexOutput);
 
-            assert indexOutput.getFilePointer() == FILE_SIZE :
-                "get you numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + FILE_SIZE;
+            assert indexOutput.getFilePointer() == V3_FILE_SIZE :
+                "get your numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + V3_FILE_SIZE;
             assert indexOutput.getFilePointer() < 512 :
                 "checkpoint files have to be smaller than 512 bytes for atomic writes; size: " + indexOutput.getFilePointer();
 
@@ -196,7 +246,8 @@ final class Checkpoint {
         if (generation != that.generation) return false;
         if (minSeqNo != that.minSeqNo) return false;
         if (maxSeqNo != that.maxSeqNo) return false;
-        return globalCheckpoint == that.globalCheckpoint;
+        if (globalCheckpoint != that.globalCheckpoint) return false;
+        return trimmedAboveSeqNo == that.trimmedAboveSeqNo;
     }
 
     @Override
@@ -207,6 +258,7 @@ final class Checkpoint {
         result = 31 * result + Long.hashCode(minSeqNo);
         result = 31 * result + Long.hashCode(maxSeqNo);
         result = 31 * result + Long.hashCode(globalCheckpoint);
+        result = 31 * result + Long.hashCode(trimmedAboveSeqNo);
         return result;
     }
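Since Checkpoint#write asserts the file stays under 512 bytes for atomic writes, it is easy to sanity-check that the extra long still leaves plenty of headroom. A standalone back-of-the-envelope calculation (assumes Lucene's CodecUtil layout: header is 9 bytes plus the codec name, footer is 16 bytes):

```java
// Standalone arithmetic check (no Lucene required) that the grown checkpoint
// still honours the 512-byte atomic-write bound asserted in Checkpoint#write.
// Assumes Lucene's CodecUtil layout: header = 9 bytes + codec name length,
// footer = 16 bytes.
public class CheckpointSizeDemo {
    public static void main(String[] args) {
        int header = 9 + "ckp".length();             // CodecUtil.headerLength("ckp") = 12
        int footer = 16;                             // CodecUtil.footerLength()
        int v2Body = Integer.BYTES + 6 * Long.BYTES; // ops + offset, generation, min/max seq#, global ckpt, min gen
        int v3Body = Integer.BYTES + 7 * Long.BYTES; // the same + trimmedAboveSeqNo
        System.out.println("V2_FILE_SIZE = " + (header + v2Body + footer)); // 80
        System.out.println("V3_FILE_SIZE = " + (header + v3Body + footer)); // 88, well under 512
    }
}
```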
diff --git a/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
index 1b095beddb4..b5a17534903 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
@@ -56,6 +56,15 @@ final class MultiSnapshot implements Translog.Snapshot {
         return totalOperations;
     }
 
+    @Override
+    public int skippedOperations() {
+        int skippedOperations = overriddenOperations;
+        for (TranslogSnapshot translog : translogs) {
+            skippedOperations += translog.skippedOperations();
+        }
+        return skippedOperations;
+    }
+
     @Override
     public int overriddenOperations() {
         return overriddenOperations;
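skippedOperations deliberately folds both kinds of invisible operations into one number: operations overridden during multi-generation dedup and operations trimmed per generation. A toy illustration of the aggregation and of the accounting identity RecoverySourceHandler asserts further down (illustrative numbers only):

```java
import java.util.Arrays;

// Toy aggregation mirroring MultiSnapshot#skippedOperations: overridden ops are
// counted once across generations, trimmed ops are reported per generation, and
// the recovery-side accounting treats the sum as a single "skipped" bucket.
public class SkippedOpsAccountingDemo {
    public static void main(String[] args) {
        int overriddenOperations = 2;            // superseded by a newer term at the same seq#
        int[] trimmedPerGeneration = {0, 3, 1};  // what each TranslogSnapshot would report
        int skipped = overriddenOperations + Arrays.stream(trimmedPerGeneration).sum();

        // the invariant asserted in RecoverySourceHandler: every operation in the
        // snapshot is either sent, outside the requested range, or skipped
        int totalOperations = 10, outOfRange = 0, sent = totalOperations - skipped - outOfRange;
        System.out.println("skipped=" + skipped + ", sent=" + sent
            + ", total=" + (skipped + outOfRange + sent)); // skipped=6, sent=4, total=10
    }
}
```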
diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java
index 9d8c6c7c093..63055d933e4 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -696,6 +696,41 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
         return TRANSLOG_FILE_PREFIX + generation + CHECKPOINT_SUFFIX;
     }
 
+    /**
+     * Trims translog for terms of files below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>.
+     * Effectively it moves the max visible seq# ({@link Checkpoint#trimmedAboveSeqNo}) so that {@link TranslogSnapshot} skips those operations.
+     */
+    public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException {
+        assert aboveSeqNo >= SequenceNumbers.NO_OPS_PERFORMED : "aboveSeqNo has to be a valid sequence number";
+
+        try (ReleasableLock lock = writeLock.acquire()) {
+            ensureOpen();
+            if (current.getPrimaryTerm() < belowTerm) {
+                throw new IllegalArgumentException("Trimming the translog can only be done for terms lower than the current one. " +
+                    "Trim requested for term [ " + belowTerm + " ] , current is [ " + current.getPrimaryTerm() + " ]");
+            }
+            // we assume that the current translog generation doesn't have trimmable ops. Verify that.
+            assert current.assertNoSeqAbove(belowTerm, aboveSeqNo);
+
+            // update all existing readers (if necessary) as checkpoint and reader are immutable
+            final List<TranslogReader> newReaders = new ArrayList<>(readers.size());
+            try {
+                for (TranslogReader reader : readers) {
+                    final TranslogReader newReader =
+                        reader.getPrimaryTerm() < belowTerm
+                            ? reader.closeIntoTrimmedReader(aboveSeqNo, getChannelFactory())
+                            : reader;
+                    newReaders.add(newReader);
+                }
+            } catch (IOException e) {
+                IOUtils.closeWhileHandlingException(newReaders);
+                close();
+                throw e;
+            }
+
+            this.readers.clear();
+            this.readers.addAll(newReaders);
+        }
+    }
+
     /**
      * Ensures that the given location has been synced / written to the underlying storage.
@@ -845,6 +880,13 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
          */
         int totalOperations();
 
+        /**
+         * The number of operations that have been skipped (overridden or trimmed) in the snapshot so far.
+         */
+        default int skippedOperations() {
+            return 0;
+        }
+
         /**
          * The number of operations that have been overridden (eg. superseded) in the snapshot so far.
          * If two operations have the same sequence number, the operation with a lower term will be overridden by the operation
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java
index 29e30bd25dd..4091fa45762 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java
@@ -21,6 +21,8 @@ package org.elasticsearch.index.translog;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.common.io.Channels;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 
 import java.io.Closeable;
 import java.io.EOFException;
@@ -28,8 +30,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.elasticsearch.index.translog.Translog.getCommitCheckpointFileName;
+
 /**
  * an immutable translog file reader
  */
@@ -70,6 +75,39 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
         return new TranslogReader(checkpoint, channel, path, header);
     }
 
+    /**
+     * Closes the current reader and creates a new one with a new checkpoint and the same file channel.
+     */
+    TranslogReader closeIntoTrimmedReader(long aboveSeqNo, ChannelFactory channelFactory) throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            Closeable toCloseOnFailure = channel;
+            final TranslogReader newReader;
+            try {
+                if (aboveSeqNo < checkpoint.trimmedAboveSeqNo
+                    || aboveSeqNo < checkpoint.maxSeqNo && checkpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                    final Path checkpointFile = path.getParent().resolve(getCommitCheckpointFileName(checkpoint.generation));
+                    final Checkpoint newCheckpoint = new Checkpoint(checkpoint.offset, checkpoint.numOps,
+                            checkpoint.generation, checkpoint.minSeqNo, checkpoint.maxSeqNo,
+                            checkpoint.globalCheckpoint, checkpoint.minTranslogGeneration, aboveSeqNo);
+                    Checkpoint.write(channelFactory, checkpointFile, newCheckpoint, StandardOpenOption.WRITE);
+
+                    IOUtils.fsync(checkpointFile, false);
+                    IOUtils.fsync(checkpointFile.getParent(), true);
+
+                    newReader = new TranslogReader(newCheckpoint, channel, path, header);
+                } else {
+                    newReader = new TranslogReader(checkpoint, channel, path, header);
+                }
+                toCloseOnFailure = null;
+                return newReader;
+            } finally {
+                IOUtils.close(toCloseOnFailure);
+            }
+        } else {
+            throw new AlreadyClosedException(toString() + " is already closed");
+        }
+    }
+
     public long sizeInBytes() {
         return length;
     }
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
index a9667203532..8fe92bba009 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
@@ -19,6 +19,7 @@
 package org.elasticsearch.index.translog;
 
 import org.elasticsearch.common.io.Channels;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -32,6 +33,7 @@ final class TranslogSnapshot extends BaseTranslogReader {
     private final ByteBuffer reusableBuffer;
     private long position;
+    private int skippedOperations;
     private int readOperations;
     private BufferedChecksumStreamInput reuse;
@@ -54,17 +56,24 @@ final class TranslogSnapshot extends BaseTranslogReader {
         return totalOperations;
     }
 
+    int skippedOperations() {
+        return skippedOperations;
+    }
+
     @Override
     Checkpoint getCheckpoint() {
         return checkpoint;
     }
 
     public Translog.Operation next() throws IOException {
-        if (readOperations < totalOperations) {
-            return readOperation();
-        } else {
-            return null;
+        while (readOperations < totalOperations) {
+            final Translog.Operation operation = readOperation();
+            if (operation.seqNo() <= checkpoint.trimmedAboveSeqNo || checkpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                return operation;
+            }
+            skippedOperations++;
         }
+        return null;
     }
 
     protected Translog.Operation readOperation() throws IOException {
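The new next() above is a read-and-filter loop: trimmed operations are still physically read, since the translog file itself is never rewritten, but they are counted and withheld. The same pattern in a standalone, runnable form (an iterator of seq#s stands in for readOperation()):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Standalone sketch of the read-and-filter loop that replaces the plain read in
// TranslogSnapshot#next(): every operation is still physically read (the file is
// not rewritten), but operations above the trim point are counted and withheld.
public class SkipLoopDemo {
    static final long UNASSIGNED_SEQ_NO = -2L;
    private final Iterator<Long> seqNos; // stands in for readOperation()
    private final long trimmedAboveSeqNo;
    int skippedOperations;

    SkipLoopDemo(List<Long> seqNos, long trimmedAboveSeqNo) {
        this.seqNos = seqNos.iterator();
        this.trimmedAboveSeqNo = trimmedAboveSeqNo;
    }

    Long next() {
        while (seqNos.hasNext()) {
            long seqNo = seqNos.next();
            if (trimmedAboveSeqNo == UNASSIGNED_SEQ_NO || seqNo <= trimmedAboveSeqNo) {
                return seqNo; // visible operation
            }
            skippedOperations++; // trimmed: read but never surfaced
        }
        return null; // exhausted, like the translog snapshot
    }

    public static void main(String[] args) {
        SkipLoopDemo snapshot = new SkipLoopDemo(Arrays.asList(0L, 1L, 2L, 3L), 1L);
        for (Long op; (op = snapshot.next()) != null; ) {
            System.out.println("visible op " + op); // 0 and 1
        }
        System.out.println("skipped " + snapshot.skippedOperations + " ops"); // 2
    }
}
```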
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
index cae65788865..b89b21c5258 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
@@ -20,7 +20,6 @@ package org.elasticsearch.index.translog;
 
 import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.OutputStreamDataOutput;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -92,6 +91,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
         this.minSeqNo = initialCheckpoint.minSeqNo;
         assert initialCheckpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo;
         this.maxSeqNo = initialCheckpoint.maxSeqNo;
+        assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
         this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
     }
@@ -213,6 +213,25 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
         return true;
     }
 
+    synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
+        seenSequenceNumbers.entrySet().stream().filter(e -> e.getKey().longValue() > aboveSeqNo)
+            .forEach(e -> {
+                final Translog.Operation op;
+                try {
+                    op = Translog.readOperation(new BufferedChecksumStreamInput(e.getValue().v1().streamInput()));
+                } catch (IOException ex) {
+                    throw new RuntimeException(ex);
+                }
+                long seqNo = op.seqNo();
+                long primaryTerm = op.primaryTerm();
+                if (primaryTerm < belowTerm) {
+                    throw new AssertionError("current should not have any operations with seq#:primaryTerm ["
+                        + seqNo + ":" + primaryTerm + "] > " + aboveSeqNo + ":" + belowTerm);
+                }
+            });
+        return true;
+    }
+
     /**
      * write all buffered ops to disk and fsync file.
      *
@@ -241,7 +260,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     @Override
     synchronized Checkpoint getCheckpoint() {
         return new Checkpoint(totalOffset, operationCounter, generation, minSeqNo, maxSeqNo,
-            globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong());
+            globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong(),
+            SequenceNumbers.UNASSIGNED_SEQ_NO);
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 4c543aeeb22..72a6fcb6ba3 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -615,9 +615,9 @@ public class RecoverySourceHandler {
             cancellableThreads.executeIO(sendBatch);
         }
 
-        assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
+        assert expectedTotalOps == snapshot.skippedOperations() + skippedOps + totalSentOps
             : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
-            expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
+            expectedTotalOps, snapshot.skippedOperations(), skippedOps, totalSentOps);
 
         if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
             throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java
index d5ad3941a5e..914c2b87422 100644
--- a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java
@@ -40,7 +40,7 @@ public class ResyncReplicationRequestTests extends ESTestCase {
         final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(),
             Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, -1);
         final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
-        final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index});
+        final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index});
 
         final BytesStreamOutput out = new BytesStreamOutput();
         before.writeTo(out);
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 736dc40e686..018548be962 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -340,9 +340,10 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
             op1 = snapshot.next();
             assertThat(op1, notNullValue());
             assertThat(snapshot.next(), nullValue());
-            assertThat(snapshot.overriddenOperations(), equalTo(0));
+            assertThat(snapshot.skippedOperations(), equalTo(0));
         }
-        // Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1).
+        // Make sure that replica2 receives translog ops (eg. op2) from replica1
+        // and does not overwrite its stale operation (op1) as it is trimmed.
         logger.info("--> Promote replica1 as the primary");
         shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
         shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
@@ -353,7 +354,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
             assertThat(op2.seqNo(), equalTo(op1.seqNo()));
             assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
             assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
-            assertThat(snapshot.overriddenOperations(), equalTo(1));
+            assertThat(snapshot.overriddenOperations(), equalTo(0));
+            assertThat(snapshot.skippedOperations(), equalTo(1));
         }
 
         // Make sure that peer-recovery transfers all but non-overridden operations.
@@ -366,7 +368,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
             assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
             assertThat(snapshot.next(), equalTo(op2));
             assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
-            assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0));
+            assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
         }
         // TODO: We should assert the content of shards in the ReplicationGroup.
         // Without rollback of replicas (current implementation), we don't have the same content across shards:
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index 21be1da3845..ee97ba14fe0 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -53,8 +53,12 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,6 +69,7 @@ import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
@@ -353,10 +358,19 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
 
     @TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.action.resync:TRACE")
     public void testResyncAfterPrimaryPromotion() throws Exception {
-        // TODO: check translog trimming functionality once it's implemented
-        try (ReplicationGroup shards = createGroup(2)) {
+        // TODO: check translog trimming functionality once rollback is implemented in Lucene (ES trimming is done)
+        Map<String, String> mappings =
+            Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
+        try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) {
             shards.startAll();
-            int initialDocs = shards.indexDocs(randomInt(10));
+            int initialDocs = randomInt(10);
+
+            for (int i = 0; i < initialDocs; i++) {
+                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "initial_doc_" + i)
+                    .source("{ \"f\": \"normal\"}", XContentType.JSON);
+                shards.index(indexRequest);
+            }
+
             boolean syncedGlobalCheckPoint = randomBoolean();
             if (syncedGlobalCheckPoint) {
                 shards.syncGlobalCheckpoint();
@@ -364,16 +378,30 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
 
             final IndexShard oldPrimary = shards.getPrimary();
             final IndexShard newPrimary = shards.getReplicas().get(0);
+            final IndexShard justReplica = shards.getReplicas().get(1);
 
             // simulate docs that were inflight when primary failed
-            final int extraDocs = randomIntBetween(0, 5);
+            final int extraDocs = randomInt(5);
             logger.info("--> indexing {} extra docs", extraDocs);
             for (int i = 0; i < extraDocs; i++) {
-                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i)
-                    .source("{}", XContentType.JSON);
+                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_doc_" + i)
+                    .source("{ \"f\": \"normal\"}", XContentType.JSON);
                 final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
                 indexOnReplica(bulkShardRequest, shards, newPrimary);
             }
+
+            final int extraDocsToBeTrimmed = randomIntBetween(0, 10);
+            logger.info("--> indexing {} extra docs to be trimmed", extraDocsToBeTrimmed);
+            for (int i = 0; i < extraDocsToBeTrimmed; i++) {
+                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_trimmed_" + i)
+                    .source("{ \"f\": \"trimmed\"}", XContentType.JSON);
+                final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
+                // have to replicate to the other replica (not newPrimary) - these are the operations to be trimmed
+                indexOnReplica(bulkShardRequest, shards, justReplica);
+            }
+
+            logger.info("--> seqNo primary {} replica {}", oldPrimary.seqNoStats(), newPrimary.seqNoStats());
+
             logger.info("--> resyncing replicas");
             PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();
             if (syncedGlobalCheckPoint) {
@@ -381,7 +409,36 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestCase {
             } else {
                 assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs));
             }
-            shards.assertAllEqual(initialDocs + extraDocs);
+            List<IndexShard> replicas = shards.getReplicas();
+
+            // check that all docs on the primary are available on the replicas
+            Set<String> primaryIds = getShardDocUIDs(newPrimary);
+            assertThat(primaryIds.size(), equalTo(initialDocs + extraDocs));
+            for (IndexShard replica : replicas) {
+                Set<String> replicaIds = getShardDocUIDs(replica);
+                Set<String> temp = new HashSet<>(primaryIds);
+                temp.removeAll(replicaIds);
+                assertThat(replica.routingEntry() + " is missing docs", temp, empty());
+                temp = new HashSet<>(replicaIds);
+                temp.removeAll(primaryIds);
+                // the replica may have extra docs as there is no Lucene rollback on it
+                assertThat(replica.routingEntry() + " has to have extra docs", temp,
+                    extraDocsToBeTrimmed > 0 ? not(empty()) : empty());
+            }
+
+            // check that the translog on the replica is trimmed
+            int translogOperations = 0;
+            try (Translog.Snapshot snapshot = getTranslog(justReplica).newSnapshot()) {
+                Translog.Operation next;
+                while ((next = snapshot.next()) != null) {
+                    translogOperations++;
+                    assertThat("unexpected op: " + next, (int) next.seqNo(), lessThan(initialDocs + extraDocs));
+                    assertThat("unexpected primaryTerm: " + next.primaryTerm(), next.primaryTerm(), is(oldPrimary.getPrimaryTerm()));
+                    final Translog.Source source = next.getSource();
+                    assertThat(source.source.utf8ToString(), is("{ \"f\": \"normal\"}"));
+                }
+            }
+            assertThat(translogOperations, is(initialDocs + extraDocs));
         }
     }
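The assertions in the test above encode a deliberate asymmetry, restated here as a tiny runnable example (doc IDs borrowed from the test; not part of the change): the replica's Lucene index may keep the trimmed docs because there is no Lucene rollback yet, while its translog view already matches the new primary.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy restatement (doc IDs borrowed from the test above) of the asymmetry the
// assertions check: without a Lucene rollback the replica keeps the trimmed
// docs in its index, while its translog view already matches the new primary.
public class ResyncStateDemo {
    public static void main(String[] args) {
        Set<String> primaryDocs = new HashSet<>(Arrays.asList("initial_doc_0", "extra_doc_0"));
        Set<String> replicaLucene = new HashSet<>(Arrays.asList("initial_doc_0", "extra_doc_0", "extra_trimmed_0"));
        Set<String> replicaTranslog = new HashSet<>(Arrays.asList("initial_doc_0", "extra_doc_0"));

        Set<String> missingOnReplica = new HashSet<>(primaryDocs);
        missingOnReplica.removeAll(replicaLucene);
        Set<String> extraOnReplica = new HashSet<>(replicaLucene);
        extraOnReplica.removeAll(primaryDocs);

        System.out.println("missing on replica: " + missingOnReplica);    // []
        System.out.println("extra in replica Lucene: " + extraOnReplica); // [extra_trimmed_0]
        System.out.println("translog matches primary: " + replicaTranslog.equals(primaryDocs)); // true
    }
}
```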
diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
index 1257aea3d14..b290f4d4559 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
@@ -20,6 +20,7 @@ package org.elasticsearch.index.shard;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.resync.ResyncReplicationRequest;
 import org.elasticsearch.action.resync.ResyncReplicationResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -36,15 +37,20 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.tasks.TaskManager;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
 public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
@@ -53,15 +59,17 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
         IndexShard shard = newStartedShard(true);
         TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
         AtomicBoolean syncActionCalled = new AtomicBoolean();
+        List<ResyncReplicationRequest> resyncRequests = new ArrayList<>();
         PrimaryReplicaSyncer.SyncAction syncAction =
             (request, parentTask, allocationId, primaryTerm, listener) -> {
                 logger.info("Sending off {} operations", request.getOperations().length);
                 syncActionCalled.set(true);
+                resyncRequests.add(request);
                 assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
                 listener.onResponse(new ResyncReplicationResponse());
             };
         PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, taskManager, syncAction);
-        syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 100)));
+        syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));
 
         int numDocs = randomInt(10);
         for (int i = 0; i < numDocs; i++) {
@@ -72,7 +80,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
         }
 
         long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
-        boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1;
+        boolean syncNeeded = numDocs > 0;
 
         String allocationId = shard.routingEntry().allocationId().getId();
         shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
@@ -84,19 +92,29 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
 
         PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
         syncer.resync(shard, fut);
-        fut.get();
+        PrimaryReplicaSyncer.ResyncTask resyncTask = fut.get();
 
         if (syncNeeded) {
             assertTrue("Sync action was not called", syncActionCalled.get());
+            ResyncReplicationRequest resyncRequest = resyncRequests.remove(0);
+            assertThat(resyncRequest.getTrimAboveSeqNo(), equalTo(numDocs - 1L));
+
+            assertThat("trimAboveSeqNo has to be specified in request #0 only", resyncRequests.stream()
+                .mapToLong(ResyncReplicationRequest::getTrimAboveSeqNo)
+                .filter(seqNo -> seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO)
+                .findFirst()
+                .isPresent(),
+                is(false));
         }
-        assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, fut.get().getTotalOperations());
-        if (syncNeeded) {
+
+        assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, resyncTask.getTotalOperations());
+        if (syncNeeded && globalCheckPoint < numDocs - 1) {
             long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included
-            assertEquals(skippedOps, fut.get().getSkippedOperations());
-            assertEquals(numDocs - skippedOps, fut.get().getResyncedOperations());
+            assertEquals(skippedOps, resyncTask.getSkippedOperations());
+            assertEquals(numDocs - skippedOps, resyncTask.getResyncedOperations());
         } else {
-            assertEquals(0, fut.get().getSkippedOperations());
-            assertEquals(0, fut.get().getResyncedOperations());
+            assertEquals(0, resyncTask.getSkippedOperations());
+            assertEquals(0, resyncTask.getResyncedOperations());
         }
 
         closeShards(shard);
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java
index 0dc404767de..99e21d47604 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java
@@ -118,7 +118,8 @@ public class TranslogHeaderTests extends ESTestCase {
         assertThat("test file [" + translogFile + "] should exist", Files.exists(translogFile), equalTo(true));
         final E error = expectThrows(expectedErrorType, () -> {
             final Checkpoint checkpoint = new Checkpoint(Files.size(translogFile), 1, 1,
-                SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, 1);
+                SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED,
+                SequenceNumbers.NO_OPS_PERFORMED, 1, SequenceNumbers.NO_OPS_PERFORMED);
             try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
                 TranslogReader.open(channel, translogFile, checkpoint, null);
             }
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index b3b9fca886e..cf6e7536846 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -107,7 +107,9 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
@@ -120,8 +122,11 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isIn;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.stub;
 
@@ -1474,8 +1479,8 @@ public class TranslogTests extends ESTestCase {
             fail("corrupted");
         } catch (IllegalStateException ex) {
             assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3080, " +
-                "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " +
-                "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0}", ex.getMessage());
+                "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} but got: Checkpoint{offset=0, numOps=0, " +
+                "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage());
         }
         Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read,
             StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
         try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
@@ -1507,6 +1512,191 @@ public class TranslogTests extends ESTestCase {
         assertEquals(ops, readOperations);
     }
 
+    public void testSnapshotCurrentHasUnexpectedOperationsForTrimmedOperations() throws Exception {
+        int extraDocs = randomIntBetween(10, 15);
+
+        // increment primaryTerm to avoid potential negative numbers
+        primaryTerm.addAndGet(extraDocs);
+        translog.rollGeneration();
+
+        for (int op = 0; op < extraDocs; op++) {
+            String ascii = randomAlphaOfLengthBetween(1, 50);
+            Translog.Index operation = new Translog.Index("test", "" + op, op, primaryTerm.get() - op,
+                ascii.getBytes("UTF-8"));
+            translog.add(operation);
+        }
+
+        AssertionError error = expectThrows(AssertionError.class, () -> translog.trimOperations(primaryTerm.get(), 0));
+        assertThat(error.getMessage(), is("current should not have any operations with seq#:primaryTerm "
+            + "[1:" + (primaryTerm.get() - 1) + "] > 0:" + primaryTerm.get()));
+
+        primaryTerm.incrementAndGet();
+        translog.rollGeneration();
+
+        // add a single operation to current with seq# > trimmed seq# but higher primary term
+        Translog.Index operation = new Translog.Index("test", "" + 1, 1L, primaryTerm.get(),
+            randomAlphaOfLengthBetween(1, 50).getBytes("UTF-8"));
+        translog.add(operation);
+
+        // it is possible to trim after generation rollover
+        translog.trimOperations(primaryTerm.get(), 0);
+    }
+
+    public void testSnapshotTrimmedOperations() throws Exception {
+        final InMemoryTranslog inMemoryTranslog = new InMemoryTranslog();
+        final List<Translog.Operation> allOperations = new ArrayList<>();
+
+        for (int attempt = 0, maxAttempts = randomIntBetween(3, 10); attempt < maxAttempts; attempt++) {
+            List<Long> ops = LongStream.range(0, allOperations.size() + randomIntBetween(10, 15))
+                .boxed().collect(Collectors.toList());
+            Randomness.shuffle(ops);
+
+            AtomicReference<String> source = new AtomicReference<>();
+            for (final long op : ops) {
+                source.set(randomAlphaOfLengthBetween(1, 50));
+
+                // have to use exactly the same source for the same seq# if the primaryTerm has not changed
+                if (primaryTerm.get() == translog.getCurrent().getPrimaryTerm()) {
+                    // use the latest source of the op with the same seq# - therefore no break
+                    allOperations
+                        .stream()
+                        .filter(allOp -> allOp instanceof Translog.Index && allOp.seqNo() == op)
+                        .map(allOp -> ((Translog.Index) allOp).source().utf8ToString())
+                        .reduce((a, b) -> b)
+                        .ifPresent(source::set);
+                }
+
+                // use ongoing primaryTerms - or the same as it was
+                Translog.Index operation = new Translog.Index("test", "" + op, op, primaryTerm.get(),
+                    source.get().getBytes("UTF-8"));
+                translog.add(operation);
+                inMemoryTranslog.add(operation);
+                allOperations.add(operation);
+            }
+
+            if (randomBoolean()) {
+                primaryTerm.incrementAndGet();
+                translog.rollGeneration();
+            }
+
+            long maxTrimmedSeqNo = randomInt(allOperations.size());
+
+            translog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
+            inMemoryTranslog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
+            translog.sync();
+
+            Collection<Translog.Operation> effectiveOperations = inMemoryTranslog.operations();
+
+            try (Translog.Snapshot snapshot = translog.newSnapshot()) {
+                assertThat(snapshot, containsOperationsInAnyOrder(effectiveOperations));
+                assertThat(snapshot.totalOperations(), is(allOperations.size()));
+                assertThat(snapshot.skippedOperations(), is(allOperations.size() - effectiveOperations.size()));
+            }
+        }
+    }
+
+    /**
+     * this class mimics the behaviour of the original {@link Translog}
+     */
+    static class InMemoryTranslog {
+        private final Map<Long, Translog.Operation> operations = new HashMap<>();
+
+        void add(Translog.Operation operation) {
+            final Translog.Operation old = operations.put(operation.seqNo(), operation);
+            assert old == null || old.primaryTerm() <= operation.primaryTerm();
+        }
+
+        void trimOperations(long belowTerm, long aboveSeqNo) {
+            for (final Iterator<Map.Entry<Long, Translog.Operation>> it = operations.entrySet().iterator(); it.hasNext(); ) {
+                final Map.Entry<Long, Translog.Operation> next = it.next();
+                Translog.Operation op = next.getValue();
+                boolean drop = op.primaryTerm() < belowTerm && op.seqNo() > aboveSeqNo;
+                if (drop) {
+                    it.remove();
+                }
+            }
+        }
+
+        Collection<Translog.Operation> operations() {
+            return operations.values();
+        }
+    }
+
+    public void testRandomExceptionsOnTrimOperations() throws Exception {
+        Path tempDir = createTempDir();
+        final FailSwitch fail = new FailSwitch();
+        fail.failNever();
+        TranslogConfig config = getTranslogConfig(tempDir);
+        List<FileChannel> fileChannels = new ArrayList<>();
+        final Translog failableTLog =
+            getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy(), fileChannels);
+
+        IOException expectedException = null;
+        int translogOperations = 0;
+        final int maxAttempts = 10;
+        for (int attempt = 0; attempt < maxAttempts; attempt++) {
+            int maxTrimmedSeqNo;
+            fail.failNever();
+            int extraTranslogOperations = randomIntBetween(10, 100);
+
+            List<Integer> ops = IntStream.range(translogOperations, translogOperations + extraTranslogOperations)
+                .boxed().collect(Collectors.toList());
+            Randomness.shuffle(ops);
+            for (int op : ops) {
+                String ascii = randomAlphaOfLengthBetween(1, 50);
+                Translog.Index operation = new Translog.Index("test", "" + op, op,
+                    primaryTerm.get(), ascii.getBytes("UTF-8"));
+
+                failableTLog.add(operation);
+            }
+
+            translogOperations += extraTranslogOperations;
+
+            // at least one roll + increment of the primary term has to happen - otherwise trimming would not take place at all;
+            // on the last attempt we have to roll as well - otherwise trimming could be a no-op as everything has been trimmed already
+            boolean rollover = attempt == 0 || attempt == maxAttempts - 1 || randomBoolean();
+            if (rollover) {
+                primaryTerm.incrementAndGet();
+                failableTLog.rollGeneration();
+            }
+
+            maxTrimmedSeqNo = rollover ? translogOperations - randomIntBetween(4, 8) : translogOperations + 1;
+
+            // if we reach the max attempts - always fail
+            fail.failRate(attempt < maxAttempts - 1 ? 25 : 100);
+            try {
+                failableTLog.trimOperations(primaryTerm.get(), maxTrimmedSeqNo);
+            } catch (IOException e) {
+                expectedException = e;
+                break;
+            }
+        }
+
+        assertThat(expectedException, is(not(nullValue())));
+
+        assertThat(fileChannels, is(not(empty())));
+        assertThat("all file channels have to be closed",
+            fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false));
+
+        assertThat(failableTLog.isOpen(), is(false));
+        final AlreadyClosedException alreadyClosedException = expectThrows(AlreadyClosedException.class, () -> failableTLog.newSnapshot());
+        assertThat(alreadyClosedException.getMessage(),
+            is("translog is already closed"));
+
+        fail.failNever();
+
+        // check that despite the IO exception the translog is not corrupted
+        try (Translog reopenedTranslog = openTranslog(config, failableTLog.getTranslogUUID())) {
+            try (Translog.Snapshot snapshot = reopenedTranslog.newSnapshot()) {
+                assertThat(snapshot.totalOperations(), greaterThan(0));
+                Translog.Operation operation;
+                for (int i = 0; (operation = snapshot.next()) != null; i++) {
+                    assertNotNull("operation " + i + " must be non-null", operation);
+                }
+            }
+        }
+    }
+
     public void testLocationHashCodeEquals() throws IOException {
         List<Translog.Location> locations = new ArrayList<>();
         List<Translog.Location> locations2 = new ArrayList<>();
@@ -2007,7 +2197,8 @@ public class TranslogTests extends ESTestCase {
         private volatile boolean onceFailedFailAlways = false;
 
         public boolean fail() {
-            boolean fail = randomIntBetween(1, 100) <= failRate;
+            final int rnd = randomIntBetween(1, 100);
+            boolean fail = rnd <= failRate;
             if (fail && onceFailedFailAlways) {
                 failAlways();
             }
@@ -2026,17 +2217,30 @@ public class TranslogTests extends ESTestCase {
             failRate = randomIntBetween(1, 100);
         }
 
+        public void failRate(int rate) {
+            failRate = rate;
+        }
+
         public void onceFailedFailAlways() {
             onceFailedFailAlways = true;
         }
     }
 
     private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites,
                                          final boolean throwUnknownException, String translogUUID,
                                          final TranslogDeletionPolicy deletionPolicy) throws IOException {
+        return getFailableTranslog(fail, config, partialWrites, throwUnknownException, translogUUID, deletionPolicy, null);
+    }
+
+    private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites,
+                                         final boolean throwUnknownException, String translogUUID,
+                                         final TranslogDeletionPolicy deletionPolicy,
+                                         final List<FileChannel> fileChannels) throws IOException {
         final ChannelFactory channelFactory = (file, openOption) -> {
             FileChannel channel = FileChannel.open(file, openOption);
+            if (fileChannels != null) {
+                fileChannels.add(channel);
+            }
             boolean success = false;
             try {
                 final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints - we rely on the fact that the bytes are written as an atomic operation
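The fileChannels parameter threaded through getFailableTranslog above exists purely so the test can prove no channel leaks when an exception is injected mid-trim. The same capture trick in a standalone form (illustrative sketch, not the test utilities):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Standalone version of the channel-capturing trick used above: wrap whatever
// opens the files, remember every channel handed out, and afterwards verify
// none leaked - which is exactly what the test asserts after injected failures.
public class ChannelCaptureDemo {
    interface ChannelFactory {
        FileChannel open(Path path, StandardOpenOption... options) throws IOException;
    }

    public static void main(String[] args) throws IOException {
        List<FileChannel> captured = new ArrayList<>();
        ChannelFactory factory = (path, options) -> {
            FileChannel channel = FileChannel.open(path, options);
            captured.add(channel); // record for the leak check
            return channel;
        };

        Path tmp = Files.createTempFile("capture-demo", ".bin");
        try (FileChannel channel = factory.open(tmp, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(new byte[]{1, 2, 3}));
        }
        boolean leaked = captured.stream().anyMatch(FileChannel::isOpen);
        System.out.println("leaked channels: " + leaked); // false
        Files.delete(tmp);
    }
}
```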
@@ -2393,7 +2597,7 @@ public class TranslogTests extends ESTestCase {
         }
         final long generation = randomNonNegativeLong();
         return new Checkpoint(randomLong(), randomInt(), generation, minSeqNo, maxSeqNo, randomNonNegativeLong(),
-            randomLongBetween(1, generation));
+            randomLongBetween(1, generation), maxSeqNo);
     }
 
     public void testCheckpointOnDiskFull() throws IOException {
@@ -2617,7 +2821,7 @@ public class TranslogTests extends ESTestCase {
                 assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos));
                 readFromSnapshot++;
             }
-            readFromSnapshot += snapshot.overriddenOperations();
+            readFromSnapshot += snapshot.skippedOperations();
         }
         assertThat(readFromSnapshot, equalTo(expectedSnapshotOps));
         final long seqNoLowerBound = seqNo;