Merge pull request #15736 from jasontedor/shard-state-action-cluster-state-refactoring
Make cluster state external to o.e.c.a.s.ShardStateAction
This commit is contained in:
commit
06851b7224
|
@ -882,7 +882,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
||||||
onReplicaFailure(nodeId, exp);
|
onReplicaFailure(nodeId, exp);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
|
logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
|
||||||
shardStateAction.shardFailed(shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
|
shardStateAction.shardFailed(clusterService.state(), shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
@ -58,49 +59,38 @@ import java.util.Locale;
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
|
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
|
||||||
|
|
||||||
|
|
||||||
public class ShardStateAction extends AbstractComponent {
|
public class ShardStateAction extends AbstractComponent {
|
||||||
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
|
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
|
||||||
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
|
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
|
||||||
|
|
||||||
private final TransportService transportService;
|
private final TransportService transportService;
|
||||||
private final ClusterService clusterService;
|
|
||||||
private final AllocationService allocationService;
|
|
||||||
private final RoutingService routingService;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
|
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
|
||||||
AllocationService allocationService, RoutingService routingService) {
|
AllocationService allocationService, RoutingService routingService) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.clusterService = clusterService;
|
|
||||||
this.transportService = transportService;
|
this.transportService = transportService;
|
||||||
this.allocationService = allocationService;
|
|
||||||
this.routingService = routingService;
|
|
||||||
|
|
||||||
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler());
|
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
|
||||||
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler());
|
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
|
public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
|
||||||
shardFailed(shardRouting, indexUUID, message, failure, null, listener);
|
shardFailed(clusterState, shardRouting, indexUUID, message, failure, null, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
|
public void resendShardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
|
||||||
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
|
logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message);
|
||||||
|
shardFailed(clusterState, shardRouting, indexUUID, message, failure, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
|
||||||
|
DiscoveryNode masterNode = clusterState.nodes().masterNode();
|
||||||
if (masterNode == null) {
|
if (masterNode == null) {
|
||||||
logger.warn("can't send shard failed for {}, no master known.", shardRouting);
|
logger.warn("{} no master known to fail shard [{}]", shardRouting.shardId(), shardRouting);
|
||||||
listener.onShardFailedNoMaster();
|
listener.onShardFailedNoMaster();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, timeout, listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure, Listener listener) {
|
|
||||||
logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
|
|
||||||
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, null, listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, TimeValue timeout, Listener listener) {
|
|
||||||
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
|
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
|
||||||
TransportRequestOptions options = TransportRequestOptions.EMPTY;
|
TransportRequestOptions options = TransportRequestOptions.EMPTY;
|
||||||
if (timeout != null) {
|
if (timeout != null) {
|
||||||
|
@ -115,33 +105,49 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleException(TransportException exp) {
|
public void handleException(TransportException exp) {
|
||||||
logger.warn("unexpected failure while sending request to [{}] to fail shard [{}]", exp, masterNode, shardRoutingEntry);
|
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.shardRouting.shardId(), masterNode, shardRoutingEntry);
|
||||||
listener.onShardFailedFailure(masterNode, exp);
|
listener.onShardFailedFailure(masterNode, exp);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
private static class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
||||||
|
private final ClusterService clusterService;
|
||||||
|
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
|
||||||
|
private final ESLogger logger;
|
||||||
|
|
||||||
|
public ShardFailedTransportHandler(ClusterService clusterService, ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor, ESLogger logger) {
|
||||||
|
this.clusterService = clusterService;
|
||||||
|
this.shardFailedClusterStateTaskExecutor = shardFailedClusterStateTaskExecutor;
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
|
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
|
||||||
handleShardFailureOnMaster(request, new ClusterStateTaskListener() {
|
logger.warn("{} received shard failed for {}", request.failure, request.shardRouting.shardId(), request);
|
||||||
|
clusterService.submitStateUpdateTask(
|
||||||
|
"shard-failed (" + request.shardRouting + "), message [" + request.message + "]",
|
||||||
|
request,
|
||||||
|
ClusterStateTaskConfig.build(Priority.HIGH),
|
||||||
|
shardFailedClusterStateTaskExecutor,
|
||||||
|
new ClusterStateTaskListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onFailure(String source, Throwable t) {
|
||||||
logger.error("unexpected failure while failing shard [{}]", t, request.shardRouting);
|
logger.error("{} unexpected failure while failing shard [{}]", t, request.shardRouting.shardId(), request.shardRouting);
|
||||||
try {
|
try {
|
||||||
channel.sendResponse(t);
|
channel.sendResponse(t);
|
||||||
} catch (Throwable channelThrowable) {
|
} catch (Throwable channelThrowable) {
|
||||||
logger.warn("failed to send failure [{}] while failing shard [{}]", channelThrowable, t, request.shardRouting);
|
logger.warn("{} failed to send failure [{}] while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), t, request.shardRouting);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNoLongerMaster(String source) {
|
public void onNoLongerMaster(String source) {
|
||||||
logger.error("no longer master while failing shard [{}]", request.shardRouting);
|
logger.error("{} no longer master while failing shard [{}]", request.shardRouting.shardId(), request.shardRouting);
|
||||||
try {
|
try {
|
||||||
channel.sendResponse(new NotMasterException(source));
|
channel.sendResponse(new NotMasterException(source));
|
||||||
} catch (Throwable channelThrowable) {
|
} catch (Throwable channelThrowable) {
|
||||||
logger.warn("failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting);
|
logger.warn("{} failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), request.shardRouting);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,7 +156,7 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
try {
|
try {
|
||||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||||
} catch (Throwable channelThrowable) {
|
} catch (Throwable channelThrowable) {
|
||||||
logger.warn("failed to send response while failing shard [{}]", channelThrowable, request.shardRouting);
|
logger.warn("{} failed to send response while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), request.shardRouting);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,7 +164,17 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry> {
|
private static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry> {
|
||||||
|
private final AllocationService allocationService;
|
||||||
|
private final RoutingService routingService;
|
||||||
|
private final ESLogger logger;
|
||||||
|
|
||||||
|
public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) {
|
||||||
|
this.allocationService = allocationService;
|
||||||
|
this.routingService = routingService;
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
|
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
|
||||||
BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
|
BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
|
||||||
|
@ -192,48 +208,56 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
|
public void shardStarted(final ClusterState clusterState, final ShardRouting shardRouting, String indexUUID, final String reason) {
|
||||||
|
DiscoveryNode masterNode = clusterState.nodes().masterNode();
|
||||||
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) {
|
|
||||||
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
|
||||||
clusterService.submitStateUpdateTask(
|
|
||||||
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
|
|
||||||
shardRoutingEntry,
|
|
||||||
ClusterStateTaskConfig.build(Priority.HIGH),
|
|
||||||
shardFailedClusterStateHandler,
|
|
||||||
listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
|
|
||||||
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
|
|
||||||
if (masterNode == null) {
|
if (masterNode == null) {
|
||||||
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
|
logger.warn("{} no master known to start shard [{}]", shardRouting.shardId(), shardRouting);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
shardStarted(shardRouting, indexUUID, reason, masterNode);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
|
|
||||||
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
|
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
|
||||||
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
logger.debug("sending start shard [{}]", shardRoutingEntry);
|
||||||
transportService.sendRequest(masterNode,
|
transportService.sendRequest(masterNode,
|
||||||
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||||
@Override
|
@Override
|
||||||
public void handleException(TransportException exp) {
|
public void handleException(TransportException exp) {
|
||||||
logger.warn("failed to send shard started to [{}]", exp, masterNode);
|
logger.warn("{} failure sending start shard [{}] to [{}]", exp, shardRouting.shardId(), masterNode, shardRouting);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
private static class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
||||||
|
private final ClusterService clusterService;
|
||||||
|
private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor;
|
||||||
|
private final ESLogger logger;
|
||||||
|
|
||||||
|
public ShardStartedTransportHandler(ClusterService clusterService, ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor, ESLogger logger) {
|
||||||
|
this.clusterService = clusterService;
|
||||||
|
this.shardStartedClusterStateTaskExecutor = shardStartedClusterStateTaskExecutor;
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
|
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
|
||||||
handleShardStartedOnMaster(request);
|
logger.debug("{} received shard started for [{}]", request.shardRouting.shardId(), request);
|
||||||
|
clusterService.submitStateUpdateTask(
|
||||||
|
"shard-started (" + request.shardRouting + "), reason [" + request.message + "]",
|
||||||
|
request,
|
||||||
|
ClusterStateTaskConfig.build(Priority.URGENT),
|
||||||
|
shardStartedClusterStateTaskExecutor,
|
||||||
|
shardStartedClusterStateTaskExecutor);
|
||||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
|
private static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
|
||||||
|
private final AllocationService allocationService;
|
||||||
|
private final ESLogger logger;
|
||||||
|
|
||||||
|
public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, ESLogger logger) {
|
||||||
|
this.allocationService = allocationService;
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
|
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
|
||||||
BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();
|
BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();
|
||||||
|
@ -262,19 +286,6 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler();
|
|
||||||
|
|
||||||
private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
|
|
||||||
logger.debug("received shard started for {}", shardRoutingEntry);
|
|
||||||
|
|
||||||
clusterService.submitStateUpdateTask(
|
|
||||||
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
|
|
||||||
shardRoutingEntry,
|
|
||||||
ClusterStateTaskConfig.build(Priority.URGENT),
|
|
||||||
shardStartedClusterStateHandler,
|
|
||||||
shardStartedClusterStateHandler);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class ShardRoutingEntry extends TransportRequest {
|
public static class ShardRoutingEntry extends TransportRequest {
|
||||||
ShardRouting shardRouting;
|
ShardRouting shardRouting;
|
||||||
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
||||||
|
|
|
@ -459,7 +459,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
if (!indexService.hasShard(shardId) && shardRouting.started()) {
|
if (!indexService.hasShard(shardId) && shardRouting.started()) {
|
||||||
if (failedShards.containsKey(shardRouting.shardId())) {
|
if (failedShards.containsKey(shardRouting.shardId())) {
|
||||||
if (nodes.masterNode() != null) {
|
if (nodes.masterNode() != null) {
|
||||||
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), nodes.masterNode(),
|
shardStateAction.resendShardFailed(event.state(), shardRouting, indexMetaData.getIndexUUID(),
|
||||||
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
|
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -565,9 +565,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
indexShard.shardId(), indexShard.state(), nodes.masterNode());
|
indexShard.shardId(), indexShard.state(), nodes.masterNode());
|
||||||
}
|
}
|
||||||
if (nodes.masterNode() != null) {
|
if (nodes.masterNode() != null) {
|
||||||
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(),
|
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(),
|
||||||
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started",
|
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started");
|
||||||
nodes.masterNode());
|
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
|
@ -592,7 +591,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
if (!indexService.hasShard(shardId)) {
|
if (!indexService.hasShard(shardId)) {
|
||||||
if (failedShards.containsKey(shardRouting.shardId())) {
|
if (failedShards.containsKey(shardRouting.shardId())) {
|
||||||
if (nodes.masterNode() != null) {
|
if (nodes.masterNode() != null) {
|
||||||
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), nodes.masterNode(),
|
shardStateAction.resendShardFailed(state, shardRouting, indexMetaData.getIndexUUID(),
|
||||||
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
|
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
@ -648,7 +647,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
threadPool.generic().execute(() -> {
|
threadPool.generic().execute(() -> {
|
||||||
try {
|
try {
|
||||||
if (indexShard.recoverFromStore(nodes.localNode())) {
|
if (indexShard.recoverFromStore(nodes.localNode())) {
|
||||||
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
|
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
handleRecoveryFailure(indexService, shardRouting, true, t);
|
handleRecoveryFailure(indexService, shardRouting, true, t);
|
||||||
|
@ -666,7 +665,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
|
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
|
||||||
if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) {
|
if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) {
|
||||||
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
|
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
|
||||||
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository");
|
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository");
|
||||||
}
|
}
|
||||||
} catch (Throwable first) {
|
} catch (Throwable first) {
|
||||||
try {
|
try {
|
||||||
|
@ -736,7 +735,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onRecoveryDone(RecoveryState state) {
|
public void onRecoveryDone(RecoveryState state) {
|
||||||
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]");
|
shardStateAction.shardStarted(clusterService.state(), shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -790,7 +789,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
try {
|
try {
|
||||||
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
|
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
|
||||||
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
|
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
|
||||||
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
|
shardStateAction.shardFailed(clusterService.state(), shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
|
||||||
} catch (Throwable e1) {
|
} catch (Throwable e1) {
|
||||||
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
|
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class ShardStateActionTests extends ESTestCase {
|
||||||
AtomicBoolean noMaster = new AtomicBoolean();
|
AtomicBoolean noMaster = new AtomicBoolean();
|
||||||
assert !noMaster.get();
|
assert !noMaster.get();
|
||||||
|
|
||||||
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
|
shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onShardFailedNoMaster() {
|
public void onShardFailedNoMaster() {
|
||||||
noMaster.set(true);
|
noMaster.set(true);
|
||||||
|
@ -123,7 +123,7 @@ public class ShardStateActionTests extends ESTestCase {
|
||||||
AtomicBoolean failure = new AtomicBoolean();
|
AtomicBoolean failure = new AtomicBoolean();
|
||||||
assert !failure.get();
|
assert !failure.get();
|
||||||
|
|
||||||
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
|
shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onShardFailedNoMaster() {
|
public void onShardFailedNoMaster() {
|
||||||
|
|
||||||
|
@ -156,7 +156,7 @@ public class ShardStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS);
|
TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS);
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() {
|
shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() {
|
||||||
@Override
|
@Override
|
||||||
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
|
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
|
||||||
if (e instanceof ReceiveTimeoutTransportException) {
|
if (e instanceof ReceiveTimeoutTransportException) {
|
||||||
|
|
Loading…
Reference in New Issue