batch failed shards into a single cluster state event

make sure we process as much as possible failed shard events within a single cluster state event callback (similar to what we do with started shards)
This commit is contained in:
Shay Banon 2013-08-28 15:29:08 +02:00
parent b63af53313
commit db11c30dd5
5 changed files with 33 additions and 21 deletions

View File

@ -59,6 +59,7 @@ public class ShardStateAction extends AbstractComponent {
private final ThreadPool threadPool;
private final BlockingQueue<ShardRouting> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue<ShardRouting> failedShardQueue = ConcurrentCollections.newBlockingQueue();
@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
@ -109,19 +110,17 @@ public class ShardStateAction extends AbstractComponent {
private void innerShardFailed(final ShardRouting shardRouting, final String reason) {
logger.warn("received shard failed for {}, reason [{}]", shardRouting, reason);
failedShardQueue.add(shardRouting);
clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (logger.isDebugEnabled()) {
logger.debug("Received failed shard {}, reason [{}]", shardRouting, reason);
}
RoutingAllocation.Result routingResult = allocationService.applyFailedShard(currentState, shardRouting);
List<ShardRouting> shards = new ArrayList<ShardRouting>();
failedShardQueue.drainTo(shards);
RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shards);
if (!routingResult.changed()) {
return currentState;
}
if (logger.isDebugEnabled()) {
logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason);
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
@ -98,17 +99,24 @@ public class AllocationService extends AbstractComponent {
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, ImmutableList.of(failedShard));
}
/**
* Applies the failed shards. Note, shards can be called several times within this method.
* <p/>
* <p>If the same instance of the routing table is returned, then no change has been made.</p>
*/
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<ShardRouting> failedShards) {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Collections.shuffle(routingNodes.unassigned());
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShard);
boolean changed = applyFailedShard(allocation, failedShard, true);
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShards);
boolean changed = false;
for (ShardRouting failedShard : failedShards) {
changed |= applyFailedShard(allocation, failedShard, true);
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}

View File

@ -24,20 +24,22 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import java.util.List;
/**
* This {@link RoutingAllocation} keeps a shard which routing
* allocation has faild
* allocation has faild
*/
public class FailedRerouteAllocation extends RoutingAllocation {
private final ShardRouting failedShard;
private final List<ShardRouting> failedShards;
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ShardRouting failedShard) {
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<ShardRouting> failedShards) {
super(deciders, routingNodes, nodes);
this.failedShard = failedShard;
this.failedShards = failedShards;
}
public ShardRouting failedShard() {
return failedShard;
public List<ShardRouting> failedShards() {
return failedShards;
}
}

View File

@ -88,8 +88,10 @@ public class BlobReuseExistingGatewayAllocator extends AbstractComponent impleme
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {
cachedCommitPoints.remove(allocation.failedShard().shardId());
cachedStores.remove(allocation.failedShard().shardId());
for (ShardRouting failedShard : allocation.failedShards()) {
cachedCommitPoints.remove(failedShard.shardId());
cachedStores.remove(failedShard.shardId());
}
}
@Override

View File

@ -95,9 +95,10 @@ public class LocalGatewayAllocator extends AbstractComponent implements GatewayA
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {
ShardRouting failedShard = allocation.failedShard();
cachedStores.remove(failedShard.shardId());
cachedShardsState.remove(failedShard.shardId());
for (ShardRouting failedShard : allocation.failedShards()) {
cachedStores.remove(failedShard.shardId());
cachedShardsState.remove(failedShard.shardId());
}
}
@Override