Instead of relying on the in-flight number, use a boolean flag on the

allocation to ensure that we skip rebalance if we have not processed
all pending shard / store fetches.
This commit is contained in:
Simon Willnauer 2015-11-08 12:40:05 +01:00
parent 01ca95a648
commit fa6a2932f5
6 changed files with 72 additions and 12 deletions

View File

@ -118,6 +118,9 @@ public class RoutingAllocation {
private boolean debugDecision = false;
private boolean hasPendingAsyncFetch = false;
/**
* Creates a new {@link RoutingAllocation}
*
@ -246,4 +249,20 @@ public class RoutingAllocation {
return decision;
}
}
/**
 * Reports whether this allocation run still has shard or store fetches that are
 * in flight, or whose results have not been processed yet.
 *
 * @return <code>true</code> if at least one such fetch is still pending for this
 *         allocation run, otherwise <code>false</code>
 */
public boolean hasPendingAsyncFetch() {
    return this.hasPendingAsyncFetch;
}
/**
 * Flags this allocation run as not having processed all of the in-flight or
 * available shard or store fetches.
 * <p>
 * This method only ever raises the flag; it takes no argument and never clears it.
 * NOTE(review): the previous comment described the state as "anti-viral" and
 * resettable per allocation run, presumably meaning the flag does not carry over
 * to the next run -- confirm against how {@link RoutingAllocation} instances are
 * created.
 */
public void setHasPendingAsyncFetch() {
    hasPendingAsyncFetch = true;
}
}

View File

@ -118,7 +118,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
return rebalance(allocation);
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.allocateUnassigned();
}
@Override
@ -313,6 +314,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return delta <= (threshold + 0.001f);
}
/**
 * Runs an assignment-only pass: delegates to {@code balance(true)}, i.e. the
 * balancing routine with its {@code onlyAssign} switch turned on.
 *
 * @return <code>true</code> if the current configuration has been
 *         changed, otherwise <code>false</code>
 */
final boolean allocateUnassigned() {
    final boolean configurationChanged = balance(true);
    return configurationChanged;
}
/**
* Balances the nodes on the cluster model according to the weight
* function. The configured threshold is the minimum delta between the
@ -328,16 +338,24 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* changed, otherwise <code>false</code>
*/
public boolean balance() {
    // Full run: the assignment-only switch stays off.
    final boolean onlyAssign = false;
    return balance(onlyAssign);
}
private boolean balance(boolean onlyAssign) {
if (this.nodes.isEmpty()) {
/* with no nodes this is pointless */
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Start balancing cluster");
if (onlyAssign) {
    // Assignment-only pass: the rebalancing step below is guarded by
    // onlyAssign == false, so only unassigned shards are handled here.
    logger.trace("Start assigning unassigned shards");
} else {
    // Full pass: unassigned shards are handled and rebalancing may follow.
    logger.trace("Start balancing cluster");
}
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned().transactionBegin();
boolean changed = initialize(routingNodes, unassigned);
if (!changed && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
NodeSorter sorter = newNodeSorter();
if (nodes.size() > 1) { /* skip if we only have one node */
for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) {

View File

@ -76,8 +76,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
@Override
public boolean rebalance(RoutingAllocation allocation) {
final int numberOfInFlightFetch = gatewayAllocator.getNumberOfInFlightFetch();
if (numberOfInFlightFetch == 0) {
if (allocation.hasPendingAsyncFetch() == false) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
@ -87,7 +86,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
*/
return allocator.rebalance(allocation);
} else {
logger.debug("skip rebalance [{}] shard store fetch operations are still in-flight", numberOfInFlightFetch);
// The allocation only carries a boolean flag, so no fetch count can be reported here.
logger.debug("skip rebalance, at least one shard/store fetch operation is still in-flight");
}
return false;
}

View File

@ -65,6 +65,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
if (shardState.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
allocation.setHasPendingAsyncFetch();
unassignedIterator.removeAndIgnore();
continue;
}

View File

@ -139,6 +139,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
if (shardStores.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
allocation.setHasPendingAsyncFetch();
unassignedIterator.removeAndIgnore();
continue; // still fetching
}
@ -186,6 +187,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
*/
changed = true;
allocation.setHasPendingAsyncFetch();
unassignedIterator.removeAndIgnore();
}
}

View File

@ -27,11 +27,13 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
@ -627,21 +629,26 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
}
public void testRebalanceWhileShardFetching() {
final AtomicInteger numFetch = new AtomicInteger(1);
final AtomicBoolean hasFetches = new AtomicBoolean(true);
AllocationService strategy = createAllocationService(settingsBuilder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE,
ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoopGatewayAllocator() {
@Override
public int getNumberOfInFlightFetch() {
return numFetch.get();
public boolean allocateUnassigned(RoutingAllocation allocation) {
    // While the test keeps the switch on, mark the allocation run as having
    // pending async fetches before handing over to the no-op allocator.
    final boolean fetchesOutstanding = hasFetches.get();
    if (fetchesOutstanding) {
        allocation.setHasPendingAsyncFetch();
    }
    return super.allocateUnassigned(allocation);
}
});
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_id", "node1,node2")).numberOfShards(2).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.addAsNew(metaData.index("test1"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
@ -670,14 +677,24 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node2")))
.build();
RoutingAllocation.Result reroute = strategy.reroute(clusterState);
assertFalse(reroute.changed());
numFetch.set(0);
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(UNASSIGNED));
}
hasFetches.set(false);
reroute = strategy.reroute(clusterState);
assertTrue(reroute.changed());
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
int numStarted = 0;
int numRelocating = 0;
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
@ -689,6 +706,10 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
numRelocating++;
}
}
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(UNASSIGNED));
}
assertEquals(numStarted, 1);
assertEquals(numRelocating, 1);