From a70f76f76305e5bdc8161f9836d6850f75442aec Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 3 Jan 2016 18:18:07 -0500 Subject: [PATCH] Make cluster state external to o.e.c.a.s.ShardStateAction This commit modifies the handling of cluster states in o.e.c.a.s.ShardStateAction so that all necessary state is obtained externally to the ShardStateAction#shardFailed and ShardStateAction#shardStarted methods. This refactoring permits the removal of the ClusterService field from ShardStateAction. --- .../TransportReplicationAction.java | 2 +- .../action/shard/ShardStateAction.java | 102 +++++++++--------- .../cluster/IndicesClusterStateService.java | 17 ++- .../action/shard/ShardStateActionTests.java | 6 +- 4 files changed, 66 insertions(+), 61 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d96ec177366..80ac93e981b 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -882,7 +882,7 @@ public abstract class TransportReplicationAction { + private final ClusterService clusterService; + + public ShardFailedTransportHandler(ClusterService clusterService) { + this.clusterService = clusterService; + } + @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { handleShardFailureOnMaster(request, new ClusterStateTaskListener() { @@ -156,6 +160,16 @@ public class ShardStateAction extends AbstractComponent { } ); } + + private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) { + logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); + clusterService.submitStateUpdateTask( + "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", + shardRoutingEntry, + ClusterStateTaskConfig.build(Priority.HIGH), + shardFailedClusterStateHandler, + listener); + } } class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor { @@ -194,43 +208,46 @@ public class ShardStateAction extends AbstractComponent { private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler(); - private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) { - logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); - clusterService.submitStateUpdateTask( - "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", - shardRoutingEntry, - ClusterStateTaskConfig.build(Priority.HIGH), - shardFailedClusterStateHandler, - listener); - } - - public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) { - DiscoveryNode masterNode = clusterService.state().nodes().masterNode(); + public void shardStarted(final ClusterState clusterState, final ShardRouting shardRouting, String indexUUID, final String reason) { + DiscoveryNode masterNode = clusterState.nodes().masterNode(); if (masterNode == null) { - logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting); + logger.warn("no master known to start shard [{}]", shardRouting); return; } - shardStarted(shardRouting, indexUUID, reason, masterNode); - } - - public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) { ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null); - logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); + logger.debug("sending start shard [{}]", shardRoutingEntry); transportService.sendRequest(masterNode, SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { - logger.warn("failed to send shard started to [{}]", exp, masterNode); + logger.warn("failure sending start shard [{}] to [{}]", exp, masterNode, shardRouting); } }); } class ShardStartedTransportHandler implements TransportRequestHandler { + private final ClusterService clusterService; + + public ShardStartedTransportHandler(ClusterService clusterService) { + this.clusterService = clusterService; + } + @Override public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { handleShardStartedOnMaster(request); channel.sendResponse(TransportResponse.Empty.INSTANCE); } + + private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { + logger.debug("received shard started for {}", shardRoutingEntry); + + clusterService.submitStateUpdateTask( + "shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", + shardRoutingEntry, + ClusterStateTaskConfig.build(Priority.URGENT), + shardStartedClusterStateHandler, + shardStartedClusterStateHandler); + } } class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor, ClusterStateTaskListener { @@ -264,17 +281,6 @@ public class ShardStateAction extends AbstractComponent { private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler(); - private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { - logger.debug("received shard started for {}", shardRoutingEntry); - - clusterService.submitStateUpdateTask( - "shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", - shardRoutingEntry, - ClusterStateTaskConfig.build(Priority.URGENT), - shardStartedClusterStateHandler, - shardStartedClusterStateHandler); - } - public static class ShardRoutingEntry extends TransportRequest { ShardRouting shardRouting; String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 099b7f862cd..8a213898b6c 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -459,7 +459,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { try { if (indexShard.recoverFromStore(nodes.localNode())) { - shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store"); + shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from store"); } } catch (Throwable t) { handleRecoveryFailure(indexService, shardRouting, true, t); @@ -666,7 +665,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent