From cdec11c4eb048e0c11ac19d51108fe1b36d3b6d4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 20 Feb 2019 08:10:29 -0500 Subject: [PATCH] Relax history check in ShardFollowTaskReplicationTests (#39162) The follower won't always have the same history as the leader for its soft-deletes retention can be different. However, if some operation exists on the history of the follower, then the same operation must exist on the leader. This change relaxes the history check in ShardFollowTaskReplicationTests. Closes #39093 --- .../bulk/TransportBulkShardOperationsAction.java | 2 +- .../ccr/action/ShardFollowTaskReplicationTests.java | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 9afc57309cc..7c3b2da32e8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -70,7 +70,7 @@ public class TransportBulkShardOperationsAction request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); } - static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { + public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { final Translog.Operation operationWithPrimaryTerm; switch (operation.opType()) { case INDEX: diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 0f82bfab102..1f5f487580b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -60,8 +60,9 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -559,11 +560,11 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest boolean assertMaxSeqNoOfUpdatesOrDeletes) throws Exception { final List> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream() .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList()); - final Set> operationsOnLeader = new HashSet<>(); + final Map operationsOnLeader = new HashMap<>(); try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { Translog.Operation op; while ((op = snapshot.next()) != null) { - operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType())); + operationsOnLeader.put(op.seqNo(), op); } } for (IndexShard followingShard : follower) { @@ -573,14 +574,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest List> docAndSeqNosOnFollower = getDocIdAndSeqNos(followingShard).stream() .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList()); assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader)); - final Set> operationsOnFollower = new HashSet<>(); try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { Translog.Operation op; while ((op = snapshot.next()) != null) { - operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType())); + Translog.Operation leaderOp = operationsOnLeader.get(op.seqNo()); + assertThat(TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm(op, leaderOp.primaryTerm()), + equalTo(leaderOp)); } } - assertThat(followingShard.routingEntry().toString(), operationsOnFollower, equalTo(operationsOnLeader)); } }