Simplify testSendSnapshotSendsOps (#37445)

The test testSendSnapshotSendsOps is currently using a mock instance of
RecoveryTargetHandler which will be hard to modify when we make the
RecoveryTargetHandler non-blocking. This commit prepares for the
incoming changes by replacing the mock instance with a stub.
This commit is contained in:
Nhat Nguyen 2019-01-15 03:07:56 -05:00 committed by GitHub
parent b594e81c86
commit bf49f54456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 48 additions and 55 deletions

View File

@ -76,7 +76,6 @@ import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.io.OutputStream;
@ -108,7 +107,6 @@ import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class RecoverySourceHandlerTests extends ESTestCase {
@ -205,9 +203,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
final RecoverySourceHandler handler =
new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
final List<Translog.Operation> operations = new ArrayList<>();
final int initialNumberOfDocs = randomIntBetween(16, 64);
for (int i = 0; i < initialNumberOfDocs; i++) {
@ -219,38 +214,23 @@ public class RecoverySourceHandlerTests extends ESTestCase {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
}
operations.add(null);
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
final List<Translog.Operation> shippedOps = new ArrayList<>();
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu) {
shippedOps.addAll(operations);
return SequenceNumbers.NO_OPS_PERFORMED;
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
}
private int counter = 0;
@Override
public int totalOperations() {
return operations.size() - 1;
}
@Override
public Translog.Operation next() throws IOException {
return operations.get(counter++);
}
}, randomNonNegativeLong(), randomNonNegativeLong());
endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong());
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
assertThat(result.totalOperations, equalTo(expectedOps));
final ArgumentCaptor<List> shippedOpsCaptor = ArgumentCaptor.forClass(List.class);
verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(),
ArgumentCaptor.forClass(Long.class).capture(), ArgumentCaptor.forClass(Long.class).capture());
List<Translog.Operation> shippedOps = new ArrayList<>();
for (List list: shippedOpsCaptor.getAllValues()) {
shippedOps.addAll(list);
}
shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo));
assertThat(shippedOps.size(), equalTo(expectedOps));
for (int i = 0; i < shippedOps.size(); i++) {
@ -261,30 +241,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
expectThrows(IllegalStateException.class, () ->
handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
}
private int counter = 0;
@Override
public int totalOperations() {
return operations.size() - 1 - opsToSkip.size();
}
@Override
public Translog.Operation next() throws IOException {
Translog.Operation op;
do {
op = operations.get(counter++);
} while (op != null && opsToSkip.contains(op));
return op;
}
}, randomNonNegativeLong(), randomNonNegativeLong()));
expectThrows(IllegalStateException.class, () -> handler.phase2(startingSeqNo, requiredStartingSeqNo,
endingSeqNo, newTranslogSnapshot(operations, opsToSkip), randomNonNegativeLong(), randomNonNegativeLong()));
}
}
@ -716,4 +674,39 @@ public class RecoverySourceHandlerTests extends ESTestCase {
int totalTranslogOps, ActionListener<Void> listener) {
}
}
private Translog.Snapshot newTranslogSnapshot(List<Translog.Operation> operations, List<Translog.Operation> operationsToSkip) {
return new Translog.Snapshot() {
int index = 0;
int skippedCount = 0;
@Override
public int totalOperations() {
return operations.size();
}
@Override
public int skippedOperations() {
return skippedCount;
}
@Override
public Translog.Operation next() {
while (index < operations.size()) {
Translog.Operation op = operations.get(index++);
if (operationsToSkip.contains(op)) {
skippedCount++;
} else {
return op;
}
}
return null;
}
@Override
public void close() {
}
};
}
}