diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 6fd7da91645..0014404057f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -844,11 +844,11 @@ public abstract class TransportReplicationAction nodesSentTo = new HashMap<>(); + boolean executeOnReplica = + action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings()); + for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) { + // no duplicate requests + Request replicationRequest = (Request) capturedRequest.request; + assertNull(nodesSentTo.put(capturedRequest.node.getId(), replicationRequest)); + // the request is hitting the correct shard + assertEquals(request.shardId, replicationRequest.shardId); + } + + // no request was sent to the local node + assertThat(nodesSentTo.keySet(), not(hasItem(clusterService.state().getNodes().localNodeId()))); + + // requests were sent to the correct shard copies + for (ShardRouting shard : clusterService.state().getRoutingTable().shardRoutingTable(shardId.getIndex(), shardId.id())) { + if (shard.primary() == false && executeOnReplica == false) { + continue; + } + if (shard.unassigned()) { + continue; + } + if (shard.primary() == false) { + nodesSentTo.remove(shard.currentNodeId()); + } + if (shard.relocating()) { + nodesSentTo.remove(shard.relocatingNodeId()); + } + } + + assertThat(nodesSentTo.entrySet(), is(empty())); + if (assignedReplicas > 0) { assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false)); } @@ -511,6 +548,12 @@ public class TransportReplicationActionTests extends ESTestCase { transport.clear(); assertEquals(1, shardFailedRequests.length); CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; + // get the shard the request was sent to + ShardRouting routing = clusterService.state().getRoutingNodes().node(capturedRequest.node.id()).get(request.shardId.id()); + // and the shard that was requested to be failed + ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry)shardFailedRequest.request; + // the shard the request was sent to and the shard to be failed should be the same + assertEquals(shardRoutingEntry.getShardRouting(), routing); failures.add(shardFailedRequest); transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); }