don't go and find local storage for a shard on nodes if its not going to be allocated

This commit is contained in:
kimchy 2010-08-25 15:01:09 +03:00
parent 4bfd0a8c26
commit f36d80f66d
2 changed files with 20 additions and 15 deletions

View File

@ -62,7 +62,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
this.indicesService = indicesService; this.indicesService = indicesService;
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData; this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData;
this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(60)); this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30));
} }
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
@ -102,6 +102,25 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
continue; continue;
} }
// pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
boolean canBeAllocatedToAtLeastOneNode = false;
for (DiscoveryNode discoNode : nodes.dataNodes().values()) {
RoutingNode node = routingNodes.node(discoNode.id());
if (node == null) {
continue;
}
// if its THROTTLING, we are not going to allocate it to this node, so ignore it as well
if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
canBeAllocatedToAtLeastOneNode = true;
break;
}
}
if (!canBeAllocatedToAtLeastOneNode) {
continue;
}
// go and fetch the shard store data for it
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet(); TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -87,19 +86,6 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
return new NodeStoreFilesMetaData(); return new NodeStoreFilesMetaData();
} }
/**
* We only need to ask data nodes for shard allocation information.
*/
@Override protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
List<String> dataNodeIds = Lists.newArrayList();
for (String nodeId : nodesIds) {
if (nodes.get(nodeId).dataNode()) {
dataNodeIds.add(nodeId);
}
}
return dataNodeIds.toArray(new String[dataNodeIds.size()]);
}
@Override protected NodesStoreFilesMetaData newResponse(Request request, AtomicReferenceArray responses) { @Override protected NodesStoreFilesMetaData newResponse(Request request, AtomicReferenceArray responses) {
final List<NodeStoreFilesMetaData> nodeStoreFilesMetaDatas = Lists.newArrayList(); final List<NodeStoreFilesMetaData> nodeStoreFilesMetaDatas = Lists.newArrayList();
final List<FailedNodeException> failures = Lists.newArrayList(); final List<FailedNodeException> failures = Lists.newArrayList();