Dedup translog operations by reading in reverse (#27268)
Currently, translog operations are read and processed one by one. This may be a problem as stale operations in translogs may suddenly reappear in recoveries. To make sure that stale operations won't be processed, we read the translog files in a reverse order (eg. from the most recent file to the oldest file) and only process an operation if its sequence number was not seen before. Relates to #10708
This commit is contained in:
parent
0519fa223c
commit
a4b4e14186
|
@ -19,6 +19,12 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.translog;
|
package org.elasticsearch.index.translog;
|
||||||
|
|
||||||
|
import com.carrotsearch.hppc.LongHashSet;
|
||||||
|
import com.carrotsearch.hppc.LongObjectHashMap;
|
||||||
|
import com.carrotsearch.hppc.LongSet;
|
||||||
|
import org.apache.lucene.util.FixedBitSet;
|
||||||
|
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -30,32 +36,44 @@ final class MultiSnapshot implements Translog.Snapshot {
|
||||||
|
|
||||||
private final TranslogSnapshot[] translogs;
|
private final TranslogSnapshot[] translogs;
|
||||||
private final int totalOperations;
|
private final int totalOperations;
|
||||||
|
private int overriddenOperations;
|
||||||
private final Closeable onClose;
|
private final Closeable onClose;
|
||||||
private int index;
|
private int index;
|
||||||
|
private final SeqNoSet seenSeqNo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
|
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
|
||||||
*/
|
*/
|
||||||
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
|
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
|
||||||
this.translogs = translogs;
|
this.translogs = translogs;
|
||||||
totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
|
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
|
||||||
|
this.overriddenOperations = 0;
|
||||||
this.onClose = onClose;
|
this.onClose = onClose;
|
||||||
index = 0;
|
this.seenSeqNo = new SeqNoSet();
|
||||||
|
this.index = translogs.length - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int totalOperations() {
|
public int totalOperations() {
|
||||||
return totalOperations;
|
return totalOperations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int overriddenOperations() {
|
||||||
|
return overriddenOperations;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Translog.Operation next() throws IOException {
|
public Translog.Operation next() throws IOException {
|
||||||
for (; index < translogs.length; index++) {
|
for (; index >= 0; index--) {
|
||||||
final TranslogSnapshot current = translogs[index];
|
final TranslogSnapshot current = translogs[index];
|
||||||
Translog.Operation op = current.next();
|
Translog.Operation op;
|
||||||
if (op != null) { // if we are null we move to the next snapshot
|
while ((op = current.next()) != null) {
|
||||||
return op;
|
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
|
||||||
|
return op;
|
||||||
|
} else {
|
||||||
|
overriddenOperations++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -65,4 +83,76 @@ final class MultiSnapshot implements Translog.Snapshot {
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
onClose.close();
|
onClose.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1).
|
||||||
|
*/
|
||||||
|
private static final class CountedBitSet {
|
||||||
|
private short onBits;
|
||||||
|
private final FixedBitSet bitset;
|
||||||
|
|
||||||
|
CountedBitSet(short numBits) {
|
||||||
|
assert numBits > 0;
|
||||||
|
this.onBits = 0;
|
||||||
|
this.bitset = new FixedBitSet(numBits);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean getAndSet(int index) {
|
||||||
|
assert index >= 0;
|
||||||
|
boolean wasOn = bitset.getAndSet(index);
|
||||||
|
if (wasOn == false) {
|
||||||
|
onBits++;
|
||||||
|
}
|
||||||
|
return wasOn;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean hasAllBitsOn() {
|
||||||
|
return onBits == bitset.length();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sequence numbers from translog are likely to form contiguous ranges,
|
||||||
|
* thus collapsing a completed bitset into a single entry will reduce memory usage.
|
||||||
|
*/
|
||||||
|
static final class SeqNoSet {
|
||||||
|
static final short BIT_SET_SIZE = 1024;
|
||||||
|
private final LongSet completedSets = new LongHashSet();
|
||||||
|
private final LongObjectHashMap<CountedBitSet> ongoingSets = new LongObjectHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks this sequence number and returns <tt>true</tt> if it is seen before.
|
||||||
|
*/
|
||||||
|
boolean getAndSet(long value) {
|
||||||
|
assert value >= 0;
|
||||||
|
final long key = value / BIT_SET_SIZE;
|
||||||
|
|
||||||
|
if (completedSets.contains(key)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
CountedBitSet bitset = ongoingSets.get(key);
|
||||||
|
if (bitset == null) {
|
||||||
|
bitset = new CountedBitSet(BIT_SET_SIZE);
|
||||||
|
ongoingSets.put(key, bitset);
|
||||||
|
}
|
||||||
|
|
||||||
|
final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE));
|
||||||
|
if (bitset.hasAllBitsOn()) {
|
||||||
|
ongoingSets.remove(key);
|
||||||
|
completedSets.add(key);
|
||||||
|
}
|
||||||
|
return wasOn;
|
||||||
|
}
|
||||||
|
|
||||||
|
// For testing
|
||||||
|
long completeSetsSize() {
|
||||||
|
return completedSets.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
// For testing
|
||||||
|
long ongoingSetsSize() {
|
||||||
|
return ongoingSets.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -831,10 +831,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
public interface Snapshot extends Closeable {
|
public interface Snapshot extends Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The total number of operations in the translog.
|
* The total estimated number of operations in the snapshot.
|
||||||
*/
|
*/
|
||||||
int totalOperations();
|
int totalOperations();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of operations have been overridden (eg. superseded) in the snapshot so far.
|
||||||
|
* If two operations have the same sequence number, the operation with a lower term will be overridden by the operation
|
||||||
|
* with a higher term. Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called.
|
||||||
|
*/
|
||||||
|
default int overriddenOperations() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
|
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -66,6 +66,7 @@ import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
@ -567,8 +568,9 @@ public class RecoverySourceHandler {
|
||||||
cancellableThreads.executeIO(sendBatch);
|
cancellableThreads.executeIO(sendBatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert expectedTotalOps == skippedOps + totalSentOps
|
assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
|
||||||
: "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]";
|
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
|
||||||
|
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
|
||||||
|
|
||||||
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
|
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
|
||||||
|
|
||||||
|
|
|
@ -46,16 +46,23 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||||
import org.hamcrest.Matcher;
|
import org.hamcrest.Matcher;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
|
||||||
import static org.hamcrest.Matchers.anyOf;
|
import static org.hamcrest.Matchers.anyOf;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
|
|
||||||
public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
|
public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
|
||||||
|
|
||||||
|
@ -299,6 +306,68 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSeqNoCollision() throws Exception {
|
||||||
|
try (ReplicationGroup shards = createGroup(2)) {
|
||||||
|
shards.startAll();
|
||||||
|
int initDocs = shards.indexDocs(randomInt(10));
|
||||||
|
List<IndexShard> replicas = shards.getReplicas();
|
||||||
|
IndexShard replica1 = replicas.get(0);
|
||||||
|
IndexShard replica2 = replicas.get(1);
|
||||||
|
shards.syncGlobalCheckpoint();
|
||||||
|
|
||||||
|
logger.info("--> Isolate replica1");
|
||||||
|
IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON);
|
||||||
|
BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
|
||||||
|
indexOnReplica(replicationRequest, replica2);
|
||||||
|
|
||||||
|
final Translog.Operation op1;
|
||||||
|
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
|
||||||
|
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
|
||||||
|
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
|
||||||
|
for (int i = 0; i < initDocs; i++) {
|
||||||
|
Translog.Operation op = snapshot.next();
|
||||||
|
assertThat(op, is(notNullValue()));
|
||||||
|
initOperations.add(op);
|
||||||
|
}
|
||||||
|
op1 = snapshot.next();
|
||||||
|
assertThat(op1, notNullValue());
|
||||||
|
assertThat(snapshot.next(), nullValue());
|
||||||
|
assertThat(snapshot.overriddenOperations(), equalTo(0));
|
||||||
|
}
|
||||||
|
// Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1).
|
||||||
|
logger.info("--> Promote replica1 as the primary");
|
||||||
|
shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
|
||||||
|
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
|
||||||
|
final Translog.Operation op2;
|
||||||
|
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
|
||||||
|
assertThat(snapshot.totalOperations(), equalTo(initDocs + 2));
|
||||||
|
op2 = snapshot.next();
|
||||||
|
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
|
||||||
|
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
|
||||||
|
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
|
||||||
|
assertThat(snapshot.overriddenOperations(), equalTo(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that peer-recovery transfers all but non-overridden operations.
|
||||||
|
IndexShard replica3 = shards.addReplica();
|
||||||
|
logger.info("--> Promote replica2 as the primary");
|
||||||
|
shards.promoteReplicaToPrimary(replica2);
|
||||||
|
logger.info("--> Recover replica3 from replica2");
|
||||||
|
recoverReplica(replica3, replica2);
|
||||||
|
try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) {
|
||||||
|
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
|
||||||
|
assertThat(snapshot.next(), equalTo(op2));
|
||||||
|
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
|
||||||
|
assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0));
|
||||||
|
}
|
||||||
|
// TODO: We should assert the content of shards in the ReplicationGroup.
|
||||||
|
// Without rollback replicas(current implementation), we don't have the same content across shards:
|
||||||
|
// - replica1 has {doc1}
|
||||||
|
// - replica2 has {doc1, doc2}
|
||||||
|
// - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Throws <code>documentFailure</code> on every indexing operation */
|
/** Throws <code>documentFailure</code> on every indexing operation */
|
||||||
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
|
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
|
||||||
final String documentFailureMessage;
|
final String documentFailureMessage;
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.index.translog;
|
||||||
|
|
||||||
|
import com.carrotsearch.hppc.LongHashSet;
|
||||||
|
import com.carrotsearch.hppc.LongSet;
|
||||||
|
import org.elasticsearch.common.Randomness;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.LongStream;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||||
|
|
||||||
|
public class MultiSnapshotTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testTrackSeqNoSimpleRange() throws Exception {
|
||||||
|
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
|
||||||
|
final List<Long> values = LongStream.range(0, 1024).boxed().collect(Collectors.toList());
|
||||||
|
Randomness.shuffle(values);
|
||||||
|
for (int i = 0; i < 1023; i++) {
|
||||||
|
assertThat(bitSet.getAndSet(values.get(i)), equalTo(false));
|
||||||
|
assertThat(bitSet.ongoingSetsSize(), equalTo(1L));
|
||||||
|
assertThat(bitSet.completeSetsSize(), equalTo(0L));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(bitSet.getAndSet(values.get(1023)), equalTo(false));
|
||||||
|
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
|
||||||
|
assertThat(bitSet.completeSetsSize(), equalTo(1L));
|
||||||
|
|
||||||
|
assertThat(bitSet.getAndSet(between(0, 1023)), equalTo(true));
|
||||||
|
assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTrackSeqNoDenseRanges() throws Exception {
|
||||||
|
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
|
||||||
|
final LongSet normalSet = new LongHashSet();
|
||||||
|
IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> {
|
||||||
|
long seq = between(0, 5000);
|
||||||
|
boolean existed = normalSet.add(seq) == false;
|
||||||
|
assertThat("SeqNoSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed));
|
||||||
|
assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTrackSeqNoSparseRanges() throws Exception {
|
||||||
|
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
|
||||||
|
final LongSet normalSet = new LongHashSet();
|
||||||
|
IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> {
|
||||||
|
long seq = between(i * 10_000, i * 30_000);
|
||||||
|
boolean existed = normalSet.add(seq) == false;
|
||||||
|
assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTrackSeqNoMimicTranslogRanges() throws Exception {
|
||||||
|
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
|
||||||
|
final LongSet normalSet = new LongHashSet();
|
||||||
|
long currentSeq = between(10_000_000, 1_000_000_000);
|
||||||
|
final int iterations = scaledRandomIntBetween(100, 2000);
|
||||||
|
assertThat(bitSet.completeSetsSize(), equalTo(0L));
|
||||||
|
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
|
||||||
|
long totalDocs = 0;
|
||||||
|
for (long i = 0; i < iterations; i++) {
|
||||||
|
int batchSize = between(1, 1500);
|
||||||
|
totalDocs += batchSize;
|
||||||
|
currentSeq -= batchSize;
|
||||||
|
List<Long> batch = LongStream.range(currentSeq, currentSeq + batchSize)
|
||||||
|
.boxed()
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
Randomness.shuffle(batch);
|
||||||
|
batch.forEach(seq -> {
|
||||||
|
boolean existed = normalSet.add(seq) == false;
|
||||||
|
assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed));
|
||||||
|
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L));
|
||||||
|
});
|
||||||
|
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
|
||||||
|
}
|
||||||
|
assertThat(bitSet.completeSetsSize(), lessThanOrEqualTo(totalDocs / 1024));
|
||||||
|
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,6 +26,9 @@ import org.hamcrest.TypeSafeMatcher;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
||||||
public final class SnapshotMatchers {
|
public final class SnapshotMatchers {
|
||||||
|
@ -50,10 +53,14 @@ public final class SnapshotMatchers {
|
||||||
/**
|
/**
|
||||||
* Consumes a snapshot and make sure it's content is as expected
|
* Consumes a snapshot and make sure it's content is as expected
|
||||||
*/
|
*/
|
||||||
public static Matcher<Translog.Snapshot> equalsTo(ArrayList<Translog.Operation> ops) {
|
public static Matcher<Translog.Snapshot> equalsTo(List<Translog.Operation> ops) {
|
||||||
return new EqualMatcher(ops.toArray(new Translog.Operation[ops.size()]));
|
return new EqualMatcher(ops.toArray(new Translog.Operation[ops.size()]));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Matcher<Translog.Snapshot> containsOperationsInAnyOrder(Collection<Translog.Operation> expectedOperations) {
|
||||||
|
return new ContainingInAnyOrderMatcher(expectedOperations);
|
||||||
|
}
|
||||||
|
|
||||||
public static class SizeMatcher extends TypeSafeMatcher<Translog.Snapshot> {
|
public static class SizeMatcher extends TypeSafeMatcher<Translog.Snapshot> {
|
||||||
|
|
||||||
private final int size;
|
private final int size;
|
||||||
|
@ -127,5 +134,60 @@ public final class SnapshotMatchers {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class ContainingInAnyOrderMatcher extends TypeSafeMatcher<Translog.Snapshot> {
|
||||||
|
private final Collection<Translog.Operation> expectedOps;
|
||||||
|
private List<Translog.Operation> notFoundOps;
|
||||||
|
private List<Translog.Operation> notExpectedOps;
|
||||||
|
|
||||||
|
static List<Translog.Operation> drainAll(Translog.Snapshot snapshot) throws IOException {
|
||||||
|
final List<Translog.Operation> actualOps = new ArrayList<>();
|
||||||
|
Translog.Operation op;
|
||||||
|
while ((op = snapshot.next()) != null) {
|
||||||
|
actualOps.add(op);
|
||||||
|
}
|
||||||
|
return actualOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContainingInAnyOrderMatcher(Collection<Translog.Operation> expectedOps) {
|
||||||
|
this.expectedOps = expectedOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean matchesSafely(Translog.Snapshot snapshot) {
|
||||||
|
try {
|
||||||
|
List<Translog.Operation> actualOps = drainAll(snapshot);
|
||||||
|
notFoundOps = expectedOps.stream()
|
||||||
|
.filter(o -> actualOps.contains(o) == false)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
notExpectedOps = actualOps.stream()
|
||||||
|
.filter(o -> expectedOps.contains(o) == false)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
return notFoundOps.isEmpty() && notExpectedOps.isEmpty();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new ElasticsearchException("failed to read snapshot content", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void describeMismatchSafely(Translog.Snapshot snapshot, Description mismatchDescription) {
|
||||||
|
if (notFoundOps.isEmpty() == false) {
|
||||||
|
mismatchDescription
|
||||||
|
.appendText("not found ").appendValueList("[", ", ", "]", notFoundOps);
|
||||||
|
}
|
||||||
|
if (notExpectedOps.isEmpty() == false) {
|
||||||
|
if (notFoundOps.isEmpty() == false) {
|
||||||
|
mismatchDescription.appendText("; ");
|
||||||
|
}
|
||||||
|
mismatchDescription
|
||||||
|
.appendText("not expected ").appendValueList("[", ", ", "]", notExpectedOps);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void describeTo(Description description) {
|
||||||
|
description.appendText("snapshot contains ")
|
||||||
|
.appendValueList("[", ", ", "]", expectedOps)
|
||||||
|
.appendText(" in any order.");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,10 +83,13 @@ import java.nio.file.Files;
|
||||||
import java.nio.file.InvalidPathException;
|
import java.nio.file.InvalidPathException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.util.ArrayDeque;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Deque;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -107,6 +110,7 @@ import java.util.stream.LongStream;
|
||||||
|
|
||||||
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
|
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
|
||||||
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
|
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
|
||||||
|
import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
|
||||||
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
|
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.empty;
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
@ -217,7 +221,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
|
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
|
private void addToTranslogAndList(Translog translog, List<Translog.Operation> list, Translog.Operation op) throws IOException {
|
||||||
list.add(op);
|
list.add(op);
|
||||||
translog.add(op);
|
translog.add(op);
|
||||||
}
|
}
|
||||||
|
@ -520,7 +524,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
Translog.Snapshot snapshot2 = translog.newSnapshot();
|
Translog.Snapshot snapshot2 = translog.newSnapshot();
|
||||||
toClose.add(snapshot2);
|
toClose.add(snapshot2);
|
||||||
markCurrentGenAsCommitted(translog);
|
markCurrentGenAsCommitted(translog);
|
||||||
assertThat(snapshot2, SnapshotMatchers.equalsTo(ops));
|
assertThat(snapshot2, containsOperationsInAnyOrder(ops));
|
||||||
assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
|
assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeWhileHandlingException(toClose);
|
IOUtils.closeWhileHandlingException(toClose);
|
||||||
|
@ -1028,7 +1032,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(max.generation, translog.currentFileGeneration());
|
assertEquals(max.generation, translog.currentFileGeneration());
|
||||||
try (Translog.Snapshot snap = translog.newSnapshot()) {
|
try (Translog.Snapshot snap = new SortedSnapshot(translog.newSnapshot())) {
|
||||||
Translog.Operation next;
|
Translog.Operation next;
|
||||||
Translog.Operation maxOp = null;
|
Translog.Operation maxOp = null;
|
||||||
while ((next = snap.next()) != null) {
|
while ((next = snap.next()) != null) {
|
||||||
|
@ -1252,7 +1256,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
|
||||||
int upTo = sync ? translogOperations : prepareOp;
|
int upTo = sync ? translogOperations : prepareOp;
|
||||||
for (int i = 0; i < upTo; i++) {
|
for (int i = 0; i < upTo; i++) {
|
||||||
Translog.Operation next = snapshot.next();
|
Translog.Operation next = snapshot.next();
|
||||||
|
@ -1266,7 +1270,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
|
||||||
int upTo = sync ? translogOperations : prepareOp;
|
int upTo = sync ? translogOperations : prepareOp;
|
||||||
for (int i = 0; i < upTo; i++) {
|
for (int i = 0; i < upTo; i++) {
|
||||||
Translog.Operation next = snapshot.next();
|
Translog.Operation next = snapshot.next();
|
||||||
|
@ -1310,7 +1314,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
|
||||||
int upTo = sync ? translogOperations : prepareOp;
|
int upTo = sync ? translogOperations : prepareOp;
|
||||||
for (int i = 0; i < upTo; i++) {
|
for (int i = 0; i < upTo; i++) {
|
||||||
Translog.Operation next = snapshot.next();
|
Translog.Operation next = snapshot.next();
|
||||||
|
@ -1325,7 +1329,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
|
||||||
int upTo = sync ? translogOperations : prepareOp;
|
int upTo = sync ? translogOperations : prepareOp;
|
||||||
for (int i = 0; i < upTo; i++) {
|
for (int i = 0; i < upTo; i++) {
|
||||||
Translog.Operation next = snapshot.next();
|
Translog.Operation next = snapshot.next();
|
||||||
|
@ -1374,7 +1378,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
|
||||||
int upTo = sync ? translogOperations : prepareOp;
|
int upTo = sync ? translogOperations : prepareOp;
|
||||||
for (int i = 0; i < upTo; i++) {
|
for (int i = 0; i < upTo; i++) {
|
||||||
Translog.Operation next = snapshot.next();
|
Translog.Operation next = snapshot.next();
|
||||||
|
@ -2061,7 +2065,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRecoverWithUnbackedNextGen() throws IOException {
|
public void testRecoverWithUnbackedNextGen() throws IOException {
|
||||||
translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
|
translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
||||||
translog.close();
|
translog.close();
|
||||||
TranslogConfig config = translog.getConfig();
|
TranslogConfig config = translog.getConfig();
|
||||||
|
|
||||||
|
@ -2072,21 +2076,25 @@ public class TranslogTests extends ESTestCase {
|
||||||
try (Translog tlog = createTranslog(config, translog.getTranslogUUID());
|
try (Translog tlog = createTranslog(config, translog.getTranslogUUID());
|
||||||
Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
||||||
assertFalse(tlog.syncNeeded());
|
assertFalse(tlog.syncNeeded());
|
||||||
for (int i = 0; i < 1; i++) {
|
|
||||||
Translog.Operation next = snapshot.next();
|
Translog.Operation op = snapshot.next();
|
||||||
assertNotNull("operation " + i + " must be non-null", next);
|
assertNotNull("operation 1 must be non-null", op);
|
||||||
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString()));
|
assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(op.getSource().source.utf8ToString()));
|
||||||
}
|
|
||||||
tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(2).getBytes(Charset.forName("UTF-8"))));
|
||||||
}
|
}
|
||||||
|
|
||||||
try (Translog tlog = createTranslog(config, translog.getTranslogUUID());
|
try (Translog tlog = createTranslog(config, translog.getTranslogUUID());
|
||||||
Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
||||||
assertFalse(tlog.syncNeeded());
|
assertFalse(tlog.syncNeeded());
|
||||||
for (int i = 0; i < 2; i++) {
|
|
||||||
Translog.Operation next = snapshot.next();
|
Translog.Operation secondOp = snapshot.next();
|
||||||
assertNotNull("operation " + i + " must be non-null", next);
|
assertNotNull("operation 2 must be non-null", secondOp);
|
||||||
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString()));
|
assertEquals("payload mismatch for operation 2", Integer.parseInt(secondOp.getSource().source.utf8ToString()), 2);
|
||||||
}
|
|
||||||
|
Translog.Operation firstOp = snapshot.next();
|
||||||
|
assertNotNull("operation 1 must be non-null", firstOp);
|
||||||
|
assertEquals("payload mismatch for operation 1", Integer.parseInt(firstOp.getSource().source.utf8ToString()), 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2489,6 +2497,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos));
|
assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos));
|
||||||
readFromSnapshot++;
|
readFromSnapshot++;
|
||||||
}
|
}
|
||||||
|
readFromSnapshot += snapshot.overriddenOperations();
|
||||||
}
|
}
|
||||||
assertThat(readFromSnapshot, equalTo(expectedSnapshotOps));
|
assertThat(readFromSnapshot, equalTo(expectedSnapshotOps));
|
||||||
final long seqNoLowerBound = seqNo;
|
final long seqNoLowerBound = seqNo;
|
||||||
|
@ -2534,4 +2543,84 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSnapshotReadOperationInReverse() throws Exception {
|
||||||
|
final Deque<List<Translog.Operation>> views = new ArrayDeque<>();
|
||||||
|
views.push(new ArrayList<>());
|
||||||
|
final AtomicLong seqNo = new AtomicLong();
|
||||||
|
|
||||||
|
final int generations = randomIntBetween(2, 20);
|
||||||
|
for (int gen = 0; gen < generations; gen++) {
|
||||||
|
final int operations = randomIntBetween(1, 100);
|
||||||
|
for (int i = 0; i < operations; i++) {
|
||||||
|
Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo.getAndIncrement(), new byte[]{1});
|
||||||
|
translog.add(op);
|
||||||
|
views.peek().add(op);
|
||||||
|
}
|
||||||
|
if (frequently()) {
|
||||||
|
translog.rollGeneration();
|
||||||
|
views.push(new ArrayList<>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
||||||
|
final List<Translog.Operation> expectedSeqNo = new ArrayList<>();
|
||||||
|
while (views.isEmpty() == false) {
|
||||||
|
expectedSeqNo.addAll(views.pop());
|
||||||
|
}
|
||||||
|
assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSnapshotDedupOperations() throws Exception {
|
||||||
|
final Map<Long, Translog.Operation> latestOperations = new HashMap<>();
|
||||||
|
final int generations = between(2, 20);
|
||||||
|
for (int gen = 0; gen < generations; gen++) {
|
||||||
|
List<Long> batch = LongStream.rangeClosed(0, between(0, 500)).boxed().collect(Collectors.toList());
|
||||||
|
Randomness.shuffle(batch);
|
||||||
|
for (Long seqNo : batch) {
|
||||||
|
Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, new byte[]{1});
|
||||||
|
translog.add(op);
|
||||||
|
latestOperations.put(op.seqNo(), op);
|
||||||
|
}
|
||||||
|
translog.rollGeneration();
|
||||||
|
}
|
||||||
|
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
||||||
|
assertThat(snapshot, containsOperationsInAnyOrder(latestOperations.values()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class SortedSnapshot implements Translog.Snapshot {
|
||||||
|
private final Translog.Snapshot snapshot;
|
||||||
|
private List<Translog.Operation> operations = null;
|
||||||
|
|
||||||
|
SortedSnapshot(Translog.Snapshot snapshot) {
|
||||||
|
this.snapshot = snapshot;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int totalOperations() {
|
||||||
|
return snapshot.totalOperations();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Translog.Operation next() throws IOException {
|
||||||
|
if (operations == null) {
|
||||||
|
operations = new ArrayList<>();
|
||||||
|
Translog.Operation op;
|
||||||
|
while ((op = snapshot.next()) != null) {
|
||||||
|
operations.add(op);
|
||||||
|
}
|
||||||
|
operations.sort(Comparator.comparing(Translog.Operation::seqNo));
|
||||||
|
}
|
||||||
|
if (operations.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return operations.remove(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
snapshot.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue