Merge pull request #15016 from jasontedor/shard-failure-batch

Use general cluster state batching mechanism for shard failures
This commit is contained in:
Jason Tedor 2015-12-03 14:02:48 -05:00
commit 182c22f23f
1 changed file with 37 additions and 43 deletions

View File

@ -20,9 +20,7 @@
package org.elasticsearch.cluster.action.shard; package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingService;
@ -46,7 +44,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
@ -64,7 +61,6 @@ public class ShardStateAction extends AbstractComponent {
private final RoutingService routingService; private final RoutingService routingService;
private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue(); private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue<ShardRoutingEntry> failedShardQueue = ConcurrentCollections.newBlockingQueue();
@Inject @Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
@ -141,44 +137,38 @@ public class ShardStateAction extends AbstractComponent {
}); });
} }
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry); clusterService.submitStateUpdateTask(
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
new ClusterStateUpdateTask(Priority.HIGH) { shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
shardFailedClusterStateHandler);
}
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
@Override @Override
public ClusterState execute(ClusterState currentState) { public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
if (shardRoutingEntry.processed) { BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
return currentState; List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
for (ShardRoutingEntry task : tasks) {
task.processed = true;
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
} }
ClusterState maybeUpdatedState = currentState;
List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<>(); try {
failedShardQueue.drainTo(shardRoutingEntries); RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);
if (result.changed()) {
// nothing to process (a previous event has processed it already) maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
if (shardRoutingEntries.isEmpty()) {
return currentState;
} }
batchResultBuilder.successes(tasks);
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size()); } catch (Throwable t) {
batchResultBuilder.failures(tasks, t);
// mark all entries as processed
for (ShardRoutingEntry entry : shardRoutingEntries) {
entry.processed = true;
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure));
} }
return batchResultBuilder.build(maybeUpdatedState);
RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);
if (!routingResult.changed()) {
return currentState;
}
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
} }
@Override @Override
@ -188,7 +178,11 @@ public class ShardStateAction extends AbstractComponent {
routingService.reroute("unassigned shards after shard failures, scheduling a reroute"); routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
} }
} }
});
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
} }
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {