Primary replica resync should not send ops without seqno (#40433)

Primary-replica resync in a mixed-cluster between 6.x and 5.6 can send
operations without sequence number to a replica which already processed
operations with sequence number. This leads to the failure of that
replica for we trip the sequence number assertion when writing resync
operations without sequence number to translog.
This commit is contained in:
Nhat Nguyen 2019-04-03 22:17:31 -04:00
parent 8f72d72c4c
commit 5a2eb07c0e
3 changed files with 61 additions and 3 deletions

View File

@ -246,11 +246,11 @@ public class PrimaryReplicaSyncer {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 &&
(seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo) {
totalSkippedOps.incrementAndGet();
continue;
}
assert operation.seqNo() >= 0 : "sending operation with unassigned sequence number [" + operation + "]";
operations.add(operation);
size += operation.estimateSize();
totalSentOps.incrementAndGet();
@ -260,7 +260,6 @@ public class PrimaryReplicaSyncer {
break;
}
}
final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
// have to send sync request even in case of there are no operations to sync - have to sync trimmedAboveSeqNo at least
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {

View File

@ -42,21 +42,30 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
@ -186,6 +195,31 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
}
}
public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception {
IndexShard shard = spy(newStartedShard(true));
when(shard.getGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO);
int numOps = between(0, 20);
List<Translog.Operation> operations = new ArrayList<>();
for (int i = 0; i < numOps; i++) {
operations.add(new Translog.Index(
"_doc", Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1}));
}
doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong());
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
List<Translog.Operation> sentOperations = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {
sentOperations.addAll(Arrays.asList(request.getOperations()));
listener.onResponse(new ResyncReplicationResponse());
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction);
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
syncer.resync(shard, fut);
fut.actionGet();
assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList())));
closeShards(shard);
}
public void testStatusSerialization() throws IOException {
PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));

View File

@ -37,6 +37,7 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
@ -142,4 +143,28 @@ public class TestTranslog {
}
return ops;
}
public static Translog.Snapshot newSnapshotFromOperations(List<Translog.Operation> operations) {
final Iterator<Translog.Operation> iterator = operations.iterator();
return new Translog.Snapshot() {
@Override
public int totalOperations() {
return operations.size();
}
@Override
public Translog.Operation next() {
if (iterator.hasNext()) {
return iterator.next();
} else {
return null;
}
}
@Override
public void close() {
}
};
}
}