Assert that replication requests are sent to the correct shard copies
This commit adds tighter assertions in TransportReplicationActionTests#runReplicateTest that replication requests are sent to the correct shard copies.
This commit is contained in:
parent
75106daf9c
commit
6413adb5bc
|
@ -67,6 +67,7 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -77,6 +78,7 @@ import static org.elasticsearch.action.support.replication.ClusterStateCreationU
|
||||||
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
|
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
|
||||||
import static org.hamcrest.Matchers.arrayWithSize;
|
import static org.hamcrest.Matchers.arrayWithSize;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.hasItem;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
@ -486,7 +488,37 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
replicationPhase.run();
|
replicationPhase.run();
|
||||||
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
|
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
|
||||||
transport.clear();
|
transport.clear();
|
||||||
|
|
||||||
assertThat(capturedRequests.length, equalTo(assignedReplicas));
|
assertThat(capturedRequests.length, equalTo(assignedReplicas));
|
||||||
|
Set<String> nodesSentTo = new HashSet<>();
|
||||||
|
boolean executeOnReplica =
|
||||||
|
action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
|
||||||
|
for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
|
||||||
|
// no duplicate requests
|
||||||
|
assertTrue(nodesSentTo.add(capturedRequest.node.getId()));
|
||||||
|
// the request is hitting the correct shard
|
||||||
|
Request replicationRequest = (Request)capturedRequest.request;
|
||||||
|
assertEquals(request.shardId, replicationRequest.shardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// requests were sent to the correct shard copies
|
||||||
|
List<ShardRouting> shards =
|
||||||
|
clusterService.state().getRoutingTable().index(shardId.getIndex()).shard(shardId.id()).shards();
|
||||||
|
for (ShardRouting shard : shards) {
|
||||||
|
if (!shard.primary() && !executeOnReplica) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (shard.unassigned()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!clusterService.state().getNodes().localNodeId().equals(shard.currentNodeId())) {
|
||||||
|
assertThat(nodesSentTo, hasItem(shard.currentNodeId()));
|
||||||
|
}
|
||||||
|
if (shard.relocating()) {
|
||||||
|
assertThat(nodesSentTo, hasItem(shard.relocatingNodeId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (assignedReplicas > 0) {
|
if (assignedReplicas > 0) {
|
||||||
assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false));
|
assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue