Use general cluster state batching mechanism for shard failures

This commit modifies the handling of shard failure cluster state updates
to use the general cluster state batching mechanism. An advantage of
this approach is we now get correct per-listener notification on
failures.
This commit is contained in:
Jason Tedor 2015-11-25 08:55:23 -05:00
parent 6f2c36dcb7
commit d7f4dd0767
2 changed files with 42 additions and 44 deletions

View File

@ -20,9 +20,7 @@
package org.elasticsearch.cluster.action.shard; package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingService;
@ -46,7 +44,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
@ -64,7 +61,6 @@ public class ShardStateAction extends AbstractComponent {
private final RoutingService routingService; private final RoutingService routingService;
private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue(); private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue<ShardRoutingEntry> failedShardQueue = ConcurrentCollections.newBlockingQueue();
@Inject @Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
@ -141,54 +137,52 @@ public class ShardStateAction extends AbstractComponent {
}); });
} }
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry); clusterService.submitStateUpdateTask(
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
new ClusterStateUpdateTask(Priority.HIGH) { shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
shardFailedClusterStateHandler);
}
@Override class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
public ClusterState execute(ClusterState currentState) { @Override
if (shardRoutingEntry.processed) { public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
return currentState; BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();
ClusterState accumulator = ClusterState.builder(currentState).build();
for (ShardRoutingEntry task : tasks) {
task.processed = true;
try {
RoutingAllocation.Result result = allocationService.applyFailedShard(
currentState,
new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
builder.success(task);
if (result.changed()) {
accumulator = ClusterState.builder(accumulator).routingResult(result).build();
}
} catch (Throwable t) {
builder.failure(task, t);
} }
List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<>();
failedShardQueue.drainTo(shardRoutingEntries);
// nothing to process (a previous event has processed it already)
if (shardRoutingEntries.isEmpty()) {
return currentState;
}
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
// mark all entries as processed
for (ShardRoutingEntry entry : shardRoutingEntries) {
entry.processed = true;
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure));
}
RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);
if (!routingResult.changed()) {
return currentState;
}
return ClusterState.builder(currentState).routingResult(routingResult).build();
} }
return builder.build(accumulator);
}
@Override @Override
public void onFailure(String source, Throwable t) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.error("unexpected failure during [{}]", t, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) { if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
logger.trace("unassigned shards after shard failures. scheduling a reroute."); logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.reroute("unassigned shards after shard failures, scheduling a reroute"); routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
} }
} }
});
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
} }
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {

View File

@ -98,7 +98,11 @@ public class AllocationService extends AbstractComponent {
} }
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null))); return applyFailedShard(clusterState, new FailedRerouteAllocation.FailedShard(failedShard, null, null));
}
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, FailedRerouteAllocation.FailedShard failedShard) {
return applyFailedShards(clusterState, Collections.singletonList(failedShard));
} }
/** /**