Handle shards assigned to nodes that are not in the cluster state

This commit addresses an issue in TransportBroadcastByNodeAction.
Namely, if in between a master leaving the cluster and a new one being
assigned, a request that relies on TransportBroadcastByNodeAction
(e.g., an indices stats request) is issued,
TransportBroadcastByNodeAction might attempt to send a request to a
node that is no longer in the local node’s cluster state.

The exact circumstances that lead to this are as follows. When the
master leaves the cluster and another node’s master fault detection
detects this, the local node will update its local cluster state to no
longer include the master node. However, the routing table will not be
updated. This means that in the preparation for sending the requests in
TransportBroadcastByNodeAction, we need to check that not only is a
shard assigned, but also that it is assigned to a node that is still in
the local node’s cluster state.

This commit adds such a check to the constructor of
TransportBroadcastByNodeAction. A new unit test is added that checks
that no request is sent to the master node in such a situation; this
test fails with a NullPointerException without the fix. Additionally,
the unit test TransportBroadcastByNodeActionTests#testResultAggregation
is updated to also simulate a master failure. This updated test also
fails prior to the fix.

Closes #14584
This commit is contained in:
Jason Tedor 2015-11-06 10:30:54 -05:00
parent 40f82de4a1
commit 699c701b59
2 changed files with 60 additions and 1 deletions

View File

@ -228,7 +228,13 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
nodeIds = new HashMap<>(); nodeIds = new HashMap<>();
for (ShardRouting shard : shardIt.asUnordered()) { for (ShardRouting shard : shardIt.asUnordered()) {
if (shard.assignedToNode()) { // send a request to the shard only if it is assigned to a node that is in the local node's cluster state
// a scenario in which a shard can be assigned but to a node that is not in the local node's cluster state
// is when the shard is assigned to the master node, the local node has detected the master as failed
// and a new master has not yet been elected; in this situation the local node will have removed the
// master node from the local cluster state, but the shards assigned to the master will still be in the
// routing table as such
if (shard.assignedToNode() && nodes.get(shard.currentNodeId()) != null) {
String nodeId = shard.currentNodeId(); String nodeId = shard.currentNodeId();
if (!nodeIds.containsKey(nodeId)) { if (!nodeIds.containsKey(nodeId)) {
nodeIds.put(nodeId, new ArrayList<>()); nodeIds.put(nodeId, new ArrayList<>());

View File

@ -289,6 +289,44 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
} }
} }
// simulate the master being removed from the cluster but before a new master is elected
// as such, the shards assigned to the master will still show up in the cluster state as assigned to a node but
// that node will not be in the local cluster state on any node that has detected the master as failing
// in this case, such a shard should be treated as unassigned
public void testRequestsAreNotSentToFailedMaster() {
Request request = new Request(new String[]{TEST_INDEX});
PlainActionFuture<Response> listener = new PlainActionFuture<>();
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().getNodes());
builder.remove(masterNode.id());
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder));
action.new AsyncAction(request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.capturedRequestsByTargetNode();
// the master should not be in the list of nodes that requests were sent to
ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX});
Set<String> set = new HashSet<>();
for (ShardRouting shard : shardIt.asUnordered()) {
if (shard.currentNodeId() != masterNode.id()) {
set.add(shard.currentNodeId());
}
}
// check a request was sent to the right number of nodes
assertEquals(set.size(), capturedRequests.size());
// check requests were sent to the right nodes
assertEquals(set, capturedRequests.keySet());
for (Map.Entry<String, List<CapturingTransport.CapturedRequest>> entry : capturedRequests.entrySet()) {
// check one request was sent to each non-master node
assertEquals(1, entry.getValue().size());
}
}
public void testOperationExecution() throws Exception { public void testOperationExecution() throws Exception {
ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX}); ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX});
Set<ShardRouting> shards = new HashSet<>(); Set<ShardRouting> shards = new HashSet<>();
@ -340,6 +378,18 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
Request request = new Request(new String[]{TEST_INDEX}); Request request = new Request(new String[]{TEST_INDEX});
PlainActionFuture<Response> listener = new PlainActionFuture<>(); PlainActionFuture<Response> listener = new PlainActionFuture<>();
// simulate removing the master
final boolean simulateFailedMasterNode = rarely();
DiscoveryNode failedMasterNode = null;
if (simulateFailedMasterNode) {
failedMasterNode = clusterService.state().nodes().masterNode();
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().getNodes());
builder.remove(failedMasterNode.id());
builder.masterNodeId(null);
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder));
}
action.new AsyncAction(request, listener).start(); action.new AsyncAction(request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.capturedRequestsByTargetNode(); Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.capturedRequestsByTargetNode();
transport.clear(); transport.clear();
@ -382,6 +432,9 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
transport.handleResponse(requestId, nodeResponse); transport.handleResponse(requestId, nodeResponse);
} }
} }
if (simulateFailedMasterNode) {
totalShards += map.get(failedMasterNode.id()).size();
}
Response response = listener.get(); Response response = listener.get();
assertEquals("total shards", totalShards, response.getTotalShards()); assertEquals("total shards", totalShards, response.getTotalShards());