only check for prefered allocation on data nodes

This commit is contained in:
kimchy 2010-08-22 00:32:43 +03:00
parent 3f701365b0
commit a3852766ff
6 changed files with 32 additions and 10 deletions

View File

@ -31,11 +31,15 @@ import org.elasticsearch.cluster.routing.ShardRouting;
public interface NodeAllocation { public interface NodeAllocation {
enum Decision { enum Decision {
ALLOWED { YES {
@Override boolean allocate() { @Override boolean allocate() {
return true; return true;
}}, }},
DISALLOWED { NO {
@Override boolean allocate() {
return false;
}},
THROTTLE {
@Override boolean allocate() { @Override boolean allocate() {
return false; return false;
}}; }};

View File

@ -52,12 +52,15 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation
} }
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
Decision ret = Decision.YES;
for (NodeAllocation allocation : allocations) { for (NodeAllocation allocation : allocations) {
Decision decision = allocation.canAllocate(shardRouting, node, routingNodes); Decision decision = allocation.canAllocate(shardRouting, node, routingNodes);
if (decision == Decision.DISALLOWED) { if (decision == Decision.NO) {
return Decision.DISALLOWED; return Decision.NO;
} else if (decision == Decision.THROTTLE) {
ret = Decision.THROTTLE;
} }
} }
return Decision.ALLOWED; return ret;
} }
} }

View File

@ -40,12 +40,12 @@ public class ReplicaAfterPrimaryActiveNodeAllocation extends AbstractComponent i
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
if (shardRouting.primary()) { if (shardRouting.primary()) {
return Decision.ALLOWED; return Decision.YES;
} }
MutableShardRouting primary = routingNodes.findPrimaryForReplica(shardRouting); MutableShardRouting primary = routingNodes.findPrimaryForReplica(shardRouting);
if (primary == null || !primary.active()) { if (primary == null || !primary.active()) {
return Decision.DISALLOWED; return Decision.NO;
} }
return Decision.ALLOWED; return Decision.YES;
} }
} }

View File

@ -42,9 +42,9 @@ public class SameShardNodeAllocation extends AbstractComponent implements NodeAl
for (MutableShardRouting current : node.shards()) { for (MutableShardRouting current : node.shards()) {
// we do not allow for two shards of the same shard id to exists on the same node // we do not allow for two shards of the same shard id to exists on the same node
if (current.shardId().equals(shardRouting.shardId())) { if (current.shardId().equals(shardRouting.shardId())) {
return Decision.DISALLOWED; return Decision.NO;
} }
} }
return Decision.ALLOWED; return Decision.YES;
} }
} }

View File

@ -33,6 +33,7 @@ public class ShardAllocationModule extends AbstractModule {
Multibinder<NodeAllocation> decidersBinder = Multibinder.newSetBinder(binder(), NodeAllocation.class); Multibinder<NodeAllocation> decidersBinder = Multibinder.newSetBinder(binder(), NodeAllocation.class);
decidersBinder.addBinding().to(SameShardNodeAllocation.class); decidersBinder.addBinding().to(SameShardNodeAllocation.class);
decidersBinder.addBinding().to(ReplicaAfterPrimaryActiveNodeAllocation.class);
bind(NodeAllocations.class).asEagerSingleton(); bind(NodeAllocations.class).asEagerSingleton();
} }

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -89,6 +90,19 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
return new NodeStoreFilesMetaData(); return new NodeStoreFilesMetaData();
} }
/**
* We only need to ask data nodes for shard allocation information.
*/
@Override protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
List<String> dataNodeIds = Lists.newArrayList();
for (String nodeId : nodesIds) {
if (nodes.get(nodeId).dataNode()) {
dataNodeIds.add(nodeId);
}
}
return dataNodeIds.toArray(new String[dataNodeIds.size()]);
}
@Override protected NodesStoreFilesMetaData newResponse(Request request, AtomicReferenceArray responses) { @Override protected NodesStoreFilesMetaData newResponse(Request request, AtomicReferenceArray responses) {
final List<NodeStoreFilesMetaData> nodeStoreFilesMetaDatas = Lists.newArrayList(); final List<NodeStoreFilesMetaData> nodeStoreFilesMetaDatas = Lists.newArrayList();
final List<FailedNodeException> failures = Lists.newArrayList(); final List<FailedNodeException> failures = Lists.newArrayList();