Merge pull request #15791 from jasontedor/relocating-shard-failure
Only fail the relocation target when a replication request on it fails Closes #15790
This commit is contained in:
commit
3b192cfc74
|
@ -844,11 +844,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
||||||
// we never execute replication operation locally as primary operation has already completed locally
|
// we never execute replication operation locally as primary operation has already completed locally
|
||||||
// hence, we ignore any local shard for replication
|
// hence, we ignore any local shard for replication
|
||||||
if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
|
if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
|
||||||
performOnReplica(shard, shard.currentNodeId());
|
performOnReplica(shard);
|
||||||
}
|
}
|
||||||
// send operation to relocating shard
|
// send operation to relocating shard
|
||||||
if (shard.relocating()) {
|
if (shard.relocating()) {
|
||||||
performOnReplica(shard, shard.relocatingNodeId());
|
performOnReplica(shard.buildTargetRelocatingShard());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -856,9 +856,10 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
||||||
/**
|
/**
|
||||||
* send replica operation to target node
|
* send replica operation to target node
|
||||||
*/
|
*/
|
||||||
void performOnReplica(final ShardRouting shard, final String nodeId) {
|
void performOnReplica(final ShardRouting shard) {
|
||||||
// if we don't have that node, it means that it might have failed and will be created again, in
|
// if we don't have that node, it means that it might have failed and will be created again, in
|
||||||
// this case, we don't have to do the operation, and just let it failover
|
// this case, we don't have to do the operation, and just let it failover
|
||||||
|
String nodeId = shard.currentNodeId();
|
||||||
if (!nodes.nodeExists(nodeId)) {
|
if (!nodes.nodeExists(nodeId)) {
|
||||||
logger.trace("failed to send action [{}] on replica [{}] for request [{}] due to unknown node [{}]", transportReplicaAction, shard.shardId(), replicaRequest, nodeId);
|
logger.trace("failed to send action [{}] on replica [{}] for request [{}] due to unknown node [{}]", transportReplicaAction, shard.shardId(), replicaRequest, nodeId);
|
||||||
onReplicaFailure(nodeId, null);
|
onReplicaFailure(nodeId, null);
|
||||||
|
|
|
@ -302,6 +302,10 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
this.failure = failure;
|
this.failure = failure;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ShardRouting getShardRouting() {
|
||||||
|
return shardRouting;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
|
|
|
@ -65,6 +65,7 @@ import org.junit.BeforeClass;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -75,9 +76,13 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
|
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
|
||||||
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
|
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
|
||||||
|
import static org.hamcrest.CoreMatchers.not;
|
||||||
import static org.hamcrest.Matchers.arrayWithSize;
|
import static org.hamcrest.Matchers.arrayWithSize;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.hasItem;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
|
||||||
|
@ -486,7 +491,39 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
replicationPhase.run();
|
replicationPhase.run();
|
||||||
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
|
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
|
||||||
transport.clear();
|
transport.clear();
|
||||||
assertThat(capturedRequests.length, equalTo(assignedReplicas));
|
|
||||||
|
HashMap<String, Request> nodesSentTo = new HashMap<>();
|
||||||
|
boolean executeOnReplica =
|
||||||
|
action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
|
||||||
|
for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
|
||||||
|
// no duplicate requests
|
||||||
|
Request replicationRequest = (Request) capturedRequest.request;
|
||||||
|
assertNull(nodesSentTo.put(capturedRequest.node.getId(), replicationRequest));
|
||||||
|
// the request is hitting the correct shard
|
||||||
|
assertEquals(request.shardId, replicationRequest.shardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// no request was sent to the local node
|
||||||
|
assertThat(nodesSentTo.keySet(), not(hasItem(clusterService.state().getNodes().localNodeId())));
|
||||||
|
|
||||||
|
// requests were sent to the correct shard copies
|
||||||
|
for (ShardRouting shard : clusterService.state().getRoutingTable().shardRoutingTable(shardId.getIndex(), shardId.id())) {
|
||||||
|
if (shard.primary() == false && executeOnReplica == false) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (shard.unassigned()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (shard.primary() == false) {
|
||||||
|
nodesSentTo.remove(shard.currentNodeId());
|
||||||
|
}
|
||||||
|
if (shard.relocating()) {
|
||||||
|
nodesSentTo.remove(shard.relocatingNodeId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(nodesSentTo.entrySet(), is(empty()));
|
||||||
|
|
||||||
if (assignedReplicas > 0) {
|
if (assignedReplicas > 0) {
|
||||||
assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false));
|
assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false));
|
||||||
}
|
}
|
||||||
|
@ -511,6 +548,12 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||||
transport.clear();
|
transport.clear();
|
||||||
assertEquals(1, shardFailedRequests.length);
|
assertEquals(1, shardFailedRequests.length);
|
||||||
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
|
CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
|
||||||
|
// get the shard the request was sent to
|
||||||
|
ShardRouting routing = clusterService.state().getRoutingNodes().node(capturedRequest.node.id()).get(request.shardId.id());
|
||||||
|
// and the shard that was requested to be failed
|
||||||
|
ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry)shardFailedRequest.request;
|
||||||
|
// the shard the request was sent to and the shard to be failed should be the same
|
||||||
|
assertEquals(shardRoutingEntry.getShardRouting(), routing);
|
||||||
failures.add(shardFailedRequest);
|
failures.add(shardFailedRequest);
|
||||||
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
|
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue