Make cluster state external to o.e.c.a.s.ShardStateAction

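This commit modifies the handling of cluster states in
o.e.c.a.s.ShardStateAction so that the cluster state needed by the
ShardStateAction#shardFailed and ShardStateAction#shardStarted methods
is passed in by the caller instead of being read from a ClusterService
inside those methods. This refactoring permits the removal of the
ClusterService field from ShardStateAction itself; the ClusterService is
still accepted by the constructor, but it is now held only by the
transport request handlers that submit cluster state update tasks on
the master.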
This commit is contained in:
Jason Tedor 2016-01-03 18:18:07 -05:00
parent 6a12b5e59a
commit a70f76f763
4 changed files with 66 additions and 61 deletions

View File

@ -882,7 +882,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
onReplicaFailure(nodeId, exp); onReplicaFailure(nodeId, exp);
} else { } else {
logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node); logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
shardStateAction.shardFailed(shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp)); shardStateAction.shardFailed(clusterService.state(), shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
} }
} }
} }

View File

@ -64,7 +64,6 @@ public class ShardStateAction extends AbstractComponent {
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure"; public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
private final TransportService transportService; private final TransportService transportService;
private final ClusterService clusterService;
private final AllocationService allocationService; private final AllocationService allocationService;
private final RoutingService routingService; private final RoutingService routingService;
@ -72,35 +71,34 @@ public class ShardStateAction extends AbstractComponent {
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService) { AllocationService allocationService, RoutingService routingService) {
super(settings); super(settings);
this.clusterService = clusterService;
this.transportService = transportService; this.transportService = transportService;
this.allocationService = allocationService; this.allocationService = allocationService;
this.routingService = routingService; this.routingService = routingService;
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler()); transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler()); transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService));
} }
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) { public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
shardFailed(shardRouting, indexUUID, message, failure, null, listener); shardFailed(clusterState, shardRouting, indexUUID, message, failure, null, listener);
} }
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) { public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode(); innerShardFailed(clusterState, shardRouting, indexUUID, message, failure, timeout, listener);
}
public void resendShardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("re-sending failed shard [{}], index UUID [{}], reason [{}]", failure, shardRouting, indexUUID, message);
shardFailed(clusterState, shardRouting, indexUUID, message, failure, listener);
}
private void innerShardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, final Throwable failure, TimeValue timeout, Listener listener) {
DiscoveryNode masterNode = clusterState.nodes().masterNode();
if (masterNode == null) { if (masterNode == null) {
logger.warn("can't send shard failed for {}, no master known.", shardRouting); logger.warn("no master known to fail shard [{}]", shardRouting);
listener.onShardFailedNoMaster(); listener.onShardFailedNoMaster();
return; return;
} }
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, timeout, listener);
}
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, null, listener);
}
private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, TimeValue timeout, Listener listener) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure); ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
TransportRequestOptions options = TransportRequestOptions.EMPTY; TransportRequestOptions options = TransportRequestOptions.EMPTY;
if (timeout != null) { if (timeout != null) {
@ -122,6 +120,12 @@ public class ShardStateAction extends AbstractComponent {
} }
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> { private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private final ClusterService clusterService;
public ShardFailedTransportHandler(ClusterService clusterService) {
this.clusterService = clusterService;
}
@Override @Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardFailureOnMaster(request, new ClusterStateTaskListener() { handleShardFailureOnMaster(request, new ClusterStateTaskListener() {
@ -156,6 +160,16 @@ public class ShardStateAction extends AbstractComponent {
} }
); );
} }
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask(
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
listener);
}
} }
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry> { class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry> {
@ -194,43 +208,46 @@ public class ShardStateAction extends AbstractComponent {
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler(); private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) { public void shardStarted(final ClusterState clusterState, final ShardRouting shardRouting, String indexUUID, final String reason) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); DiscoveryNode masterNode = clusterState.nodes().masterNode();
clusterService.submitStateUpdateTask(
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
listener);
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) { if (masterNode == null) {
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting); logger.warn("no master known to start shard [{}]", shardRouting);
return; return;
} }
shardStarted(shardRouting, indexUUID, reason, masterNode);
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null); ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry); logger.debug("sending start shard [{}]", shardRoutingEntry);
transportService.sendRequest(masterNode, transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode); logger.warn("failure sending start shard [{}] to [{}]", exp, masterNode, shardRouting);
} }
}); });
} }
class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> { class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private final ClusterService clusterService;
public ShardStartedTransportHandler(ClusterService clusterService) {
this.clusterService = clusterService;
}
@Override @Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardStartedOnMaster(request); handleShardStartedOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE); channel.sendResponse(TransportResponse.Empty.INSTANCE);
} }
private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateHandler,
shardStartedClusterStateHandler);
}
} }
class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener { class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
@ -264,17 +281,6 @@ public class ShardStateAction extends AbstractComponent {
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler(); private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler();
private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateHandler,
shardStartedClusterStateHandler);
}
public static class ShardRoutingEntry extends TransportRequest { public static class ShardRoutingEntry extends TransportRequest {
ShardRouting shardRouting; ShardRouting shardRouting;
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;

View File

@ -459,7 +459,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (!indexService.hasShard(shardId) && shardRouting.started()) { if (!indexService.hasShard(shardId) && shardRouting.started()) {
if (failedShards.containsKey(shardRouting.shardId())) { if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) { if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), nodes.masterNode(), shardStateAction.resendShardFailed(event.state(), shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER); "master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
} }
} else { } else {
@ -565,9 +565,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
indexShard.shardId(), indexShard.state(), nodes.masterNode()); indexShard.shardId(), indexShard.state(), nodes.masterNode());
} }
if (nodes.masterNode() != null) { if (nodes.masterNode() != null) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started", "master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started");
nodes.masterNode());
} }
return; return;
} else { } else {
@ -592,7 +591,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (!indexService.hasShard(shardId)) { if (!indexService.hasShard(shardId)) {
if (failedShards.containsKey(shardRouting.shardId())) { if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) { if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(), nodes.masterNode(), shardStateAction.resendShardFailed(state, shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER); "master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
} }
return; return;
@ -648,7 +647,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
threadPool.generic().execute(() -> { threadPool.generic().execute(() -> {
try { try {
if (indexShard.recoverFromStore(nodes.localNode())) { if (indexShard.recoverFromStore(nodes.localNode())) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store"); shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
} }
} catch (Throwable t) { } catch (Throwable t) {
handleRecoveryFailure(indexService, shardRouting, true, t); handleRecoveryFailure(indexService, shardRouting, true, t);
@ -666,7 +665,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository()); final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) { if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId); restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository"); shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository");
} }
} catch (Throwable first) { } catch (Throwable first) {
try { try {
@ -736,7 +735,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override @Override
public void onRecoveryDone(RecoveryState state) { public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]"); shardStateAction.shardStarted(clusterService.state(), shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]");
} }
@Override @Override
@ -790,7 +789,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try { try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message); logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version())); failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER); shardStateAction.shardFailed(clusterService.state(), shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
} catch (Throwable e1) { } catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message); logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
} }

View File

@ -98,7 +98,7 @@ public class ShardStateActionTests extends ESTestCase {
AtomicBoolean noMaster = new AtomicBoolean(); AtomicBoolean noMaster = new AtomicBoolean();
assert !noMaster.get(); assert !noMaster.get();
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override @Override
public void onShardFailedNoMaster() { public void onShardFailedNoMaster() {
noMaster.set(true); noMaster.set(true);
@ -123,7 +123,7 @@ public class ShardStateActionTests extends ESTestCase {
AtomicBoolean failure = new AtomicBoolean(); AtomicBoolean failure = new AtomicBoolean();
assert !failure.get(); assert !failure.get();
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override @Override
public void onShardFailedNoMaster() { public void onShardFailedNoMaster() {
@ -156,7 +156,7 @@ public class ShardStateActionTests extends ESTestCase {
TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS); TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() { shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() {
@Override @Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) { public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
if (e instanceof ReceiveTimeoutTransportException) { if (e instanceof ReceiveTimeoutTransportException) {