From 36bd8450900072ef260c19a8293406cfe0f7ccab Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 15 Dec 2015 15:10:22 -0500 Subject: [PATCH] Reorganize o/e/c/a/s/ShardStateAction.java This commit is a trivial reorganization of o/e/c/a/s/ShardStateAction.java. The primary motive is have all of the shard failure handling grouped together, and all of the shard started handling grouped together. --- .../action/shard/ShardStateAction.java | 168 +++++++++--------- 1 file changed, 88 insertions(+), 80 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index d09df094a68..c2ac791aa16 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -20,7 +20,11 @@ package org.elasticsearch.cluster.action.shard; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; @@ -37,7 +41,14 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; @@ -45,11 +56,8 @@ import java.util.List; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; -/** - * - */ -public class ShardStateAction extends AbstractComponent { +public class ShardStateAction extends AbstractComponent { public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started"; public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure"; @@ -97,52 +105,26 @@ public class ShardStateAction extends AbstractComponent { options = TransportRequestOptions.builder().withTimeout(timeout).build(); } transportService.sendRequest(masterNode, - SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleResponse(TransportResponse.Empty response) { - listener.onSuccess(); - } + SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty response) { + listener.onSuccess(); + } - @Override - public void handleException(TransportException exp) { - logger.warn("failed to send failed shard to {}", exp, masterNode); - listener.onShardFailedFailure(masterNode, exp); - } - }); + @Override + public void handleException(TransportException exp) { + logger.warn("failed to send failed shard to {}", exp, masterNode); + listener.onShardFailedFailure(masterNode, exp); + } + }); } - public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) { - DiscoveryNode masterNode = clusterService.state().nodes().masterNode(); - if (masterNode == null) { - logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting); - return; + private class ShardFailedTransportHandler implements TransportRequestHandler { + @Override + public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { + handleShardFailureOnMaster(request); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } - shardStarted(shardRouting, indexUUID, reason, masterNode); - } - - public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) { - ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null); - logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); - transportService.sendRequest(masterNode, - SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.warn("failed to send shard started to [{}]", exp, masterNode); - } - - }); - } - - private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler(); - - private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { - logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); - clusterService.submitStateUpdateTask( - "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", - shardRoutingEntry, - ClusterStateTaskConfig.build(Priority.HIGH), - shardFailedClusterStateHandler, - shardFailedClusterStateHandler); } class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor, ClusterStateTaskListener { @@ -168,10 +150,10 @@ public class ShardStateAction extends AbstractComponent { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) { - logger.trace("unassigned shards after shard failures. scheduling a reroute."); - routingService.reroute("unassigned shards after shard failures, scheduling a reroute"); - } + if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) { + logger.trace("unassigned shards after shard failures. scheduling a reroute."); + routingService.reroute("unassigned shards after shard failures, scheduling a reroute"); + } } @Override @@ -180,18 +162,45 @@ public class ShardStateAction extends AbstractComponent { } } - private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = - new ShardStartedClusterStateHandler(); - - private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { - logger.debug("received shard started for {}", shardRoutingEntry); + private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler(); + private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) { + logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); clusterService.submitStateUpdateTask( - "shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", - shardRoutingEntry, - ClusterStateTaskConfig.build(Priority.URGENT), - shardStartedClusterStateHandler, - shardStartedClusterStateHandler); + "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", + shardRoutingEntry, + ClusterStateTaskConfig.build(Priority.HIGH), + shardFailedClusterStateHandler, + shardFailedClusterStateHandler); + } + + public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) { + DiscoveryNode masterNode = clusterService.state().nodes().masterNode(); + if (masterNode == null) { + logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting); + return; + } + shardStarted(shardRouting, indexUUID, reason, masterNode); + } + + public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) { + ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null); + logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); + transportService.sendRequest(masterNode, + SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + logger.warn("failed to send shard started to [{}]", exp, masterNode); + } + }); + } + + class ShardStartedTransportHandler implements TransportRequestHandler { + @Override + public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { + handleShardStartedOnMaster(request); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } } class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor, ClusterStateTaskListener { @@ -223,26 +232,20 @@ public class ShardStateAction extends AbstractComponent { } } - private class ShardFailedTransportHandler implements TransportRequestHandler { + private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler(); - @Override - public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { - handleShardFailureOnMaster(request); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } + private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { + logger.debug("received shard started for {}", shardRoutingEntry); - class ShardStartedTransportHandler implements TransportRequestHandler { - - @Override - public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { - shardStartedOnMaster(request); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } + clusterService.submitStateUpdateTask( + "shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", + shardRoutingEntry, + ClusterStateTaskConfig.build(Priority.URGENT), + shardStartedClusterStateHandler, + shardStartedClusterStateHandler); } public static class ShardRoutingEntry extends TransportRequest { - ShardRouting shardRouting; String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; String message; @@ -283,8 +286,13 @@ public class ShardStateAction extends AbstractComponent { } public interface Listener { - default void onSuccess() {} - default void onShardFailedNoMaster() {} - default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {} + default void onSuccess() { + } + + default void onShardFailedNoMaster() { + } + + default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) { + } } }