Relax history check in ShardFollowTaskReplicationTests (#39162)
The follower won't always have the same history as the leader for its soft-deletes retention can be different. However, if some operation exists on the history of the follower, then the same operation must exist on the leader. This change relaxes the history check in ShardFollowTaskReplicationTests. Closes #39093
This commit is contained in:
parent
820ba8169e
commit
cdec11c4eb
|
@ -70,7 +70,7 @@ public class TransportBulkShardOperationsAction
|
||||||
request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
|
request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
|
public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
|
||||||
final Translog.Operation operationWithPrimaryTerm;
|
final Translog.Operation operationWithPrimaryTerm;
|
||||||
switch (operation.opType()) {
|
switch (operation.opType()) {
|
||||||
case INDEX:
|
case INDEX:
|
||||||
|
|
|
@ -60,8 +60,9 @@ import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -559,11 +560,11 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
||||||
boolean assertMaxSeqNoOfUpdatesOrDeletes) throws Exception {
|
boolean assertMaxSeqNoOfUpdatesOrDeletes) throws Exception {
|
||||||
final List<Tuple<String, Long>> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream()
|
final List<Tuple<String, Long>> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream()
|
||||||
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
|
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
|
||||||
final Set<Tuple<Long, Translog.Operation.Type>> operationsOnLeader = new HashSet<>();
|
final Map<Long, Translog.Operation> operationsOnLeader = new HashMap<>();
|
||||||
try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
|
try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
|
||||||
Translog.Operation op;
|
Translog.Operation op;
|
||||||
while ((op = snapshot.next()) != null) {
|
while ((op = snapshot.next()) != null) {
|
||||||
operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType()));
|
operationsOnLeader.put(op.seqNo(), op);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (IndexShard followingShard : follower) {
|
for (IndexShard followingShard : follower) {
|
||||||
|
@ -573,14 +574,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
||||||
List<Tuple<String, Long>> docAndSeqNosOnFollower = getDocIdAndSeqNos(followingShard).stream()
|
List<Tuple<String, Long>> docAndSeqNosOnFollower = getDocIdAndSeqNos(followingShard).stream()
|
||||||
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
|
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
|
||||||
assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader));
|
assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader));
|
||||||
final Set<Tuple<Long, Translog.Operation.Type>> operationsOnFollower = new HashSet<>();
|
|
||||||
try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
|
try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
|
||||||
Translog.Operation op;
|
Translog.Operation op;
|
||||||
while ((op = snapshot.next()) != null) {
|
while ((op = snapshot.next()) != null) {
|
||||||
operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType()));
|
Translog.Operation leaderOp = operationsOnLeader.get(op.seqNo());
|
||||||
|
assertThat(TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm(op, leaderOp.primaryTerm()),
|
||||||
|
equalTo(leaderOp));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertThat(followingShard.routingEntry().toString(), operationsOnFollower, equalTo(operationsOnLeader));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue