move the allocate replica after primary active logic to a node allocation

This commit is contained in:
kimchy 2010-08-22 00:14:56 +03:00
parent 5ded04c5cb
commit 3f701365b0
6 changed files with 57 additions and 26 deletions

View File

@ -106,6 +106,7 @@
<w>reparse</w> <w>reparse</w>
<w>retrans</w> <w>retrans</w>
<w>retval</w> <w>retval</w>
<w>routings</w>
<w>rsts</w> <w>rsts</w>
<w>sbuf</w> <w>sbuf</w>
<w>searchable</w> <w>searchable</w>

View File

@ -126,7 +126,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return nodesToShards.get(nodeId); return nodesToShards.get(nodeId);
} }
public MutableShardRouting findPrimaryForReplica(MutableShardRouting shard) { public MutableShardRouting findPrimaryForReplica(ShardRouting shard) {
assert !shard.primary(); assert !shard.primary();
for (RoutingNode routingNode : nodesToShards.values()) { for (RoutingNode routingNode : nodesToShards.values()) {
for (MutableShardRouting shardRouting : routingNode) { for (MutableShardRouting shardRouting : routingNode) {

View File

@ -40,8 +40,9 @@ public class NodeAllocations extends AbstractComponent implements NodeAllocation
public NodeAllocations(Settings settings) { public NodeAllocations(Settings settings) {
this(settings, ImmutableSet.<NodeAllocation>builder() this(settings, ImmutableSet.<NodeAllocation>builder()
.add(new SameShardNodeAllocation(settings) .add(new SameShardNodeAllocation(settings))
).build() .add(new ReplicaAfterPrimaryActiveNodeAllocation(settings))
.build()
); );
} }

View File

@ -105,14 +105,6 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
continue; continue;
} }
if (!shard.primary()) {
// if its a backup, only allocate it if the primary is active
MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
if (primary == null || !primary.active()) {
continue;
}
}
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet()).actionGet(); TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet()).actionGet();
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -152,6 +144,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
if (!nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) { if (!nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
continue; continue;
} }
// if it is already allocated, we can't assign to it... // if it is already allocated, we can't assign to it...
if (storeFilesMetaData.allocated()) { if (storeFilesMetaData.allocated()) {
continue; continue;

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
/**
* An allocation strategy that only allows for a replica to be allocated when the primary is active.
*
* @author kimchy (shay.banon)
*/
public class ReplicaAfterPrimaryActiveNodeAllocation extends AbstractComponent implements NodeAllocation {
@Inject public ReplicaAfterPrimaryActiveNodeAllocation(Settings settings) {
super(settings);
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
if (shardRouting.primary()) {
return Decision.ALLOWED;
}
MutableShardRouting primary = routingNodes.findPrimaryForReplica(shardRouting);
if (primary == null || !primary.active()) {
return Decision.DISALLOWED;
}
return Decision.ALLOWED;
}
}

View File

@ -239,14 +239,6 @@ public class ShardsAllocation extends AbstractComponent {
while (unassignedIterator.hasNext()) { while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next(); MutableShardRouting shard = unassignedIterator.next();
// if its a replica, only allocate it if the primary is active
if (!shard.primary()) {
MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
if (primary == null || !primary.active()) {
continue;
}
}
// do the allocation, finding the least "busy" node // do the allocation, finding the least "busy" node
for (int i = 0; i < nodes.size(); i++) { for (int i = 0; i < nodes.size(); i++) {
RoutingNode node = nodes.get(lastNode); RoutingNode node = nodes.get(lastNode);
@ -272,13 +264,6 @@ public class ShardsAllocation extends AbstractComponent {
// allocate all the unassigned shards above the average per node. // allocate all the unassigned shards above the average per node.
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) { for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
MutableShardRouting shard = it.next(); MutableShardRouting shard = it.next();
// if its a backup, only allocate it if the primary is active
if (!shard.primary()) {
MutableShardRouting primary = routingNodes.findPrimaryForReplica(shard);
if (primary == null || !primary.active()) {
continue;
}
}
// go over the nodes and try and allocate the remaining ones // go over the nodes and try and allocate the remaining ones
for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) { for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) {
if (nodeAllocations.canAllocate(shard, routingNode, routingNodes).allocate()) { if (nodeAllocations.canAllocate(shard, routingNode, routingNodes).allocate()) {