Check if node is still present when collecting attribute shard routings
The node we need to lookup for attribute colelction might not be part of the `DiscoveryNodes` anymore due to node failure or shutdown. This commit adds a check and removes the shard from the iteration. Closes #4589
This commit is contained in:
parent
9f54e9782d
commit
65c4282bb9
|
@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.UnmodifiableIterator;
|
||||
import jsr166y.ThreadLocalRandom;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -430,22 +431,9 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
if (shardRoutings == null) {
|
||||
synchronized (shardsByAttributeMutex) {
|
||||
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(activeShards);
|
||||
ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
|
||||
for (String attribute : key.attributes) {
|
||||
String localAttributeValue = nodes.localNode().attributes().get(attribute);
|
||||
if (localAttributeValue == null) {
|
||||
continue;
|
||||
}
|
||||
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
|
||||
ShardRouting fromShard = iterator.next();
|
||||
if (localAttributeValue.equals(nodes.get(fromShard.currentNodeId()).attributes().get(attribute))) {
|
||||
iterator.remove();
|
||||
to.add(fromShard);
|
||||
}
|
||||
}
|
||||
}
|
||||
ImmutableList<ShardRouting> to = collectAttributeShards(key, nodes, from);
|
||||
|
||||
shardRoutings = new AttributesRoutings(ImmutableList.copyOf(to), ImmutableList.copyOf(from));
|
||||
shardRoutings = new AttributesRoutings(to, ImmutableList.copyOf(from));
|
||||
activeShardsByAttributes = MapBuilder.newMapBuilder(activeShardsByAttributes).put(key, shardRoutings).immutableMap();
|
||||
}
|
||||
}
|
||||
|
@ -457,28 +445,34 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
if (shardRoutings == null) {
|
||||
synchronized (shardsByAttributeMutex) {
|
||||
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(allInitializingShards);
|
||||
ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
|
||||
for (String attribute : key.attributes) {
|
||||
String localAttributeValue = nodes.localNode().attributes().get(attribute);
|
||||
if (localAttributeValue == null) {
|
||||
continue;
|
||||
}
|
||||
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
|
||||
ShardRouting fromShard = iterator.next();
|
||||
if (localAttributeValue.equals(nodes.get(fromShard.currentNodeId()).attributes().get(attribute))) {
|
||||
iterator.remove();
|
||||
to.add(fromShard);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shardRoutings = new AttributesRoutings(ImmutableList.copyOf(to), ImmutableList.copyOf(from));
|
||||
ImmutableList<ShardRouting> to = collectAttributeShards(key, nodes, from);
|
||||
shardRoutings = new AttributesRoutings(to, ImmutableList.copyOf(from));
|
||||
initializingShardsByAttributes = MapBuilder.newMapBuilder(initializingShardsByAttributes).put(key, shardRoutings).immutableMap();
|
||||
}
|
||||
}
|
||||
return shardRoutings;
|
||||
}
|
||||
|
||||
private static ImmutableList<ShardRouting> collectAttributeShards(AttributesKey key, DiscoveryNodes nodes, ArrayList<ShardRouting> from) {
|
||||
final ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
|
||||
for (final String attribute : key.attributes) {
|
||||
final String localAttributeValue = nodes.localNode().attributes().get(attribute);
|
||||
if (localAttributeValue != null) {
|
||||
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
|
||||
ShardRouting fromShard = iterator.next();
|
||||
final DiscoveryNode discoveryNode = nodes.get(fromShard.currentNodeId());
|
||||
if (discoveryNode == null) {
|
||||
iterator.remove(); // node is not present anymore - ignore shard
|
||||
} else if (localAttributeValue.equals(discoveryNode.attributes().get(attribute))) {
|
||||
iterator.remove();
|
||||
to.add(fromShard);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ImmutableList.copyOf(to);
|
||||
}
|
||||
|
||||
public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) {
|
||||
return preferAttributesActiveInitializingShardsIt(attributes, nodes, pickIndex());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue