diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index f3e631f8bf6..07aade95292 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -246,11 +246,11 @@ public class PrimaryReplicaSyncer { Translog.Operation operation; while ((operation = snapshot.next()) != null) { final long seqNo = operation.seqNo(); - if (startingSeqNo >= 0 && - (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) { + if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo) { totalSkippedOps.incrementAndGet(); continue; } + assert operation.seqNo() >= 0 : "sending operation with unassigned sequence number [" + operation + "]"; operations.add(operation); size += operation.estimateSize(); totalSentOps.incrementAndGet(); @@ -260,7 +260,6 @@ public class PrimaryReplicaSyncer { break; } } - final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO; // have to send sync request even in case of there are no operations to sync - have to sync trimmedAboveSeqNo at least if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 85e381b176c..c7d59fdb7c2 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -42,21 +42,30 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.translog.TestTranslog; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class PrimaryReplicaSyncerTests extends IndexShardTestCase { @@ -186,6 +195,31 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { } } + public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception { + IndexShard shard = spy(newStartedShard(true)); + when(shard.getGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO); + int numOps = between(0, 20); + List operations = new ArrayList<>(); + for (int i = 0; i < numOps; i++) { + operations.add(new Translog.Index( + "_doc", Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1})); + } + doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong()); + TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + List sentOperations = new ArrayList<>(); + PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { + sentOperations.addAll(Arrays.asList(request.getOperations())); + listener.onResponse(new ResyncReplicationResponse()); + }; + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction); + syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10))); + PlainActionFuture fut = new PlainActionFuture<>(); + syncer.resync(shard, fut); + fut.actionGet(); + assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList()))); + closeShards(shard); + } + public void testStatusSerialization() throws IOException { PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10), randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000)); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index 003054fc715..a3ebfff478e 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -37,6 +37,7 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; @@ -142,4 +143,28 @@ public class TestTranslog { } return ops; } + + public static Translog.Snapshot newSnapshotFromOperations(List operations) { + final Iterator iterator = operations.iterator(); + return new Translog.Snapshot() { + @Override + public int totalOperations() { + return operations.size(); + } + + @Override + public Translog.Operation next() { + if (iterator.hasNext()) { + return iterator.next(); + } else { + return null; + } + } + + @Override + public void close() { + + } + }; + } }