diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 9afc57309cc..7c3b2da32e8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -70,7 +70,7 @@ public class TransportBulkShardOperationsAction request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); } - static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { + public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { final Translog.Operation operationWithPrimaryTerm; switch (operation.opType()) { case INDEX: diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 0f82bfab102..1f5f487580b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -60,8 +60,9 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -559,11 +560,11 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest boolean assertMaxSeqNoOfUpdatesOrDeletes) throws Exception { final List> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream() .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList()); - final Set> operationsOnLeader = new HashSet<>(); + final Map operationsOnLeader = new HashMap<>(); try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { Translog.Operation op; while ((op = snapshot.next()) != null) { - operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType())); + operationsOnLeader.put(op.seqNo(), op); } } for (IndexShard followingShard : follower) { @@ -573,14 +574,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest List> docAndSeqNosOnFollower = getDocIdAndSeqNos(followingShard).stream() .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList()); assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader)); - final Set> operationsOnFollower = new HashSet<>(); try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { Translog.Operation op; while ((op = snapshot.next()) != null) { - operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType())); + Translog.Operation leaderOp = operationsOnLeader.get(op.seqNo()); + assertThat(TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm(op, leaderOp.primaryTerm()), + equalTo(leaderOp)); } } - assertThat(followingShard.routingEntry().toString(), operationsOnFollower, equalTo(operationsOnLeader)); } }