Add handling of channel failures when starting a shard

This commit adds handling of channel failures when starting a shard to
o.e.c.a.s.ShardStateAction. This means that shard started requests
that timeout or occur when there is no master or the master leaves
after the request is sent will now be retried from here. The listener
for a shard state request will now only be notified upon successful
completion of the shard state request, or when a catastrophic
non-channel failure occurs.

This commit also refactors the handling of shard failure requests so
that the two shard state actions of shard failure and shard started
now share the same channel-retry and notification logic.
This commit is contained in:
Jason Tedor 2016-01-14 11:30:03 -05:00
parent 6ca79095df
commit ac237b269c
5 changed files with 45 additions and 55 deletions

View File

@ -889,7 +889,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
// TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp);
}

View File

@ -84,25 +84,14 @@ public class ShardStateAction extends AbstractComponent {
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
sendShardFailed(observer, shardRoutingEntry, listener);
}
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message);
shardFailed(shardRouting, indexUUID, message, failure, listener);
}
private void sendShardFailed(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
private void sendShardAction(final String actionName, final ClusterStateObserver observer, final ShardRoutingEntry shardRoutingEntry, final Listener listener) {
DiscoveryNode masterNode = observer.observedState().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} no master known to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), shardRoutingEntry.getShardRouting());
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
logger.warn("{} no master known for action [{}] for shard [{}]", shardRoutingEntry.getShardRouting().shardId(), actionName, shardRoutingEntry.getShardRouting());
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
actionName, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
@ -111,10 +100,10 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
if (isMasterChannelException(exp)) {
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
listener.onShardFailedFailure(exp);
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
listener.onFailure(exp.getCause());
}
}
});
@ -131,21 +120,32 @@ public class ShardStateAction extends AbstractComponent {
return ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS) != null;
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardRoutingEntry, listener);
}
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message);
shardFailed(shardRouting, indexUUID, message, failure, listener);
}
// visible for testing
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
if (logger.isTraceEnabled()) {
logger.trace("new cluster state [{}] after waiting for master election to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), state.prettyPrint(), shardRoutingEntry);
}
sendShardFailed(observer, shardRoutingEntry, listener);
sendShardAction(actionName, observer, shardRoutingEntry, listener);
}
@Override
public void onClusterServiceClose() {
logger.warn("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting());
listener.onShardFailedFailure(new NodeClosedException(clusterService.localNode()));
logger.warn("{} node closed while execution action [{}] for shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), actionName, shardRoutingEntry.getShardRouting());
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
@ -253,21 +253,10 @@ public class ShardStateAction extends AbstractComponent {
}
}
public void shardStarted(final ClusterState clusterState, final ShardRouting shardRouting, String indexUUID, final String reason) {
DiscoveryNode masterNode = clusterState.nodes().masterNode();
if (masterNode == null) {
logger.warn("{} no master known to start shard [{}]", shardRouting.shardId(), shardRouting);
return;
}
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
logger.debug("sending start shard [{}]", shardRoutingEntry);
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("{} failure sending start shard [{}] to [{}]", exp, shardRouting.shardId(), masterNode, shardRouting);
}
});
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String message, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, null);
sendShardAction(SHARD_STARTED_ACTION_NAME, observer, shardRoutingEntry, listener);
}
private static class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
@ -392,9 +381,9 @@ public class ShardStateAction extends AbstractComponent {
* Any other exception is communicated to the requester via
* this notification.
*
* @param e the unexpected cause of the failure on the master
* @param t the unexpected cause of the failure on the master
*/
default void onShardFailedFailure(final Exception e) {
default void onFailure(final Throwable t) {
}
}

View File

@ -563,8 +563,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
indexShard.shardId(), indexShard.state(), nodes.masterNode());
}
if (nodes.masterNode() != null) {
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started");
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started",
SHARD_STATE_ACTION_LISTENER);
}
return;
} else {
@ -645,7 +646,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
threadPool.generic().execute(() -> {
try {
if (indexShard.recoverFromStore(nodes.localNode())) {
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store", SHARD_STATE_ACTION_LISTENER);
}
} catch (Throwable t) {
handleRecoveryFailure(indexService, shardRouting, true, t);
@ -663,7 +664,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository");
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository", SHARD_STATE_ACTION_LISTENER);
}
} catch (Throwable first) {
try {
@ -733,7 +734,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(clusterService.state(), shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]");
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]", SHARD_STATE_ACTION_LISTENER);
}
@Override

View File

@ -88,9 +88,9 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
onBeforeWaitForNewMasterAndRetry.run();
super.waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
super.waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
onAfterWaitForNewMasterAndRetry.run();
}
}
@ -145,7 +145,7 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
success.set(false);
latch.countDown();
assert false;
@ -193,7 +193,7 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable e) {
success.set(false);
latch.countDown();
assert false;
@ -216,7 +216,7 @@ public class ShardStateActionTests extends ESTestCase {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger retries = new AtomicInteger();
AtomicBoolean success = new AtomicBoolean();
AtomicReference<Exception> exception = new AtomicReference<>();
AtomicReference<Throwable> throwable = new AtomicReference<>();
LongConsumer retryLoop = requestId -> {
if (randomBoolean()) {
@ -243,9 +243,9 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
success.set(false);
exception.set(e);
throwable.set(t);
latch.countDown();
assert false;
}
@ -258,7 +258,7 @@ public class ShardStateActionTests extends ESTestCase {
retryLoop.accept(capturedRequests[0].requestId);
latch.await();
assertNull(exception.get());
assertNull(throwable.get());
assertThat(retries.get(), equalTo(numberOfRetries));
assertTrue(success.get());
}
@ -280,7 +280,7 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
failure.set(true);
}
});

View File

@ -922,7 +922,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
success.set(false);
latch.countDown();
assert false;