diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 668633e07ef..cc9dbdeb63f 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -19,6 +19,12 @@ package org.elasticsearch.index.translog; +import com.carrotsearch.hppc.LongHashSet; +import com.carrotsearch.hppc.LongObjectHashMap; +import com.carrotsearch.hppc.LongSet; +import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.index.seqno.SequenceNumbers; + import java.io.Closeable; import java.io.IOException; import java.util.Arrays; @@ -30,32 +36,44 @@ final class MultiSnapshot implements Translog.Snapshot { private final TranslogSnapshot[] translogs; private final int totalOperations; + private int overriddenOperations; private final Closeable onClose; private int index; + private final SeqNoSet seenSeqNo; /** * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. 
*/ MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) { this.translogs = translogs; - totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); + this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); + this.overriddenOperations = 0; this.onClose = onClose; - index = 0; + this.seenSeqNo = new SeqNoSet(); + this.index = translogs.length - 1; } - @Override public int totalOperations() { return totalOperations; } + @Override + public int overriddenOperations() { + return overriddenOperations; + } + @Override public Translog.Operation next() throws IOException { - for (; index < translogs.length; index++) { + for (; index >= 0; index--) { final TranslogSnapshot current = translogs[index]; - Translog.Operation op = current.next(); - if (op != null) { // if we are null we move to the next snapshot - return op; + Translog.Operation op; + while ((op = current.next()) != null) { + if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) { + return op; + } else { + overriddenOperations++; + } } } return null; @@ -65,4 +83,76 @@ final class MultiSnapshot implements Translog.Snapshot { public void close() throws IOException { onClose.close(); } + + /** + * A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1). 
+ */ + private static final class CountedBitSet { + private short onBits; + private final FixedBitSet bitset; + + CountedBitSet(short numBits) { + assert numBits > 0; + this.onBits = 0; + this.bitset = new FixedBitSet(numBits); + } + + boolean getAndSet(int index) { + assert index >= 0; + boolean wasOn = bitset.getAndSet(index); + if (wasOn == false) { + onBits++; + } + return wasOn; + } + + boolean hasAllBitsOn() { + return onBits == bitset.length(); + } + } + + /** + * Sequence numbers from translog are likely to form contiguous ranges, + * thus collapsing a completed bitset into a single entry will reduce memory usage. + */ + static final class SeqNoSet { + static final short BIT_SET_SIZE = 1024; + private final LongSet completedSets = new LongHashSet(); + private final LongObjectHashMap ongoingSets = new LongObjectHashMap<>(); + + /** + * Marks this sequence number and returns true if it is seen before. + */ + boolean getAndSet(long value) { + assert value >= 0; + final long key = value / BIT_SET_SIZE; + + if (completedSets.contains(key)) { + return true; + } + + CountedBitSet bitset = ongoingSets.get(key); + if (bitset == null) { + bitset = new CountedBitSet(BIT_SET_SIZE); + ongoingSets.put(key, bitset); + } + + final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE)); + if (bitset.hasAllBitsOn()) { + ongoingSets.remove(key); + completedSets.add(key); + } + return wasOn; + } + + // For testing + long completeSetsSize() { + return completedSets.size(); + } + + // For testing + long ongoingSetsSize() { + return ongoingSets.size(); + } + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 4373c8d0539..80033833899 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -831,10 +831,19 @@ public class Translog extends AbstractIndexShardComponent 
implements IndexShardC public interface Snapshot extends Closeable { /** - * The total number of operations in the translog. + * The total estimated number of operations in the snapshot. */ int totalOperations(); + /** + * The number of operations that have been overridden (e.g. superseded) in the snapshot so far. + * If two operations have the same sequence number, the operation with a lower term will be overridden by the operation + * with a higher term. Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()} is called. + */ + default int overriddenOperations() { + return 0; + } + /** * Returns the next operation in the snapshot or null if we reached the end. */ diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 5f692d8e8f5..71ad21c14d7 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -66,6 +66,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -567,8 +568,9 @@ public class RecoverySourceHandler { cancellableThreads.executeIO(sendBatch); } - assert expectedTotalOps == skippedOps + totalSentOps - : "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]"; + assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps + : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", + expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps); logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new 
ByteSizeValue(size), expectedTotalOps); diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index cf4dab733f2..8c15a2a84dd 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -46,16 +46,23 @@ import org.elasticsearch.indices.recovery.RecoveryTarget; import org.hamcrest.Matcher; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase { @@ -299,6 +306,68 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase } } + public void testSeqNoCollision() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + int initDocs = shards.indexDocs(randomInt(10)); + List replicas = shards.getReplicas(); + IndexShard replica1 = replicas.get(0); + IndexShard replica2 = replicas.get(1); + shards.syncGlobalCheckpoint(); + + logger.info("--> Isolate replica1"); + IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON); + BulkShardRequest 
replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary()); + indexOnReplica(replicationRequest, replica2); + + final Translog.Operation op1; + final List initOperations = new ArrayList<>(initDocs); + try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + for (int i = 0; i < initDocs; i++) { + Translog.Operation op = snapshot.next(); + assertThat(op, is(notNullValue())); + initOperations.add(op); + } + op1 = snapshot.next(); + assertThat(op1, notNullValue()); + assertThat(snapshot.next(), nullValue()); + assertThat(snapshot.overriddenOperations(), equalTo(0)); + } + // Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1). + logger.info("--> Promote replica1 as the primary"); + shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed. + shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); + final Translog.Operation op2; + try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 2)); + op2 = snapshot.next(); + assertThat(op2.seqNo(), equalTo(op1.seqNo())); + assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); + assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); + assertThat(snapshot.overriddenOperations(), equalTo(1)); + } + + // Make sure that peer-recovery transfers all but the overridden operations. 
+ IndexShard replica3 = shards.addReplica(); + logger.info("--> Promote replica2 as the primary"); + shards.promoteReplicaToPrimary(replica2); + logger.info("--> Recover replica3 from replica2"); + recoverReplica(replica3, replica2); + try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + assertThat(snapshot.next(), equalTo(op2)); + assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); + assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0)); + } + // TODO: We should assert the content of shards in the ReplicationGroup. + // Without rollback replicas(current implementation), we don't have the same content across shards: + // - replica1 has {doc1} + // - replica2 has {doc1, doc2} + // - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery + } + } + /** Throws documentFailure on every indexing operation */ static class ThrowingDocumentFailureEngineFactory implements EngineFactory { final String documentFailureMessage; diff --git a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java new file mode 100644 index 00000000000..7ee2a6c3366 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import com.carrotsearch.hppc.LongHashSet; +import com.carrotsearch.hppc.LongSet; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class MultiSnapshotTests extends ESTestCase { + + public void testTrackSeqNoSimpleRange() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); + final List values = LongStream.range(0, 1024).boxed().collect(Collectors.toList()); + Randomness.shuffle(values); + for (int i = 0; i < 1023; i++) { + assertThat(bitSet.getAndSet(values.get(i)), equalTo(false)); + assertThat(bitSet.ongoingSetsSize(), equalTo(1L)); + assertThat(bitSet.completeSetsSize(), equalTo(0L)); + } + + assertThat(bitSet.getAndSet(values.get(1023)), equalTo(false)); + assertThat(bitSet.ongoingSetsSize(), equalTo(0L)); + assertThat(bitSet.completeSetsSize(), equalTo(1L)); + + assertThat(bitSet.getAndSet(between(0, 1023)), equalTo(true)); + assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false)); + } + + public void testTrackSeqNoDenseRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); + final LongSet normalSet = new LongHashSet(); + IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> { + long 
seq = between(0, 5000); + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNoSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed)); + assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L)); + }); + } + + public void testTrackSeqNoSparseRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); + final LongSet normalSet = new LongHashSet(); + IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> { + long seq = between(i * 10_000, i * 30_000); + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + }); + } + + public void testTrackSeqNoMimicTranslogRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); + final LongSet normalSet = new LongHashSet(); + long currentSeq = between(10_000_000, 1_000_000_000); + final int iterations = scaledRandomIntBetween(100, 2000); + assertThat(bitSet.completeSetsSize(), equalTo(0L)); + assertThat(bitSet.ongoingSetsSize(), equalTo(0L)); + long totalDocs = 0; + for (long i = 0; i < iterations; i++) { + int batchSize = between(1, 1500); + totalDocs += batchSize; + currentSeq -= batchSize; + List batch = LongStream.range(currentSeq, currentSeq + batchSize) + .boxed() + .collect(Collectors.toList()); + Randomness.shuffle(batch); + batch.forEach(seq -> { + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L)); + }); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L)); + } + assertThat(bitSet.completeSetsSize(), lessThanOrEqualTo(totalDocs / 1024)); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L)); + } +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java index 
c45da660b00..4ca6057bd6b 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java +++ b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java @@ -26,6 +26,9 @@ import org.hamcrest.TypeSafeMatcher; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; public final class SnapshotMatchers { @@ -50,10 +53,14 @@ public final class SnapshotMatchers { /** * Consumes a snapshot and make sure it's content is as expected */ - public static Matcher equalsTo(ArrayList ops) { + public static Matcher equalsTo(List ops) { return new EqualMatcher(ops.toArray(new Translog.Operation[ops.size()])); } + public static Matcher containsOperationsInAnyOrder(Collection expectedOperations) { + return new ContainingInAnyOrderMatcher(expectedOperations); + } + public static class SizeMatcher extends TypeSafeMatcher { private final int size; @@ -127,5 +134,60 @@ public final class SnapshotMatchers { } } + public static class ContainingInAnyOrderMatcher extends TypeSafeMatcher { + private final Collection expectedOps; + private List notFoundOps; + private List notExpectedOps; + static List drainAll(Translog.Snapshot snapshot) throws IOException { + final List actualOps = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + actualOps.add(op); + } + return actualOps; + } + + public ContainingInAnyOrderMatcher(Collection expectedOps) { + this.expectedOps = expectedOps; + } + + @Override + protected boolean matchesSafely(Translog.Snapshot snapshot) { + try { + List actualOps = drainAll(snapshot); + notFoundOps = expectedOps.stream() + .filter(o -> actualOps.contains(o) == false) + .collect(Collectors.toList()); + notExpectedOps = actualOps.stream() + .filter(o -> expectedOps.contains(o) == false) + .collect(Collectors.toList()); + return notFoundOps.isEmpty() && notExpectedOps.isEmpty(); + } catch (IOException ex) { + 
throw new ElasticsearchException("failed to read snapshot content", ex); + } + } + + @Override + protected void describeMismatchSafely(Translog.Snapshot snapshot, Description mismatchDescription) { + if (notFoundOps.isEmpty() == false) { + mismatchDescription + .appendText("not found ").appendValueList("[", ", ", "]", notFoundOps); + } + if (notExpectedOps.isEmpty() == false) { + if (notFoundOps.isEmpty() == false) { + mismatchDescription.appendText("; "); + } + mismatchDescription + .appendText("not expected ").appendValueList("[", ", ", "]", notExpectedOps); + } + } + + @Override + public void describeTo(Description description) { + description.appendText("snapshot contains ") + .appendValueList("[", ", ", "]", expectedOps) + .appendText(" in any order."); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 1a17e0dc6a0..593e1059215 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -83,10 +83,13 @@ import java.nio.file.Files; import java.nio.file.InvalidPathException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -107,6 +110,7 @@ import java.util.stream.LongStream; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static 
org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -217,7 +221,7 @@ public class TranslogTests extends ESTestCase { return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } - private void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { + private void addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { list.add(op); translog.add(op); } @@ -520,7 +524,7 @@ public class TranslogTests extends ESTestCase { Translog.Snapshot snapshot2 = translog.newSnapshot(); toClose.add(snapshot2); markCurrentGenAsCommitted(translog); - assertThat(snapshot2, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot2, containsOperationsInAnyOrder(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); } finally { IOUtils.closeWhileHandlingException(toClose); @@ -1028,7 +1032,7 @@ public class TranslogTests extends ESTestCase { } assertEquals(max.generation, translog.currentFileGeneration()); - try (Translog.Snapshot snap = translog.newSnapshot()) { + try (Translog.Snapshot snap = new SortedSnapshot(translog.newSnapshot())) { Translog.Operation next; Translog.Operation maxOp = null; while ((next = snap.next()) != null) { @@ -1252,7 +1256,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? 
translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1266,7 +1270,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1310,7 +1314,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1325,7 +1329,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? 
translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1374,7 +1378,7 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -2061,7 +2065,7 @@ public class TranslogTests extends ESTestCase { } public void testRecoverWithUnbackedNextGen() throws IOException { - translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); translog.close(); TranslogConfig config = translog.getConfig(); @@ -2072,21 +2076,25 @@ public class TranslogTests extends ESTestCase { try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); Translog.Snapshot snapshot = tlog.newSnapshot()) { assertFalse(tlog.syncNeeded()); - for (int i = 0; i < 1; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); - } - tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + + Translog.Operation op = snapshot.next(); + assertNotNull("operation 1 must be non-null", op); + assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(op.getSource().source.utf8ToString())); + + tlog.add(new Translog.Index("test", "" + 1, 1, 
Integer.toString(2).getBytes(Charset.forName("UTF-8")))); } + try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); Translog.Snapshot snapshot = tlog.newSnapshot()) { assertFalse(tlog.syncNeeded()); - for (int i = 0; i < 2; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); - } + + Translog.Operation secondOp = snapshot.next(); + assertNotNull("operation 2 must be non-null", secondOp); + assertEquals("payload mismatch for operation 2", Integer.parseInt(secondOp.getSource().source.utf8ToString()), 2); + + Translog.Operation firstOp = snapshot.next(); + assertNotNull("operation 1 must be non-null", firstOp); + assertEquals("payload mismatch for operation 1", Integer.parseInt(firstOp.getSource().source.utf8ToString()), 1); } } @@ -2489,6 +2497,7 @@ public class TranslogTests extends ESTestCase { assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos)); readFromSnapshot++; } + readFromSnapshot += snapshot.overriddenOperations(); } assertThat(readFromSnapshot, equalTo(expectedSnapshotOps)); final long seqNoLowerBound = seqNo; @@ -2534,4 +2543,84 @@ public class TranslogTests extends ESTestCase { } } } + + public void testSnapshotReadOperationInReverse() throws Exception { + final Deque> views = new ArrayDeque<>(); + views.push(new ArrayList<>()); + final AtomicLong seqNo = new AtomicLong(); + + final int generations = randomIntBetween(2, 20); + for (int gen = 0; gen < generations; gen++) { + final int operations = randomIntBetween(1, 100); + for (int i = 0; i < operations; i++) { + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo.getAndIncrement(), new byte[]{1}); + translog.add(op); + views.peek().add(op); + } + if (frequently()) { + translog.rollGeneration(); + views.push(new ArrayList<>()); + } + } + try (Translog.Snapshot snapshot = 
translog.newSnapshot()) { + final List expectedSeqNo = new ArrayList<>(); + while (views.isEmpty() == false) { + expectedSeqNo.addAll(views.pop()); + } + assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo)); + } + } + + public void testSnapshotDedupOperations() throws Exception { + final Map latestOperations = new HashMap<>(); + final int generations = between(2, 20); + for (int gen = 0; gen < generations; gen++) { + List batch = LongStream.rangeClosed(0, between(0, 500)).boxed().collect(Collectors.toList()); + Randomness.shuffle(batch); + for (Long seqNo : batch) { + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, new byte[]{1}); + translog.add(op); + latestOperations.put(op.seqNo(), op); + } + translog.rollGeneration(); + } + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, containsOperationsInAnyOrder(latestOperations.values())); + } + } + + static class SortedSnapshot implements Translog.Snapshot { + private final Translog.Snapshot snapshot; + private List operations = null; + + SortedSnapshot(Translog.Snapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public int totalOperations() { + return snapshot.totalOperations(); + } + + @Override + public Translog.Operation next() throws IOException { + if (operations == null) { + operations = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + operations.add(op); + } + operations.sort(Comparator.comparing(Translog.Operation::seqNo)); + } + if (operations.isEmpty()) { + return null; + } + return operations.remove(0); + } + + @Override + public void close() throws IOException { + snapshot.close(); + } + } }