diff --git a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java index d4e2c652fa8..ca003bfc07b 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.resync; import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,6 +29,7 @@ import org.elasticsearch.index.translog.Translog; import java.io.IOException; import java.util.Arrays; +import java.util.Objects; /** * Represents a batch of operations sent from the primary to its replicas during the primary-replica resync. @@ -36,15 +38,17 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest listener) { + long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener listener) { ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId); ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-) ActionListener wrappedListener = new ActionListener() { @@ -170,7 +172,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent { }; try { new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(), - startingSeqNo, maxSeqNo, wrappedListener).run(); + startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run(); } catch (Exception e) { wrappedListener.onFailure(e); } @@ -191,6 +193,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent { private final Translog.Snapshot snapshot; private final long startingSeqNo; private final long maxSeqNo; + private final long maxSeenAutoIdTimestamp; private final int chunkSizeInBytes; private final ActionListener listener; private final AtomicBoolean firstMessage = new AtomicBoolean(true); @@ -199,7 +202,8 @@ public class PrimaryReplicaSyncer extends AbstractComponent { private AtomicBoolean closed = new AtomicBoolean(); SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm, - Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener listener) { + Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, + long maxSeenAutoIdTimestamp, ActionListener listener) { this.logger = logger; this.syncAction = syncAction; this.task = task; @@ -210,6 +214,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent { this.chunkSizeInBytes = chunkSizeInBytes; this.startingSeqNo = startingSeqNo; this.maxSeqNo = maxSeqNo; + this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp; this.listener = listener; task.setTotalOperations(snapshot.totalOperations()); } @@ -260,7 +265,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent { if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { task.setPhase("sending_ops"); ResyncReplicationRequest request = - new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY)); + new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, maxSeenAutoIdTimestamp, operations.toArray(EMPTY_ARRAY)); logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(), new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get()); firstMessage.set(false); diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java index 15b8e1c99d2..230eccb0578 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -38,7 +38,7 @@ public class ResyncReplicationRequestTests extends ESTestCase { final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(), randomNonNegativeLong(), bytes, null, -1); final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); - final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index}); + final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[]{index}); final BytesStreamOutput out = new BytesStreamOutput(); before.writeTo(out); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 3883554acc0..c38fb8a4956 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.replication; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; @@ -633,6 +634,49 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } } + public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + IndexShard replica1 = shards.getReplicas().get(0); + IndexShard replica2 = shards.getReplicas().get(1); + long maxTimestampOnReplica1 = -1; + long maxTimestampOnReplica2 = -1; + List replicationRequests = new ArrayList<>(); + for (int numDocs = between(1, 10), i = 0; i < numDocs; i++) { + final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); + indexRequest.process(Version.CURRENT, null, index.getName()); + final IndexRequest copyRequest; + if (randomBoolean()) { + copyRequest = copyIndexRequest(indexRequest); + indexRequest.onRetry(); + } else { + copyRequest = copyIndexRequest(indexRequest); + copyRequest.onRetry(); + } + replicationRequests.add(copyRequest); + final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, primary); + if (randomBoolean()) { + indexOnReplica(bulkShardRequest, shards, replica1); + maxTimestampOnReplica1 = Math.max(maxTimestampOnReplica1, indexRequest.getAutoGeneratedTimestamp()); + } else { + indexOnReplica(bulkShardRequest, shards, replica2); + maxTimestampOnReplica2 = Math.max(maxTimestampOnReplica2, indexRequest.getAutoGeneratedTimestamp()); + } + } + assertThat(replica1.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1)); + assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica2)); + shards.promoteReplicaToPrimary(replica1).get(); + assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1)); + for (IndexRequest request : replicationRequests) { + shards.index(request); // deliver via normal replication + } + for (IndexShard shard : shards) { + assertThat(shard.getMaxSeenAutoIdTimestamp(), equalTo(Math.max(maxTimestampOnReplica1, maxTimestampOnReplica2))); + } + } + } + public static class BlockingTarget extends RecoveryTarget { private final CountDownLatch recoveryBlocked; diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index c5fc0e4b034..28e625b34df 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -76,7 +76,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { // Index doc but not advance local checkpoint. shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true); } long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0; @@ -105,6 +105,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { .findFirst() .isPresent(), is(false)); + + assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { if (shard.indexSettings.isSoftDeleteEnabled()) {