Merge pull request #15456 from jasontedor/reorganize-shard-state-action
Reorganize o/e/c/a/s/ShardStateAction.java
This commit is contained in:
commit
44467df353
|
@ -20,7 +20,11 @@
|
|||
package org.elasticsearch.cluster.action.shard;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
|
@ -37,7 +41,14 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -45,11 +56,8 @@ import java.util.List;
|
|||
|
||||
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class ShardStateAction extends AbstractComponent {
|
||||
|
||||
public class ShardStateAction extends AbstractComponent {
|
||||
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
|
||||
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
|
||||
|
||||
|
@ -97,52 +105,26 @@ public class ShardStateAction extends AbstractComponent {
|
|||
options = TransportRequestOptions.builder().withTimeout(timeout).build();
|
||||
}
|
||||
transportService.sendRequest(masterNode,
|
||||
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
listener.onSuccess();
|
||||
}
|
||||
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
listener.onSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("failed to send failed shard to {}", exp, masterNode);
|
||||
listener.onShardFailedFailure(masterNode, exp);
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("failed to send failed shard to {}", exp, masterNode);
|
||||
listener.onShardFailedFailure(masterNode, exp);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
|
||||
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
|
||||
if (masterNode == null) {
|
||||
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
|
||||
return;
|
||||
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
||||
@Override
|
||||
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
|
||||
handleShardFailureOnMaster(request);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
shardStarted(shardRouting, indexUUID, reason, masterNode);
|
||||
}
|
||||
|
||||
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
|
||||
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
|
||||
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
||||
transportService.sendRequest(masterNode,
|
||||
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("failed to send shard started to [{}]", exp, masterNode);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
|
||||
|
||||
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
|
||||
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
||||
clusterService.submitStateUpdateTask(
|
||||
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
|
||||
shardRoutingEntry,
|
||||
ClusterStateTaskConfig.build(Priority.HIGH),
|
||||
shardFailedClusterStateHandler,
|
||||
shardFailedClusterStateHandler);
|
||||
}
|
||||
|
||||
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
|
||||
|
@ -168,10 +150,10 @@ public class ShardStateAction extends AbstractComponent {
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
|
||||
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
|
||||
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
|
||||
}
|
||||
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
|
||||
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
|
||||
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -180,18 +162,45 @@ public class ShardStateAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler =
|
||||
new ShardStartedClusterStateHandler();
|
||||
|
||||
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
|
||||
logger.debug("received shard started for {}", shardRoutingEntry);
|
||||
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
|
||||
|
||||
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
|
||||
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
||||
clusterService.submitStateUpdateTask(
|
||||
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
|
||||
shardRoutingEntry,
|
||||
ClusterStateTaskConfig.build(Priority.URGENT),
|
||||
shardStartedClusterStateHandler,
|
||||
shardStartedClusterStateHandler);
|
||||
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
|
||||
shardRoutingEntry,
|
||||
ClusterStateTaskConfig.build(Priority.HIGH),
|
||||
shardFailedClusterStateHandler,
|
||||
shardFailedClusterStateHandler);
|
||||
}
|
||||
|
||||
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
|
||||
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
|
||||
if (masterNode == null) {
|
||||
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
|
||||
return;
|
||||
}
|
||||
shardStarted(shardRouting, indexUUID, reason, masterNode);
|
||||
}
|
||||
|
||||
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
|
||||
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
|
||||
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
||||
transportService.sendRequest(masterNode,
|
||||
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("failed to send shard started to [{}]", exp, masterNode);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
||||
@Override
|
||||
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
|
||||
handleShardStartedOnMaster(request);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
}
|
||||
|
||||
class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
|
||||
|
@ -223,26 +232,20 @@ public class ShardStateAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
||||
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler();
|
||||
|
||||
@Override
|
||||
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
|
||||
handleShardFailureOnMaster(request);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
}
|
||||
private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
|
||||
logger.debug("received shard started for {}", shardRoutingEntry);
|
||||
|
||||
class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
||||
|
||||
@Override
|
||||
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
|
||||
shardStartedOnMaster(request);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
clusterService.submitStateUpdateTask(
|
||||
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
|
||||
shardRoutingEntry,
|
||||
ClusterStateTaskConfig.build(Priority.URGENT),
|
||||
shardStartedClusterStateHandler,
|
||||
shardStartedClusterStateHandler);
|
||||
}
|
||||
|
||||
public static class ShardRoutingEntry extends TransportRequest {
|
||||
|
||||
ShardRouting shardRouting;
|
||||
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||
String message;
|
||||
|
@ -283,8 +286,13 @@ public class ShardStateAction extends AbstractComponent {
|
|||
}
|
||||
|
||||
public interface Listener {
|
||||
default void onSuccess() {}
|
||||
default void onShardFailedNoMaster() {}
|
||||
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {}
|
||||
default void onSuccess() {
|
||||
}
|
||||
|
||||
default void onShardFailedNoMaster() {
|
||||
}
|
||||
|
||||
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue