From 33791ac27c7a9591ea2d873fa148e2427cd06b75 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 10 Oct 2018 15:39:57 -0400 Subject: [PATCH] CCR: Following primary should process operations once (#34288) Today we rewrite the operations from the leader with the term of the following primary because the follower should own its history. The problem is that a newly promoted primary may re-assign its term to operations which were replicated to replicas before by the previous primary. If this happens, some operations with the same seq_no may be assigned different terms. This is not good for the future optimistic locking using a combination of seqno and term. This change ensures that the primary of a follower only processes an operation if that operation was not processed before. The skipped operations are guaranteed to be delivered to replicas via either primary-replica resync or peer-recovery. However, the primary must not acknowledge until the global checkpoint is at least the highest seqno of all skipped ops (i.e., they all have been processed on every replica). Relates #31751 Relates #31113 --- .../index/engine/InternalEngine.java | 4 +- .../index/engine/EngineTestCase.java | 11 +- .../ESIndexLevelReplicationTestCase.java | 10 +- .../TransportBulkShardOperationsAction.java | 101 +++++++++++---- ...eadyProcessedFollowingEngineException.java | 16 +++ .../ccr/index/engine/FollowingEngine.java | 31 ++++- .../xpack/ccr/ShardChangesIT.java | 63 +++++++++ .../ShardFollowTaskReplicationTests.java | 100 +++++++++++++-- .../action/bulk/BulkShardOperationsTests.java | 121 ++++++++++++++++++ .../index/engine/FollowingEngineTests.java | 74 +++++++++++ 10 files changed, 481 insertions(+), 50 deletions(-) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 187b0eb1359..55d93203abe 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1086,7 +1086,7 @@ public class InternalEngine extends Engine { return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null); } - static IndexingStrategy skipDueToVersionConflict( + public static IndexingStrategy skipDueToVersionConflict( VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) { final IndexResult result = new IndexResult(e, currentVersion, term); return new IndexingStrategy( @@ -1343,7 +1343,7 @@ public class InternalEngine extends Engine { Optional.empty() : Optional.of(earlyResultOnPreflightError); } - static DeletionStrategy skipDueToVersionConflict( + public static DeletionStrategy skipDueToVersionConflict( VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) { final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 12f0d645d8a..bb1efd69973 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -791,15 +791,14 @@ public abstract class EngineTestCase extends ESTestCase { Bits liveDocs = reader.getLiveDocs(); for (int i = 0; i < reader.maxDoc(); i++) { if (liveDocs == null || liveDocs.get(i)) { + if (primaryTermDocValues.advanceExact(i) == false) { + // We have to skip non-root docs because its _id field is not stored (indexed only). + continue; + } + final long primaryTerm = primaryTermDocValues.longValue(); Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME)); BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME); String id = Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length)); - final long primaryTerm; - if (primaryTermDocValues.advanceExact(i)) { - primaryTerm = primaryTermDocValues.longValue(); - } else { - primaryTerm = 0; // non-root documents of a nested document. - } if (seqNoDocValues.advanceExact(i) == false) { throw new AssertionError("seqNoDocValues not found for doc[" + i + "] id[" + id + "]"); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 9021fd1efbb..58fea953850 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -63,6 +63,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; @@ -442,13 +443,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase public synchronized void close() throws Exception { if (closed == false) { closed = true; - for (IndexShard replica : replicas) { - try { + try { + final List docsOnPrimary = getDocIdAndSeqNos(primary); + for (IndexShard replica : replicas) { assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp())); assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(primary.getMaxSeqNoOfUpdatesOrDeletes())); - } catch (AlreadyClosedException ignored) { + assertThat(getDocIdAndSeqNos(replica), equalTo(docsOnPrimary)); } - } + } catch (AlreadyClosedException ignored) { } closeShards(this); } else { throw new AlreadyClosedException("too bad"); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index db71e5b5af8..6d5df143eea 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action.bulk; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; @@ -25,10 +26,12 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; +import java.util.function.Function; public class TransportBulkShardOperationsAction extends TransportWriteAction { @@ -66,7 +69,7 @@ public class TransportBulkShardOperationsAction } // public for testing purposes only - public static WritePrimaryResult shardOperationOnPrimary( + public static CcrWritePrimaryResult shardOperationOnPrimary( final ShardId shardId, final String historyUUID, final List sourceOperations, @@ -78,7 +81,7 @@ public class TransportBulkShardOperationsAction "], actual [" + primary.getHistoryUUID() + "], shard is likely restored from snapshot or force allocated"); } - final List targetOperations = sourceOperations.stream().map(operation -> { + final Function rewriteWithTerm = operation -> { final Translog.Operation operationWithPrimaryTerm; switch (operation.opType()) { case INDEX: @@ -111,36 +114,65 @@ public class TransportBulkShardOperationsAction throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]"); } return operationWithPrimaryTerm; - }).collect(Collectors.toList()); + }; + assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]"; primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); - final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY); + + final List appliedOperations = new ArrayList<>(sourceOperations.size()); + Translog.Location location = null; + long waitingForGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; + for (Translog.Operation sourceOp : sourceOperations) { + final Translog.Operation targetOp = rewriteWithTerm.apply(sourceOp); + final Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY); + if (result.getResultType() == Engine.Result.Type.SUCCESS) { + assert result.getSeqNo() == targetOp.seqNo(); + appliedOperations.add(targetOp); + location = locationToSync(location, result.getTranslogLocation()); + } else { + if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) { + // Skipped operations will be delivered to replicas via primary-replica resync or peer-recovery. + // The primary must not acknowledge this request until the global checkpoint is at least the highest + // seqno of all skipped operations (i.e., all skipped operations have been processed on every replica). + waitingForGlobalCheckpoint = SequenceNumbers.max(waitingForGlobalCheckpoint, targetOp.seqNo()); + } else { + assert false : "Only already-processed error should happen; op=[" + targetOp + "] error=[" + result.getFailure() + "]"; + throw ExceptionsHelper.convertToElastic(result.getFailure()); + } + } + } + assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO : + "waiting global checkpoint is not assigned; waiting_gcp=" + waitingForGlobalCheckpoint + + " source_ops=" + sourceOperations.size() + " applied_ops=" + sourceOperations.size(); + assert appliedOperations.size() == 0 || location != null; final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest( - shardId, historyUUID, targetOperations, maxSeqNoOfUpdatesOrDeletes); - return new CcrWritePrimaryResult(replicaRequest, location, primary, logger); + shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes); + return new CcrWritePrimaryResult(replicaRequest, location, primary, waitingForGlobalCheckpoint, logger); } @Override protected WriteReplicaResult shardOperationOnReplica( final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { - assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() : - "mus on replica [" + replica + "] < mus of request [" + request.getMaxSeqNoOfUpdatesOrDeletes() + "]"; - final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA); - return new WriteReplicaResult<>(request, location, null, replica, logger); + return shardOperationOnReplica(request, replica, logger); } // public for testing purposes only - public static Translog.Location applyTranslogOperations( - final List operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { + public static WriteReplicaResult shardOperationOnReplica( + final BulkShardOperationsRequest request, final IndexShard replica, final Logger logger) throws IOException { + assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() : + "mus on replica [" + replica + "] < mus of request [" + request.getMaxSeqNoOfUpdatesOrDeletes() + "]"; Translog.Location location = null; - for (final Translog.Operation operation : operations) { - final Engine.Result result = shard.applyTranslogOperation(operation, origin); + for (final Translog.Operation operation : request.getOperations()) { + final Engine.Result result = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA); + if (result.getResultType() != Engine.Result.Type.SUCCESS) { + assert false : "doc-level failure must not happen on replicas; op[" + operation + "] error[" + result.getFailure() + "]"; + throw ExceptionsHelper.convertToElastic(result.getFailure()); + } assert result.getSeqNo() == operation.seqNo(); - assert result.getResultType() == Engine.Result.Type.SUCCESS; location = locationToSync(location, result.getTranslogLocation()); } - assert operations.size() == 0 || location != null; - return location; + assert request.getOperations().size() == 0 || location != null; + return new WriteReplicaResult<>(request, location, null, replica, logger); } @Override @@ -151,20 +183,37 @@ public class TransportBulkShardOperationsAction /** * Custom write result to include global checkpoint after ops have been replicated. */ - static class CcrWritePrimaryResult extends WritePrimaryResult { + static final class CcrWritePrimaryResult extends WritePrimaryResult { + final long waitingForGlobalCheckpoint; - CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) { + CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, + long waitingForGlobalCheckpoint, Logger logger) { super(request, new BulkShardOperationsResponse(), location, null, primary, logger); + this.waitingForGlobalCheckpoint = waitingForGlobalCheckpoint; } @Override public synchronized void respond(ActionListener listener) { - final BulkShardOperationsResponse response = finalResponseIfSuccessful; - final SeqNoStats seqNoStats = primary.seqNoStats(); - // return a fresh global checkpoint after the operations have been replicated for the shard follow task - response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint()); - response.setMaxSeqNo(seqNoStats.getMaxSeqNo()); - listener.onResponse(response); + final ActionListener wrappedListener = ActionListener.wrap(response -> { + final SeqNoStats seqNoStats = primary.seqNoStats(); + // return a fresh global checkpoint after the operations have been replicated for the shard follow task + response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint()); + response.setMaxSeqNo(seqNoStats.getMaxSeqNo()); + listener.onResponse(response); + }, listener::onFailure); + + if (waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO) { + primary.addGlobalCheckpointListener(waitingForGlobalCheckpoint, (gcp, e) -> { + if (e != null) { + listener.onFailure(e); + } else { + assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp; + super.respond(wrappedListener); + } + }, null); + } else { + super.respond(wrappedListener); + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java new file mode 100644 index 00000000000..9e19c93b286 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.index.engine; + +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.shard.ShardId; + +public final class AlreadyProcessedFollowingEngineException extends VersionConflictEngineException { + AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo) { + super(shardId, "operation [{}] was processed before", null, seqNo); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 458461f3c84..8a413ce4980 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -58,8 +58,21 @@ public final class FollowingEngine extends InternalEngine { final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; if (hasBeenProcessedBefore(index)) { - return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); - + if (index.origin() == Operation.Origin.PRIMARY) { + /* + * The existing operation in this engine was probably assigned the term of the previous primary shard which is different + * from the term of the current operation. If the current operation arrives on replicas before the previous operation, + * then the Lucene content between the primary and replicas are not identical (primary terms are different). Since the + * existing operations are guaranteed to be replicated to replicas either via peer-recovery or primary-replica resync, + * we can safely skip this operation here and let the caller know the decision via AlreadyProcessedFollowingEngineException. + * The caller then waits for the global checkpoint to advance at least the seq_no of this operation to make sure that + * the existing operation was replicated to all replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). + */ + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, index.seqNo()); + return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); + } else { + return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); + } } else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) { assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]"; numOfOptimizedIndexing.inc(); @@ -73,7 +86,19 @@ public final class FollowingEngine extends InternalEngine { @Override protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { preFlight(delete); - return planDeletionAsNonPrimary(delete); + if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { + // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, delete.seqNo()); + return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false); + } else { + return planDeletionAsNonPrimary(delete); + } + } + + @Override + public NoOpResult noOp(NoOp noOp) { + // TODO: Make sure we process NoOp once. + return super.noOp(noOp); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 8ed3fc53340..d5a1263abd5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -28,6 +28,8 @@ import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -47,11 +49,13 @@ import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; @@ -79,6 +83,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -117,6 +122,14 @@ public class ShardChangesIT extends ESIntegTestCase { return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class); } + @Override + protected void beforeIndexDeletion() throws Exception { + super.beforeIndexDeletion(); + assertSeqNos(); + assertSameDocIdsOnShards(); + internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); + } + @Override protected boolean ignoreExternalCluster() { return true; @@ -681,6 +694,56 @@ public class ShardChangesIT extends ESIntegTestCase { assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L)); } + public void testFailOverOnFollower() throws Exception { + int numberOfReplicas = between(1, 2); + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNodes(numberOfReplicas + between(1, 2)); + String leaderIndexSettings = getIndexSettings(1, numberOfReplicas, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader-index"); + AtomicBoolean stopped = new AtomicBoolean(); + Thread[] threads = new Thread[between(1, 8)]; + AtomicInteger docID = new AtomicInteger(); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + while (stopped.get() == false) { + try { + if (frequently()) { + String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update + client().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get(); + } else { + String id = Integer.toString(between(0, docID.get())); + client().prepareDelete("leader-index", "doc", id).get(); + } + } catch (NodeClosedException ignored) { + } + } + }); + threads[i].start(); + } + PutFollowAction.Request follow = follow("leader-index", "follower-index"); + client().execute(PutFollowAction.INSTANCE, follow).get(); + ensureGreen("follower-index"); + atLeastDocsIndexed("follower-index", between(20, 60)); + final ClusterState clusterState = clusterService().state(); + for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) { + if (shardRouting.primary()) { + DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId()); + internalCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback()); + break; + } + } + ensureGreen("follower-index"); + atLeastDocsIndexed("follower-index", between(80, 150)); + stopped.set(true); + for (Thread thread : threads) { + thread.join(); + } + assertSameDocCount("leader-index", "follower-index"); + unfollowIndex("follower-index"); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 055005b9e7d..559babdc905 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -12,19 +12,22 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.engine.Engine.Operation.Origin; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; @@ -37,7 +40,9 @@ import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -221,6 +227,57 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest } } + public void testRetryBulkShardOperations() throws Exception { + try (ReplicationGroup leaderGroup = createGroup(between(0, 1)); + ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) { + leaderGroup.startAll(); + followerGroup.startAll(); + leaderGroup.appendDocs(between(10, 100)); + leaderGroup.refresh("test"); + for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) { + BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId)); + assertThat(resp.getFailure(), nullValue()); + } + leaderGroup.syncGlobalCheckpoint(); + IndexShard leadingPrimary = leaderGroup.getPrimary(); + // Simulates some bulk requests are completed on the primary and replicated to some (but all) replicas of the follower + // but the primary of the follower crashed before these requests completed. + for (int numBulks = between(1, 5), i = 0; i < numBulks; i++) { + long fromSeqNo = randomLongBetween(0, leadingPrimary.getGlobalCheckpoint()); + long toSeqNo = randomLongBetween(fromSeqNo, leadingPrimary.getGlobalCheckpoint()); + int numOps = Math.toIntExact(toSeqNo + 1 - fromSeqNo); + Translog.Operation[] ops = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(), + fromSeqNo, numOps, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); + + IndexShard followingPrimary = followerGroup.getPrimary(); + TransportWriteAction.WritePrimaryResult primaryResult = + TransportBulkShardOperationsAction.shardOperationOnPrimary(followingPrimary.shardId(), + followingPrimary.getHistoryUUID(), Arrays.asList(ops), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), + followingPrimary, logger); + for (IndexShard replica : randomSubsetOf(followerGroup.getReplicas())) { + final PlainActionFuture permitFuture = new PlainActionFuture<>(); + replica.acquireReplicaOperationPermit(followingPrimary.getOperationPrimaryTerm(), + followingPrimary.getGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), + permitFuture, ThreadPool.Names.SAME, primaryResult); + try (Releasable ignored = permitFuture.get()) { + TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger); + } + } + } + // A follow-task retries these requests while the primary-replica resync is happening on the follower. + followerGroup.promoteReplicaToPrimary(randomFrom(followerGroup.getReplicas())); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(), + leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo()); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint())); + assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup); + }); + shardFollowTask.markAsCompleted(); + } + } + @Override protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException { Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) @@ -366,13 +423,29 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest }; } - private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup leader, ReplicationGroup follower) throws IOException { - int totalOps = leader.getPrimary().estimateNumberOfHistoryOperations("test", 0); - for (IndexShard followingShard : follower) { - assertThat(followingShard.estimateNumberOfHistoryOperations("test", 0), equalTo(totalOps)); + private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup leader, ReplicationGroup follower) throws Exception { + final List> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream() + .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList()); + final Set> operationsOnLeader = new HashSet<>(); + try (Translog.Snapshot snapshot = leader.getPrimary().getHistoryOperations("test", 0)) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType())); + } } for (IndexShard followingShard : follower) { assertThat(followingShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getPrimary().getMaxSeqNoOfUpdatesOrDeletes())); + List> docAndSeqNosOnFollower = getDocIdAndSeqNos(followingShard).stream() + .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList()); + assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader)); + final Set> operationsOnFollower = new HashSet<>(); + try (Translog.Snapshot snapshot = followingShard.getHistoryOperations("test", 0)) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType())); + } + } + assertThat(operationsOnFollower, equalTo(operationsOnLeader)); } } @@ -384,15 +457,24 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest @Override protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception { - TransportWriteAction.WritePrimaryResult result = - TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(), + final PlainActionFuture permitFuture = new PlainActionFuture<>(); + primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request); + final TransportWriteAction.WritePrimaryResult ccrResult; + try (Releasable ignored = permitFuture.get()) { + ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(), request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); - return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); + } + return new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) { + @Override + public void respond(ActionListener listener) { + ccrResult.respond(listener); + } + }; } @Override protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception { - TransportBulkShardOperationsAction.applyTranslogOperations(request.getOperations(), replica, Origin.REPLICA); + TransportBulkShardOperationsAction.shardOperationOnReplica(request, replica, logger); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index fe85e8a7445..dfacb96c31c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -7,8 +7,14 @@ package org.elasticsearch.xpack.ccr.action.bulk; import org.apache.lucene.index.Term; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -20,8 +26,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; +import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.CcrWritePrimaryResult; public class BulkShardOperationsTests extends IndexShardTestCase { @@ -78,4 +87,116 @@ public class BulkShardOperationsTests extends IndexShardTestCase { closeShards(followerPrimary); } + public void testPrimaryResultWaitForGlobalCheckpoint() throws Exception { + final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); + final IndexShard shard = newStartedShard(false, settings, new FollowingEngineFactory()); + int numOps = between(1, 100); + for (int i = 0; i < numOps; i++) { + final String id = Integer.toString(i); + final Translog.Operation op; + if (randomBoolean()) { + op = new Translog.Index("_doc", id, i, primaryTerm, 0, SOURCE, null, -1); + } else if (randomBoolean()) { + shard.advanceMaxSeqNoOfUpdatesOrDeletes(i); + op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), i, primaryTerm, 0); + } else { + op = new Translog.NoOp(i, primaryTerm, "test"); + } + shard.applyTranslogOperation(op, Engine.Operation.Origin.REPLICA); + } + BulkShardOperationsRequest request = new BulkShardOperationsRequest(); + { + PlainActionFuture listener = new PlainActionFuture<>(); + CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, -2, logger); + primaryResult.respond(listener); + assertThat("should return intermediately if waiting_global_checkpoint is not specified", listener.isDone(), equalTo(true)); + assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); + } + { + PlainActionFuture listener = new PlainActionFuture<>(); + long waitingForGlobalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint() + 1, shard.getLocalCheckpoint()); + CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger); + primaryResult.respond(listener); + assertThat(listener.isDone(), equalTo(false)); + expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1))); + + shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), waitingForGlobalCheckpoint - 1), "test"); + expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1))); + + shard.updateGlobalCheckpointOnReplica(randomLongBetween(waitingForGlobalCheckpoint, shard.getLocalCheckpoint()), "test"); + assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); + assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint())); + } + { + PlainActionFuture listener = new PlainActionFuture<>(); + long waitingForGlobalCheckpoint = randomLongBetween(-1, shard.getGlobalCheckpoint()); + CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger); + primaryResult.respond(listener); + assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); + assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint())); + } + closeShards(shard); + } + + public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception { + final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); + final IndexShard primary = newStartedShard(true, settings, new FollowingEngineFactory()); + long seqno = 0; + List firstBulk = new ArrayList<>(); + List secondBulk = new ArrayList<>(); + for (int numOps = between(1, 100), i = 0; i < numOps; i++) { + final String id = Integer.toString(between(1, 100)); + final Translog.Operation op; + if (randomBoolean()) { + op = new Translog.Index("_doc", id, seqno++, primaryTerm, 0, SOURCE, null, -1); + } else { + op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqno++, primaryTerm, 0); + } + if (randomBoolean()) { + firstBulk.add(op); + } else { + secondBulk.add(op); + } + } + Randomness.shuffle(firstBulk); + Randomness.shuffle(secondBulk); + primary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); + + final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), + primary.getHistoryUUID(), firstBulk, seqno, primary, logger); + assertThat(fullResult.replicaRequest().getOperations(), + equalTo(rewriteWithPrimaryTerm(firstBulk, primary.getOperationPrimaryTerm()))); + assertThat(fullResult.waitingForGlobalCheckpoint, equalTo(-2L)); + + // This bulk includes some operations from the first bulk. These operations should not be included in the result. + final List existingOps = randomSubsetOf(firstBulk); + final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), + primary.getHistoryUUID(), Stream.concat(existingOps.stream(), secondBulk.stream()).collect(Collectors.toList()), + seqno, primary, logger); + assertThat(partialResult.replicaRequest().getOperations(), + equalTo(rewriteWithPrimaryTerm(secondBulk, primary.getOperationPrimaryTerm()))); + assertThat(partialResult.waitingForGlobalCheckpoint, + equalTo(existingOps.stream().mapToLong(Translog.Operation::seqNo).max().orElse(-2L))); + + closeShards(primary); + } + + private List rewriteWithPrimaryTerm(List sourceOperations, long primaryTerm) { + return sourceOperations.stream().map(op -> { + switch (op.opType()) { + case INDEX: + final Translog.Index index = (Translog.Index) op; + return new Translog.Index(index.type(), index.id(), index.seqNo(), primaryTerm, + index.version(), BytesReference.toBytes(index.source()), index.routing(), index.getAutoGeneratedIdTimestamp()); + case DELETE: + final Translog.Delete delete = (Translog.Delete) op; + return new Translog.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, delete.version()); + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) op; + return new Translog.NoOp(noOp.seqNo(), primaryTerm, noOp.reason()); + default: + throw new IllegalStateException("unexpected operation type [" + op.opType() + "]"); + } + }).collect(Collectors.toList()); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index ce67cfe2d44..ec59e4c5b1d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; @@ -58,6 +59,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; public class FollowingEngineTests extends ESTestCase { @@ -298,6 +300,25 @@ public class FollowingEngineTests extends ESTestCase { return new Engine.Delete(parsedDoc.type(), parsedDoc.id(), EngineTestCase.newUid(parsedDoc), primaryTerm.get()); } + private Engine.Result applyOperation(Engine engine, Engine.Operation op, + long primaryTerm, Engine.Operation.Origin origin) throws IOException { + final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? op.versionType() : null; + final Engine.Result result; + if (op instanceof Engine.Index) { + Engine.Index index = (Engine.Index) op; + result = engine.index(new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), primaryTerm, index.version(), + versionType, origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry())); + } else if (op instanceof Engine.Delete) { + Engine.Delete delete = (Engine.Delete) op; + result = engine.delete(new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, + delete.version(), versionType, origin, delete.startTime())); + } else { + Engine.NoOp noOp = (Engine.NoOp) op; + result = engine.noOp(new Engine.NoOp(noOp.seqNo(), primaryTerm, origin, noOp.startTime(), noOp.reason())); + } + return result; + } + public void testBasicOptimization() throws Exception { runFollowTest((leader, follower) -> { long numDocs = between(1, 100); @@ -531,4 +552,57 @@ public class FollowingEngineTests extends ESTestCase { } }; } + + public void testProcessOnceOnPrimary() throws Exception { + final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true).build(); + final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + int numOps = between(10, 100); + List operations = new ArrayList<>(numOps); + for (int i = 0; i < numOps; i++) { + ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null); + if (randomBoolean()) { + operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L, + VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true)); + } else { + operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L, + VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis())); + } + } + Randomness.shuffle(operations); + try (Store store = createStore(shardId, indexSettings, newDirectory())) { + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); + try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { + followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(operations.size() - 1L); + final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); + for (Engine.Operation op : operations) { + Engine.Result result = applyOperation(followingEngine, op, oldTerm, randomFrom(Engine.Operation.Origin.values())); + assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); + } + // Primary should reject duplicates + final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE); + for (Engine.Operation op : operations) { + Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY); + assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); + assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class)); + } + for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { + assertThat(docId.getPrimaryTerm(), equalTo(oldTerm)); + } + // Replica should accept duplicates + primaryTerm.set(newTerm); + followingEngine.rollTranslogGeneration(); + for (Engine.Operation op : operations) { + Engine.Operation.Origin nonPrimary = randomValueOtherThan(Engine.Operation.Origin.PRIMARY, + () -> randomFrom(Engine.Operation.Origin.values())); + Engine.Result result = applyOperation(followingEngine, op, newTerm, nonPrimary); + assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); + } + for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { + assertThat(docId.getPrimaryTerm(), equalTo(oldTerm)); + } + } + } + } }