diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java index 6161bd49489..27347cc216c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java @@ -26,6 +26,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import java.util.List; + /** * A pluggable logic allowing to control if allocation of a shard is allowed on a specific node. * @@ -54,7 +56,14 @@ public abstract class NodeAllocation extends AbstractComponent { super(settings); } - public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List extends ShardRouting> startedShards) { + } + + public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List extends ShardRouting> failedShards) { + + } + + public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { return false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java index 9a932cec367..4e42daa92a3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import java.util.List; import java.util.Set; /** @@ -53,6 +54,18 @@ public class NodeAllocations extends NodeAllocation { this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]); } + @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List extends ShardRouting> startedShards) { + for (NodeAllocation allocation : allocations) { + allocation.applyStartedShards(nodeAllocations, routingNodes, nodes, startedShards); + } + } + + @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List extends ShardRouting> failedShards) { + for (NodeAllocation allocation : allocations) { + allocation.applyFailedShards(nodeAllocations, routingNodes, nodes, failedShards); + } + } + @Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) { for (NodeAllocation allocation : allocations) { if (!allocation.canRebalance(shardRouting, routingNodes, nodes)) { @@ -62,10 +75,10 @@ public class NodeAllocations extends NodeAllocation { return true; } - @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { boolean changed = false; for (NodeAllocation allocation : allocations) { - changed |= allocation.allocate(nodeAllocations, routingNodes, nodes); + changed |= allocation.allocateUnassigned(nodeAllocations, routingNodes, nodes); } return changed; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java index 9c795fdb612..0705fce4d47 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ReplicaAfterPrimaryActiveNodeAllocation.java @@ -38,7 +38,7 @@ public class ReplicaAfterPrimaryActiveNodeAllocation extends NodeAllocation { super(settings); } - @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { return false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java index e5149ce4ba5..12cd65016bc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/SameShardNodeAllocation.java @@ -38,7 +38,7 @@ public class SameShardNodeAllocation extends NodeAllocation { super(settings); } - @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { return false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java index 33da9b3a9a3..151300c6536 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java @@ -60,9 +60,11 @@ public class ShardsAllocation extends AbstractComponent { * *
If the same instance of the routing table is returned, then no change has been made. */ - public RoutingTable applyStartedShards(ClusterState clusterState, Iterable extends ShardRouting> startedShardEntries) { + public RoutingTable applyStartedShards(ClusterState clusterState, List extends ShardRouting> startedShards) { RoutingNodes routingNodes = clusterState.routingNodes(); - if (!applyStartedShards(routingNodes, startedShardEntries)) { + boolean changed = applyStartedShards(routingNodes, startedShards); + nodeAllocations.applyStartedShards(nodeAllocations, routingNodes, clusterState.nodes(), startedShards); + if (!changed) { return clusterState.routingTable(); } reroute(routingNodes, clusterState.nodes()); @@ -74,9 +76,11 @@ public class ShardsAllocation extends AbstractComponent { * *
If the same instance of the routing table is returned, then no change has been made. */ - public RoutingTable applyFailedShards(ClusterState clusterState, Iterable extends ShardRouting> failedShardEntries) { + public RoutingTable applyFailedShards(ClusterState clusterState, List extends ShardRouting> failedShards) { RoutingNodes routingNodes = clusterState.routingNodes(); - if (!applyFailedShards(routingNodes, failedShardEntries)) { + boolean changed = applyFailedShards(routingNodes, failedShards); + nodeAllocations.applyFailedShards(nodeAllocations, routingNodes, clusterState.nodes(), failedShards); + if (!changed) { return clusterState.routingTable(); } // If we reroute again, the failed shard will try and be assigned to the same node, which we do no do in the applyFailedShards @@ -113,7 +117,7 @@ public class ShardsAllocation extends AbstractComponent { // now allocate all the unassigned to available nodes if (routingNodes.hasUnassigned()) { - changed |= nodeAllocations.allocate(nodeAllocations, routingNodes, nodes); + changed |= nodeAllocations.allocateUnassigned(nodeAllocations, routingNodes, nodes); changed |= allocateUnassigned(routingNodes); // elect primaries again, in case this is needed with unassigned allocation changed |= electPrimaries(routingNodes); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java index cc746e4a9d3..9522353e16e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ThrottlingNodeAllocation.java @@ -37,7 +37,7 @@ public class ThrottlingNodeAllocation extends NodeAllocation { this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors() + 1); } - @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { return false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java index 06d91ef3a66..d98e0e61ac8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java @@ -22,7 +22,10 @@ package org.elasticsearch.gateway.blobstore; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.NodeAllocation; import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.common.collect.Maps; @@ -42,6 +45,7 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.transport.ConnectTransportException; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -67,27 +71,25 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30)); } - @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { + @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List extends ShardRouting> startedShards) { + for (ShardRouting shardRouting : startedShards) { + cachedCommitPoints.remove(shardRouting.shardId()); + } + } + + @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List extends ShardRouting> failedShards) { + for (ShardRouting shardRouting : failedShards) { + cachedCommitPoints.remove(shardRouting.shardId()); + } + } + + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { boolean changed = false; if (nodes.dataNodes().isEmpty()) { return changed; } - // clean cached commit points for primaries that are already active - for (ShardId shardId : cachedCommitPoints.keySet()) { - IndexRoutingTable indexRoutingTable = routingNodes.routingTable().index(shardId.index().name()); - if (indexRoutingTable == null) { - cachedCommitPoints.remove(shardId); - continue; - } - - ShardRouting primaryShardRouting = indexRoutingTable.shard(shardId.id()).primaryShard(); - if (primaryShardRouting.active()) { - cachedCommitPoints.remove(shardId); - } - } - if (!routingNodes.hasUnassigned()) { return changed; }