diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index ed23017410e..4e6ec3c3584 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -55,6 +55,8 @@ public abstract class ReplicationRequest relocation ongoing state:\n{}", clusterService.state().prettyPrint()); + + Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener); + reroutePhase.run(); + assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class); + + request = new Request(shardId).routedBasedOnClusterVersion(clusterService.state().version() + 1); + listener = new PlainActionFuture<>(); + reroutePhase = action.new ReroutePhase(request, listener); + reroutePhase.run(); + assertFalse("cluster state too old didn't cause a retry", listener.isDone()); + + // finish relocation + ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId).shardsWithState(ShardRoutingState.INITIALIZING).get(0); + AllocationService allocationService = ESAllocationTestCase.createAllocationService(); + RoutingAllocation.Result result = allocationService.applyStartedShards(state, Arrays.asList(relocationTarget)); + ClusterState updatedState = ClusterState.builder(clusterService.state()).routingResult(result).build(); + + clusterService.setState(updatedState); + logger.debug("--> relocation complete state:\n{}", clusterService.state().prettyPrint()); + + IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); + final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId(); + final List capturedRequests = + transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId); + assertThat(capturedRequests, notNullValue()); + assertThat(capturedRequests.size(), equalTo(1)); + assertThat(capturedRequests.get(0).action, equalTo("testAction[p]")); + assertIndexShardCounter(1); + } + public void testUnknownIndexOrShardOnReroute() throws InterruptedException { final String index = "test"; // no replicas in oder to skip the replication part