Merge pull request #14591 from s1monw/issues/14387

Only allow rebalance operations to run if all shard store data is available
This commit is contained in:
Simon Willnauer 2015-11-10 12:06:31 +01:00
commit 17913c56fb
10 changed files with 197 additions and 9 deletions

View File

@ -77,7 +77,7 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction<Reco
} }
String indexName = recoveryState.getShardId().getIndex(); String indexName = recoveryState.getShardId().getIndex();
if (!shardResponses.containsKey(indexName)) { if (!shardResponses.containsKey(indexName)) {
shardResponses.put(indexName, new ArrayList<RecoveryState>()); shardResponses.put(indexName, new ArrayList<>());
} }
if (request.activeOnly()) { if (request.activeOnly()) {
if (recoveryState.getStage() != RecoveryState.Stage.DONE) { if (recoveryState.getStage() != RecoveryState.Stage.DONE) {

View File

@ -118,6 +118,9 @@ public class RoutingAllocation {
private boolean debugDecision = false; private boolean debugDecision = false;
private boolean hasPendingAsyncFetch = false;
/** /**
* Creates a new {@link RoutingAllocation} * Creates a new {@link RoutingAllocation}
* *
@ -246,4 +249,20 @@ public class RoutingAllocation {
return decision; return decision;
} }
} }
/**
* Returns <code>true</code> iff the current allocation run has not processed all of the in-flight or available
* shard or store fetches. Otherwise <code>true</code>
*/
public boolean hasPendingAsyncFetch() {
return hasPendingAsyncFetch;
}
/**
* Sets a flag that signals that current allocation run has not processed all of the in-flight or available shard or store fetches.
* This state is anti-viral and can be reset in on allocation run.
*/
public void setHasPendingAsyncFetch() {
this.hasPendingAsyncFetch = true;
}
} }

View File

@ -118,7 +118,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
@Override @Override
public boolean allocateUnassigned(RoutingAllocation allocation) { public boolean allocateUnassigned(RoutingAllocation allocation) {
return rebalance(allocation); final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.allocateUnassigned();
} }
@Override @Override
@ -313,6 +314,15 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return delta <= (threshold + 0.001f); return delta <= (threshold + 0.001f);
} }
/**
* Allocates all possible unassigned shards
* @return <code>true</code> if the current configuration has been
* changed, otherwise <code>false</code>
*/
final boolean allocateUnassigned() {
return balance(true);
}
/** /**
* Balances the nodes on the cluster model according to the weight * Balances the nodes on the cluster model according to the weight
* function. The configured threshold is the minimum delta between the * function. The configured threshold is the minimum delta between the
@ -328,16 +338,24 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* changed, otherwise <code>false</code> * changed, otherwise <code>false</code>
*/ */
public boolean balance() { public boolean balance() {
return balance(false);
}
private boolean balance(boolean onlyAssign) {
if (this.nodes.isEmpty()) { if (this.nodes.isEmpty()) {
/* with no nodes this is pointless */ /* with no nodes this is pointless */
return false; return false;
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
if (onlyAssign) {
logger.trace("Start balancing cluster"); logger.trace("Start balancing cluster");
} else {
logger.trace("Start assigning unassigned shards");
}
} }
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned().transactionBegin(); final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned().transactionBegin();
boolean changed = initialize(routingNodes, unassigned); boolean changed = initialize(routingNodes, unassigned);
if (!changed && allocation.deciders().canRebalance(allocation).type() == Type.YES) { if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
NodeSorter sorter = newNodeSorter(); NodeSorter sorter = newNodeSorter();
if (nodes.size() > 1) { /* skip if we only have one node */ if (nodes.size() > 1) { /* skip if we only have one node */
for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) { for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) {

View File

@ -76,7 +76,19 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
@Override @Override
public boolean rebalance(RoutingAllocation allocation) { public boolean rebalance(RoutingAllocation allocation) {
if (allocation.hasPendingAsyncFetch() == false) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
* we might end up with unnecessary rebalance operations which can be super confusion/frustrating
* since once the fetches come back we might just move all the shards back again.
* Therefore we only do a rebalance if we have fetched all information.
*/
return allocator.rebalance(allocation); return allocator.rebalance(allocation);
} else {
logger.debug("skipping rebalance due to in-flight shard/store fetches");
return false;
}
} }
@Override @Override

View File

@ -65,6 +65,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation); AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
if (shardState.hasData() == false) { if (shardState.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard started state", shard); logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
allocation.setHasPendingAsyncFetch();
unassignedIterator.removeAndIgnore(); unassignedIterator.removeAndIgnore();
continue; continue;
} }

View File

@ -139,6 +139,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation); AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
if (shardStores.hasData() == false) { if (shardStores.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard stores", shard); logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
allocation.setHasPendingAsyncFetch();
unassignedIterator.removeAndIgnore(); unassignedIterator.removeAndIgnore();
continue; // still fetching continue; // still fetching
} }

View File

@ -27,13 +27,16 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -624,4 +627,93 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
assertThat(routingNodes.node("node3").isEmpty(), equalTo(true)); assertThat(routingNodes.node("node3").isEmpty(), equalTo(true));
} }
public void testRebalanceWhileShardFetching() {
final AtomicBoolean hasFetches = new AtomicBoolean(true);
AllocationService strategy = createAllocationService(settingsBuilder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE,
ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString()).build(), new NoopGatewayAllocator() {
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
if (hasFetches.get()) {
allocation.setHasPendingAsyncFetch();
}
return super.allocateUnassigned(allocation);
}
});
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_id", "node1,node2")).numberOfShards(2).numberOfReplicas(0))
.build();
// we use a second index here (test1) that never gets assigned otherwise allocateUnassinged is never called if we don't have unassigned shards.
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.addAsNew(metaData.index("test1"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("start two nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
}
logger.debug("start all the primary shards for test");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test", INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}
logger.debug("now, start 1 more node, check that rebalancing will not happen since we have shard sync going on");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node2")))
.build();
logger.debug("reroute and check that nothing has changed");
RoutingAllocation.Result reroute = strategy.reroute(clusterState);
assertFalse(reroute.changed());
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(UNASSIGNED));
}
logger.debug("now set hasFetches to true and reroute we should now see exactly one relocating shard");
hasFetches.set(false);
reroute = strategy.reroute(clusterState);
assertTrue(reroute.changed());
routingTable = reroute.routingTable();
int numStarted = 0;
int numRelocating = 0;
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
if (routingTable.index("test").shard(i).primaryShard().state() == STARTED) {
numStarted++;
} else if (routingTable.index("test").shard(i).primaryShard().state() == RELOCATING) {
numRelocating++;
}
}
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(UNASSIGNED));
}
assertEquals(numStarted, 1);
assertEquals(numRelocating, 1);
}
} }

View File

@ -21,10 +21,16 @@ package org.elasticsearch.recovery;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
@ -124,4 +130,36 @@ public class FullRollingRestartIT extends ESIntegTestCase {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000l); assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000l);
} }
} }
public void testNoRebalanceOnRollingRestart() throws Exception {
// see https://github.com/elastic/elasticsearch/issues/14387
internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodesAsync(3).get();
/**
* We start 3 nodes and a dedicated master. Restart on of the data-nodes and ensure that we got no relocations.
* Yet we have 6 shards 0 replica so that means if the restarting node comes back both other nodes are subject
* to relocating to the restarting node since all had 2 shards and now one node has nothing allocated.
* We have a fix for this to wait until we have allocated unallocated shards now so this shouldn't happen.
*/
prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "6").put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0").put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueMinutes(1))).get();
for (int i = 0; i < 100; i++) {
client().prepareIndex("test", "type1", Long.toString(i))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map()).execute().actionGet();
}
ensureGreen();
ClusterState state = client().admin().cluster().prepareState().get().getState();
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode() + "\n" + state.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION);
}
internalCluster().restartRandomDataNode();
ensureGreen();
ClusterState afterState = client().admin().cluster().prepareState().get().getState();
recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode()+ "-- \nbefore: \n" + state.prettyPrint() + "\nafter: \n" + afterState.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION);
}
}
} }

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.test.gateway.NoopGatewayAllocator;
@ -79,6 +80,12 @@ public abstract class ESAllocationTestCase extends ESTestCase {
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService); new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService);
} }
public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
return new AllocationService(settings,
randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE);
}
public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) { public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {

View File

@ -32,7 +32,7 @@ public class NoopGatewayAllocator extends GatewayAllocator {
public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator(); public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();
private NoopGatewayAllocator() { protected NoopGatewayAllocator() {
super(Settings.EMPTY, null, null); super(Settings.EMPTY, null, null);
} }