Add callbacks for shards_started and shards_failed to better handle caching at the node allocation level

This commit is contained in:
kimchy 2010-08-25 22:20:27 +03:00
parent 3ebd03f69a
commit 2910b6ab7f
7 changed files with 55 additions and 27 deletions

View File

@ -26,6 +26,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.util.List;
/** /**
* A pluggable logic allowing to control if allocation of a shard is allowed on a specific node. * A pluggable logic allowing to control if allocation of a shard is allowed on a specific node.
* *
@ -54,7 +56,14 @@ public abstract class NodeAllocation extends AbstractComponent {
super(settings); super(settings);
} }
public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
}
/**
 * Callback invoked with the list of shards that have failed.
 *
 * <p>This base implementation is intentionally empty; subclasses override it when they
 * need to react to failed shards.
 *
 * @param nodeAllocations the node allocations instance passed along by the caller
 * @param routingNodes    the routing nodes passed along by the caller
 * @param nodes           the discovery nodes passed along by the caller
 * @param failedShards    the shards reported as failed
 */
public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
    // no-op by default
}
public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false; return false;
} }

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
@ -53,6 +54,18 @@ public class NodeAllocations extends NodeAllocation {
this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]); this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]);
} }
/**
 * Relays the started-shards notification to every delegate held in {@code allocations},
 * in array order, passing all arguments through unchanged.
 */
@Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
    // Read the field once, as the enhanced for loop over an array does.
    final NodeAllocation[] delegates = allocations;
    for (int i = 0; i < delegates.length; i++) {
        delegates[i].applyStartedShards(nodeAllocations, routingNodes, nodes, startedShards);
    }
}
/**
 * Relays the failed-shards notification to every delegate held in {@code allocations},
 * in array order, passing all arguments through unchanged.
 */
@Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
    // Read the field once, as the enhanced for loop over an array does.
    final NodeAllocation[] delegates = allocations;
    for (int i = 0; i < delegates.length; i++) {
        delegates[i].applyFailedShards(nodeAllocations, routingNodes, nodes, failedShards);
    }
}
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) {
for (NodeAllocation allocation : allocations) { for (NodeAllocation allocation : allocations) {
if (!allocation.canRebalance(shardRouting, routingNodes, nodes)) { if (!allocation.canRebalance(shardRouting, routingNodes, nodes)) {
@ -62,10 +75,10 @@ public class NodeAllocations extends NodeAllocation {
return true; return true;
} }
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false; boolean changed = false;
for (NodeAllocation allocation : allocations) { for (NodeAllocation allocation : allocations) {
changed |= allocation.allocate(nodeAllocations, routingNodes, nodes); changed |= allocation.allocateUnassigned(nodeAllocations, routingNodes, nodes);
} }
return changed; return changed;
} }

View File

@ -38,7 +38,7 @@ public class ReplicaAfterPrimaryActiveNodeAllocation extends NodeAllocation {
super(settings); super(settings);
} }
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false; return false;
} }

View File

@ -38,7 +38,7 @@ public class SameShardNodeAllocation extends NodeAllocation {
super(settings); super(settings);
} }
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false; return false;
} }

View File

@ -60,9 +60,11 @@ public class ShardsAllocation extends AbstractComponent {
* *
* <p>If the same instance of the routing table is returned, then no change has been made. * <p>If the same instance of the routing table is returned, then no change has been made.
*/ */
public RoutingTable applyStartedShards(ClusterState clusterState, Iterable<? extends ShardRouting> startedShardEntries) { public RoutingTable applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards) {
RoutingNodes routingNodes = clusterState.routingNodes(); RoutingNodes routingNodes = clusterState.routingNodes();
if (!applyStartedShards(routingNodes, startedShardEntries)) { boolean changed = applyStartedShards(routingNodes, startedShards);
nodeAllocations.applyStartedShards(nodeAllocations, routingNodes, clusterState.nodes(), startedShards);
if (!changed) {
return clusterState.routingTable(); return clusterState.routingTable();
} }
reroute(routingNodes, clusterState.nodes()); reroute(routingNodes, clusterState.nodes());
@ -74,9 +76,11 @@ public class ShardsAllocation extends AbstractComponent {
* *
* <p>If the same instance of the routing table is returned, then no change has been made. * <p>If the same instance of the routing table is returned, then no change has been made.
*/ */
public RoutingTable applyFailedShards(ClusterState clusterState, Iterable<? extends ShardRouting> failedShardEntries) { public RoutingTable applyFailedShards(ClusterState clusterState, List<? extends ShardRouting> failedShards) {
RoutingNodes routingNodes = clusterState.routingNodes(); RoutingNodes routingNodes = clusterState.routingNodes();
if (!applyFailedShards(routingNodes, failedShardEntries)) { boolean changed = applyFailedShards(routingNodes, failedShards);
nodeAllocations.applyFailedShards(nodeAllocations, routingNodes, clusterState.nodes(), failedShards);
if (!changed) {
return clusterState.routingTable(); return clusterState.routingTable();
} }
// If we reroute again, the failed shard will try and be assigned to the same node, which we do not do in the applyFailedShards // If we reroute again, the failed shard will try and be assigned to the same node, which we do not do in the applyFailedShards
@ -113,7 +117,7 @@ public class ShardsAllocation extends AbstractComponent {
// now allocate all the unassigned to available nodes // now allocate all the unassigned to available nodes
if (routingNodes.hasUnassigned()) { if (routingNodes.hasUnassigned()) {
changed |= nodeAllocations.allocate(nodeAllocations, routingNodes, nodes); changed |= nodeAllocations.allocateUnassigned(nodeAllocations, routingNodes, nodes);
changed |= allocateUnassigned(routingNodes); changed |= allocateUnassigned(routingNodes);
// elect primaries again, in case this is needed with unassigned allocation // elect primaries again, in case this is needed with unassigned allocation
changed |= electPrimaries(routingNodes); changed |= electPrimaries(routingNodes);

View File

@ -37,7 +37,7 @@ public class ThrottlingNodeAllocation extends NodeAllocation {
this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors() + 1); this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors() + 1);
} }
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false; return false;
} }

View File

@ -22,7 +22,10 @@ package org.elasticsearch.gateway.blobstore;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation; import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Maps;
@ -42,6 +45,7 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -67,27 +71,25 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30)); this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30));
} }
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
for (ShardRouting shardRouting : startedShards) {
cachedCommitPoints.remove(shardRouting.shardId());
}
}
/**
 * Removes from {@code cachedCommitPoints} the entry keyed by the shard id of each
 * shard in {@code failedShards}.
 */
@Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
    Iterator<? extends ShardRouting> failed = failedShards.iterator();
    while (failed.hasNext()) {
        cachedCommitPoints.remove(failed.next().shardId());
    }
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false; boolean changed = false;
if (nodes.dataNodes().isEmpty()) { if (nodes.dataNodes().isEmpty()) {
return changed; return changed;
} }
// clean cached commit points for primaries that are already active
for (ShardId shardId : cachedCommitPoints.keySet()) {
IndexRoutingTable indexRoutingTable = routingNodes.routingTable().index(shardId.index().name());
if (indexRoutingTable == null) {
cachedCommitPoints.remove(shardId);
continue;
}
ShardRouting primaryShardRouting = indexRoutingTable.shard(shardId.id()).primaryShard();
if (primaryShardRouting.active()) {
cachedCommitPoints.remove(shardId);
}
}
if (!routingNodes.hasUnassigned()) { if (!routingNodes.hasUnassigned()) {
return changed; return changed;
} }