testWaitForPendingSeqNo didn't properly wait for all pending ops to "stuck"

The test only waited for one op to be stuck. In rare occasions the other ops were still in flight when recovery captured a translog snapshot throwing doc count off.
This commit is contained in:
Boaz Leskes 2017-07-26 13:49:15 +02:00
parent 015424d9f4
commit 26e82610b7
1 changed files with 4 additions and 4 deletions

View File

@ -350,7 +350,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
closeShards(replica); closeShards(replica);
docs += pendingDocs; docs += pendingDocs;
primaryEngineFactory.latchIndexers(); primaryEngineFactory.latchIndexers(pendingDocs);
CountDownLatch pendingDocsDone = new CountDownLatch(pendingDocs); CountDownLatch pendingDocsDone = new CountDownLatch(pendingDocs);
for (int i = 0; i < pendingDocs; i++) { for (int i = 0; i < pendingDocs; i++) {
final String id = "pending_" + i; final String id = "pending_" + i;
@ -450,7 +450,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps) public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps)
throws IOException { throws IOException {
// index a doc which is not part of the snapshot, but also does not complete on replica // index a doc which is not part of the snapshot, but also does not complete on replica
replicaEngineFactory.latchIndexers(); replicaEngineFactory.latchIndexers(1);
threadPool.generic().submit(() -> { threadPool.generic().submit(() -> {
try { try {
shards.index(new IndexRequest(index.getName(), "type", "pending").source("{}", XContentType.JSON)); shards.index(new IndexRequest(index.getName(), "type", "pending").source("{}", XContentType.JSON));
@ -593,10 +593,10 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
private final AtomicReference<CountDownLatch> blockReference = new AtomicReference<>(); private final AtomicReference<CountDownLatch> blockReference = new AtomicReference<>();
private final AtomicReference<CountDownLatch> blockedIndexers = new AtomicReference<>(); private final AtomicReference<CountDownLatch> blockedIndexers = new AtomicReference<>();
public synchronized void latchIndexers() { public synchronized void latchIndexers(int count) {
final CountDownLatch block = new CountDownLatch(1); final CountDownLatch block = new CountDownLatch(1);
blocks.add(block); blocks.add(block);
blockedIndexers.set(new CountDownLatch(1)); blockedIndexers.set(new CountDownLatch(count));
assert blockReference.compareAndSet(null, block); assert blockReference.compareAndSet(null, block);
} }