add a callback for allocation as well

This commit is contained in:
kimchy 2010-08-22 01:00:37 +03:00
parent a3852766ff
commit 7592862646
9 changed files with 49 additions and 9 deletions

View File

@ -28,7 +28,7 @@ import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.collect.Maps.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
@NotThreadSafe
public class RoutingNodes implements Iterable<RoutingNode> {
@ -41,6 +41,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private final List<MutableShardRouting> unassigned = newArrayList();
private final List<MutableShardRouting> ignoredUnassigned = newArrayList();
public RoutingNodes(MetaData metaData, RoutingTable routingTable) {
this.metaData = metaData;
this.routingTable = routingTable;
@ -106,6 +108,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return !unassigned.isEmpty();
}
public List<MutableShardRouting> ignoredUnassigned() {
return this.ignoredUnassigned;
}
public List<MutableShardRouting> unassigned() {
return this.unassigned;
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Iterables;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput;
@ -37,7 +38,7 @@ import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.collect.Maps.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
@Immutable
public class RoutingTable implements Iterable<IndexRoutingTable> {
@ -232,7 +233,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
indexBuilder.addShard(new ImmutableShardRouting(shardRoutingEntry));
}
}
for (MutableShardRouting shardRoutingEntry : routingNodes.unassigned()) {
for (MutableShardRouting shardRoutingEntry : Iterables.concat(routingNodes.unassigned(), routingNodes.ignoredUnassigned())) {
String index = shardRoutingEntry.index();
IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index);
if (indexBuilder == null) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -47,5 +48,7 @@ public interface NodeAllocation {
abstract boolean allocate();
}
boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes);
Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -51,6 +52,14 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation
this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]);
}
@Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false;
for (NodeAllocation allocation : allocations) {
changed |= allocation.allocate(routingNodes, nodes);
}
return changed;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
Decision ret = Decision.YES;
for (NodeAllocation allocation : allocations) {

View File

@ -227,13 +227,22 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
}
if (lastNodeMatched != null) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
if (nodeAllocations.canAllocate(shard, lastNodeMatched, routingNodes) == NodeAllocation.Decision.THROTTLE) {
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
// we are throttling this, but we have enough to allocate to this node, ignore it for now
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
// we found a match
changed = true;
lastNodeMatched.add(shard);
unassignedIterator.remove();
}
// we found a match
changed = true;
lastNodeMatched.add(shard);
unassignedIterator.remove();
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
/**
* @author kimchy (shay.banon)
*/
// TODO move this to be a NodeAllocation (once we remove the md5 and make listing fast for Unassigned impl)
public interface PreferUnallocatedStrategy {
void prefetch(IndexMetaData index, DiscoveryNodes nodes);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
@ -38,6 +39,10 @@ public class ReplicaAfterPrimaryActiveNodeAllocation extends AbstractComponent i
super(settings);
}
@Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
if (shardRouting.primary()) {
return Decision.YES;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
@ -38,6 +39,10 @@ public class SameShardNodeAllocation extends AbstractComponent implements NodeAl
super(settings);
}
@Override public boolean allocate(RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
for (MutableShardRouting current : node.shards()) {
// we do not allow for two shards of the same shard id to exists on the same node

View File

@ -125,6 +125,7 @@ public class ShardsAllocation extends AbstractComponent {
if (preferUnallocatedStrategy != null) {
changed |= preferUnallocatedStrategy.allocateUnassigned(routingNodes, nodes);
}
changed |= nodeAllocations.allocate(routingNodes, nodes);
changed |= allocateUnassigned(routingNodes);
// elect primaries again, in case this is needed with unassigned allocation
changed |= electPrimaries(routingNodes);