Fix testSendSnapshotSendsOps

We need to use a concurrent collection to keep track of the shipped operations
as they can arrive concurrently since #58018.

Relates #58018
This commit is contained in:
Nhat Nguyen 2020-07-08 11:52:05 -04:00
parent 93a5eb0688
commit 00c859bfca
1 changed files with 7 additions and 3 deletions

View File

@ -102,6 +102,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -110,6 +111,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import static java.util.Collections.emptyMap;
@ -236,7 +238,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
final long endingSeqNo = randomLongBetween(startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
final List<Translog.Operation> shippedOps = new ArrayList<>();
final Queue<Translog.Operation> shippedOps = ConcurrentCollections.newQueue();
final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@Override
@ -255,10 +257,12 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
RecoverySourceHandler.SendSnapshotResult result = future.actionGet();
assertThat(result.sentOperations, equalTo(expectedOps));
shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo));
List<Translog.Operation> sortedShippedOps = shippedOps.stream()
.sorted(Comparator.comparing(Translog.Operation::seqNo))
.collect(Collectors.toList());
assertThat(shippedOps.size(), equalTo(expectedOps));
for (int i = 0; i < shippedOps.size(); i++) {
assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
assertThat(sortedShippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
}
assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get()));
}