Reorganize o/e/c/a/s/ShardStateAction.java

This commit is a trivial reorganization of
o/e/c/a/s/ShardStateAction.java. The primary motive is have all of the
shard failure handling grouped together, and all of the shard started
handling grouped together.
This commit is contained in:
Jason Tedor 2015-12-15 15:10:22 -05:00
parent ffdda793a0
commit 36bd845090

View File

@ -20,7 +20,11 @@
package org.elasticsearch.cluster.action.shard; package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingService;
@ -37,7 +41,14 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -45,11 +56,8 @@ import java.util.List;
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
/**
*
*/
public class ShardStateAction extends AbstractComponent {
public class ShardStateAction extends AbstractComponent {
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started"; public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure"; public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
@ -111,38 +119,12 @@ public class ShardStateAction extends AbstractComponent {
}); });
} }
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) { private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
return;
}
shardStarted(shardRouting, indexUUID, reason, masterNode);
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override @Override
public void handleException(TransportException exp) { public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
logger.warn("failed to send shard started to [{}]", exp, masterNode); handleShardFailureOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} }
});
}
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask(
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
shardFailedClusterStateHandler);
} }
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener { class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
@ -180,18 +162,45 @@ public class ShardStateAction extends AbstractComponent {
} }
} }
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
new ShardStartedClusterStateHandler();
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask( clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", "shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry, shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.URGENT), ClusterStateTaskConfig.build(Priority.HIGH),
shardStartedClusterStateHandler, shardFailedClusterStateHandler,
shardStartedClusterStateHandler); shardFailedClusterStateHandler);
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
return;
}
shardStarted(shardRouting, indexUUID, reason, masterNode);
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}
class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardStartedOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
} }
class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener { class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
@ -223,26 +232,20 @@ public class ShardStateAction extends AbstractComponent {
} }
} }
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> { private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler();
@Override private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { logger.debug("received shard started for {}", shardRoutingEntry);
handleShardFailureOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> { clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
@Override shardRoutingEntry,
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedOnMaster(request); shardStartedClusterStateHandler,
channel.sendResponse(TransportResponse.Empty.INSTANCE); shardStartedClusterStateHandler);
}
} }
public static class ShardRoutingEntry extends TransportRequest { public static class ShardRoutingEntry extends TransportRequest {
ShardRouting shardRouting; ShardRouting shardRouting;
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
String message; String message;
@ -283,8 +286,13 @@ public class ShardStateAction extends AbstractComponent {
} }
public interface Listener { public interface Listener {
default void onSuccess() {} default void onSuccess() {
default void onShardFailedNoMaster() {} }
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {}
default void onShardFailedNoMaster() {
}
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {
}
} }
} }