Prevent TransportReplicationAction to route request based on stale local routing table
Closes #16274 Closes #12573 Closes #12574
This commit is contained in:
parent
26f77eb70d
commit
af1f637547
|
@ -55,6 +55,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
|
|||
|
||||
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
|
||||
|
||||
private long routedBasedOnClusterVersion = 0;
|
||||
|
||||
public ReplicationRequest() {
|
||||
|
||||
}
|
||||
|
@ -141,6 +143,20 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
|
|||
return (Request) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the minimum version of the cluster state that is required on the next node before we redirect to another primary.
|
||||
* Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
|
||||
this.routedBasedOnClusterVersion = routedBasedOnClusterVersion;
|
||||
return (Request) this;
|
||||
}
|
||||
|
||||
long routedBasedOnClusterVersion() {
|
||||
return routedBasedOnClusterVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
|
@ -161,6 +177,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
|
|||
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
|
||||
timeout = TimeValue.readTimeValue(in);
|
||||
index = in.readString();
|
||||
routedBasedOnClusterVersion = in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -175,6 +192,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
|
|||
out.writeByte(consistencyLevel.id());
|
||||
timeout.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeVLong(routedBasedOnClusterVersion);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -472,6 +472,15 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
}
|
||||
performAction(node, transportPrimaryAction, true);
|
||||
} else {
|
||||
if (state.version() < request.routedBasedOnClusterVersion()) {
|
||||
logger.trace("failed to find primary [{}] for request [{}] despite sender thinking it would be here. Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...", request.shardId(), request, state.version(), request.routedBasedOnClusterVersion());
|
||||
retryBecauseUnavailable(request.shardId(), "failed to find primary as current cluster state with version [" + state.version() + "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]");
|
||||
return;
|
||||
} else {
|
||||
// chasing the node with the active primary for a second hop requires that we are at least up-to-date with the current cluster state version
|
||||
// this prevents redirect loops between two nodes when a primary was relocated and the relocation target is not aware that it is the active primary shard already.
|
||||
request.routedBasedOnClusterVersion(state.version());
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}]", actionName, request.shardId(), request, state.version(), primary.currentNodeId());
|
||||
}
|
||||
|
|
|
@ -42,6 +42,8 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -53,6 +55,7 @@ import org.elasticsearch.index.shard.IndexShardState;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.cluster.TestClusterService;
|
||||
import org.elasticsearch.test.transport.CapturingTransport;
|
||||
|
@ -67,6 +70,7 @@ import org.junit.BeforeClass;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -205,6 +209,56 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
assertIndexShardCounter(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from
|
||||
* the relocation source to the relocation target. If relocation source receives and processes this cluster state
|
||||
* before the relocation target, there is a time span where relocation source believes active primary to be on
|
||||
* relocation target and relocation target believes active primary to be on relocation source. This results in replication
|
||||
* requests being sent back and forth.
|
||||
*
|
||||
* This test checks that replication request is not routed back from relocation target to relocation source in case of
|
||||
* stale index routing table on relocation target.
|
||||
*/
|
||||
public void testNoRerouteOnStaleClusterState() throws InterruptedException, ExecutionException {
|
||||
final String index = "test";
|
||||
final ShardId shardId = new ShardId(index, 0);
|
||||
ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
|
||||
String relocationTargetNode = state.getRoutingTable().shardRoutingTable(shardId).primaryShard().relocatingNodeId();
|
||||
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(relocationTargetNode)).build();
|
||||
clusterService.setState(state);
|
||||
logger.debug("--> relocation ongoing state:\n{}", clusterService.state().prettyPrint());
|
||||
|
||||
Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class);
|
||||
|
||||
request = new Request(shardId).routedBasedOnClusterVersion(clusterService.state().version() + 1);
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(request, listener);
|
||||
reroutePhase.run();
|
||||
assertFalse("cluster state too old didn't cause a retry", listener.isDone());
|
||||
|
||||
// finish relocation
|
||||
ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId).shardsWithState(ShardRoutingState.INITIALIZING).get(0);
|
||||
AllocationService allocationService = ESAllocationTestCase.createAllocationService();
|
||||
RoutingAllocation.Result result = allocationService.applyStartedShards(state, Arrays.asList(relocationTarget));
|
||||
ClusterState updatedState = ClusterState.builder(clusterService.state()).routingResult(result).build();
|
||||
|
||||
clusterService.setState(updatedState);
|
||||
logger.debug("--> relocation complete state:\n{}", clusterService.state().prettyPrint());
|
||||
|
||||
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
|
||||
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
|
||||
final List<CapturingTransport.CapturedRequest> capturedRequests =
|
||||
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId);
|
||||
assertThat(capturedRequests, notNullValue());
|
||||
assertThat(capturedRequests.size(), equalTo(1));
|
||||
assertThat(capturedRequests.get(0).action, equalTo("testAction[p]"));
|
||||
assertIndexShardCounter(1);
|
||||
}
|
||||
|
||||
public void testUnknownIndexOrShardOnReroute() throws InterruptedException {
|
||||
final String index = "test";
|
||||
// no replicas in oder to skip the replication part
|
||||
|
|
Loading…
Reference in New Issue