Merge pull request #15468 from jasontedor/master-side-of-wait-on-shard-failures

Master should wait on cluster state publication when failing a shard
This commit is contained in:
Jason Tedor 2015-12-16 10:39:38 -05:00
commit 3e8768f9ee
1 changed file with 51 additions and 22 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingService;
@ -53,6 +54,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Locale;
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
@ -113,7 +115,7 @@ public class ShardStateAction extends AbstractComponent {
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode); logger.warn("unexpected failure while sending request to [{}] to fail shard [{}]", exp, masterNode, shardRoutingEntry);
listener.onShardFailedFailure(masterNode, exp); listener.onShardFailedFailure(masterNode, exp);
} }
}); });
@ -122,22 +124,62 @@ public class ShardStateAction extends AbstractComponent {
/**
 * Transport handler for shard-failed requests.
 *
 * Post-change (right-hand column of this diff): the request is passed to
 * handleShardFailureOnMaster together with a ClusterStateTaskListener, and the reply on the
 * channel is sent only from that listener's callbacks: onFailure sends the throwable,
 * onNoLongerMaster sends a NotMasterException, and clusterStateProcessed sends
 * TransportResponse.Empty.INSTANCE.
 *
 * Pre-change (left-hand column): the empty response was sent immediately after calling
 * handleShardFailureOnMaster(request).
 */
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> { private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
@Override @Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardFailureOnMaster(request); handleShardFailureOnMaster(request, new ClusterStateTaskListener() {
channel.sendResponse(TransportResponse.Empty.INSTANCE); @Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure while failing shard [{}]", t, request.shardRouting);
try {
channel.sendResponse(t);
} catch (Throwable channelThrowable) {
logger.warn("failed to send failure [{}] while failing shard [{}]", channelThrowable, t, request.shardRouting);
} }
} }
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener { @Override
public void onNoLongerMaster(String source) {
logger.error("no longer master while failing shard [{}]", request.shardRouting);
try {
channel.sendResponse(new NotMasterException(source));
} catch (Throwable channelThrowable) {
logger.warn("failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting);
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
int numberOfUnassignedShards = newState.getRoutingNodes().unassigned().size();
// a reroute is requested only when the state object changed and unassigned shards remain
if (oldState != newState && numberOfUnassignedShards > 0) {
String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shard [%s]", numberOfUnassignedShards, request.shardRouting);
if (logger.isTraceEnabled()) {
logger.trace(reason + ", scheduling a reroute");
}
routingService.reroute(reason);
}
} finally {
// the empty response is sent from the finally block, so it goes out even if the code above throws
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable channelThrowable) {
logger.warn("failed to send response while failing shard [{}]", channelThrowable, request.shardRouting);
}
}
}
}
);
}
}
/**
 * Cluster state task executor for a batch of shard-failed entries.
 *
 * The visible part of execute converts each task into a FailedRerouteAllocation.FailedShard,
 * passes the list to allocationService.applyFailedShards, and builds a new ClusterState from the
 * routing result only when result.changed() is true; otherwise the current state is returned.
 *
 * NOTE(review): the diff hunk boundary inside execute hides the lines between the two hunks
 * (presumably where batchResultBuilder records task successes/failures) - confirm against the
 * full file.
 *
 * Post-change (right-hand column) this class implements only ClusterStateTaskExecutor; the
 * clusterStateProcessed and onFailure callbacks shown only in the left-hand column are removed
 * by this commit.
 */
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry> {
@Override @Override
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception { public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder(); BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); List<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>(tasks.size());
for (ShardRoutingEntry task : tasks) { for (ShardRoutingEntry task : tasks) {
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); failedShards.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
} }
ClusterState maybeUpdatedState = currentState; ClusterState maybeUpdatedState = currentState;
try { try {
RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied); RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, failedShards);
if (result.changed()) { if (result.changed()) {
maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build(); maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
} }
@ -147,31 +189,18 @@ public class ShardStateAction extends AbstractComponent {
} }
return batchResultBuilder.build(maybeUpdatedState); return batchResultBuilder.build(maybeUpdatedState);
} }
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
}
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
} }
// One handler instance is created here and passed to every shard-failed task submitted below.
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler(); private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
/**
 * Logs the received shard failure at WARN and submits a "shard-failed (...)" cluster state
 * update task for the given entry, configured with Priority.HIGH and
 * shardFailedClusterStateHandler.
 *
 * Post-change (right-hand column): the final argument is the ClusterStateTaskListener supplied
 * by the caller. Pre-change (left-hand column): shardFailedClusterStateHandler was passed in
 * that position as well.
 */
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask( clusterService.submitStateUpdateTask(
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry, shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH), ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler, shardFailedClusterStateHandler,
shardFailedClusterStateHandler); listener);
} }
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) { public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {