Merge pull request #16041 from jasontedor/shard-started-channel-failures

Add handling of channel failures when starting a shard
This commit is contained in:
Jason Tedor 2016-01-19 07:05:45 -05:00
commit 089a7a321c
5 changed files with 46 additions and 56 deletions

View File

@ -889,7 +889,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
// TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp);
}

View File

@ -50,7 +50,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -84,25 +83,15 @@ public class ShardStateAction extends AbstractComponent {
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
}
/**
 * Reports a failed shard by wrapping it in a {@link ShardRoutingEntry} and handing it to
 * {@code sendShardFailed} together with a freshly created {@link ClusterStateObserver}.
 *
 * @param shardRouting the routing of the shard that failed
 * @param indexUUID    the index UUID stored in the routing entry
 * @param message      the reason stored in the routing entry
 * @param failure      the cause of the failure, may be {@code null}
 * @param listener     the listener passed on to {@code sendShardFailed}
 */
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
    // the observer is created first, exactly as before, so it is built before the routing entry
    final ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, null, logger);
    sendShardFailed(stateObserver, new ShardRoutingEntry(shardRouting, indexUUID, message, failure), listener);
}
/**
 * Logs at trace level that a shard failure is being sent again and then delegates to
 * {@link #shardFailed}.
 *
 * @param shardRouting the routing of the shard that failed
 * @param indexUUID    the index UUID forwarded to {@code shardFailed}
 * @param message      the reason forwarded to {@code shardFailed}
 * @param failure      the cause of the failure, may be {@code null}
 * @param listener     the listener forwarded to {@code shardFailed}
 */
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
    // The throwable goes directly after the format string (same convention as the warn calls in this
    // class) so that the four placeholders receive shard id, routing, index UUID and reason in order.
    logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
    shardFailed(shardRouting, indexUUID, message, failure, listener);
}
private void sendShardFailed(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
private void sendShardAction(final String actionName, final ClusterStateObserver observer, final ShardRoutingEntry shardRoutingEntry, final Listener listener) {
DiscoveryNode masterNode = observer.observedState().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} no master known to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), shardRoutingEntry.getShardRouting());
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
logger.warn("{} no master known for action [{}] for shard [{}]", shardRoutingEntry.getShardRouting().shardId(), actionName, shardRoutingEntry.getShardRouting());
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
} else {
logger.debug("{} sending [{}] for shard [{}]", shardRoutingEntry.getShardRouting().getId(), actionName, shardRoutingEntry);
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
actionName, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
@ -111,10 +100,10 @@ public class ShardStateAction extends AbstractComponent {
/**
 * Handles a failed send of the shard request. A failure that {@code isMasterChannelException}
 * recognises is retried via {@code waitForNewMasterAndRetry}; any other failure is logged at warn
 * level and reported to the listener.
 *
 * NOTE(review): this span appears to come from a rendered diff, so the pre-change statement and its
 * replacement both appear in each branch.
 *
 * @param exp the transport exception raised while sending the request
 */
@Override
public void handleException(TransportException exp) {
if (isMasterChannelException(exp)) {
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
listener.onShardFailedFailure(exp);
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
// the listener is given the cause of the transport exception, not the exception itself
// NOTE(review): getCause() can return null when no cause was set — confirm listeners tolerate that
listener.onFailure(exp.getCause());
}
}
});
@ -131,21 +120,32 @@ public class ShardStateAction extends AbstractComponent {
return ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS) != null;
}
/**
 * Reports a failed shard by wrapping it in a {@link ShardRoutingEntry} and sending
 * {@code SHARD_FAILED_ACTION_NAME} through {@code sendShardAction} with a freshly created
 * {@link ClusterStateObserver}.
 *
 * @param shardRouting the routing of the shard that failed
 * @param indexUUID    the index UUID stored in the routing entry
 * @param message      the reason stored in the routing entry
 * @param failure      the cause of the failure, may be {@code null}
 * @param listener     the listener passed on to {@code sendShardAction}
 */
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
    // the observer is created first, exactly as before, so it is built before the routing entry
    final ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, null, logger);
    sendShardAction(SHARD_FAILED_ACTION_NAME, stateObserver, new ShardRoutingEntry(shardRouting, indexUUID, message, failure), listener);
}
/**
 * Logs at trace level that a shard failure is being sent again and then delegates to
 * {@link #shardFailed}.
 *
 * @param shardRouting the routing of the shard that failed
 * @param indexUUID    the index UUID forwarded to {@code shardFailed}
 * @param message      the reason forwarded to {@code shardFailed}
 * @param failure      the cause of the failure, may be {@code null}
 * @param listener     the listener forwarded to {@code shardFailed}
 */
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
    // The throwable goes directly after the format string (same convention as the warn calls in this
    // class) so that the four placeholders receive shard id, routing, index UUID and reason in order.
    logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
    shardFailed(shardRouting, indexUUID, message, failure, listener);
}
// visible for testing
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
/**
 * Called when the observer sees a new cluster state; optionally traces the new state and then sends
 * the shard request again.
 *
 * NOTE(review): this span appears to come from a rendered diff, so both the pre-change call
 * ({@code sendShardFailed}) and its replacement ({@code sendShardAction}) are present.
 *
 * @param state the newly observed cluster state
 */
@Override
public void onNewClusterState(ClusterState state) {
    if (logger.isTraceEnabled()) {
        // The leading "{}" consumes the shard id so the cluster state and the routing entry each land
        // in their own placeholder instead of being shifted by one and the entry being dropped.
        logger.trace("{} new cluster state [{}] after waiting for master election to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), state.prettyPrint(), shardRoutingEntry);
    }
    sendShardFailed(observer, shardRoutingEntry, listener);
    sendShardAction(actionName, observer, shardRoutingEntry, listener);
}
/**
 * Called when the cluster service closes while waiting for a new cluster state; logs a warning and
 * reports a {@link NodeClosedException} for the local node to the listener.
 *
 * NOTE(review): this span appears to come from a rendered diff, so the pre-change statements and
 * their replacements are both present.
 */
@Override
public void onClusterServiceClose() {
    logger.warn("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting());
    listener.onShardFailedFailure(new NodeClosedException(clusterService.localNode()));
    // message wording corrected from "while execution action" to "while executing action"
    logger.warn("{} node closed while executing action [{}] for shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), actionName, shardRoutingEntry.getShardRouting());
    listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
@ -253,21 +253,10 @@ public class ShardStateAction extends AbstractComponent {
}
}
public void shardStarted(final ClusterState clusterState, final ShardRouting shardRouting, String indexUUID, final String reason) {
DiscoveryNode masterNode = clusterState.nodes().masterNode();
if (masterNode == null) {
logger.warn("{} no master known to start shard [{}]", shardRouting.shardId(), shardRouting);
return;
}
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
logger.debug("sending start shard [{}]", shardRoutingEntry);
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("{} failure sending start shard [{}] to [{}]", exp, shardRouting.shardId(), masterNode, shardRouting);
}
});
/**
 * Reports a started shard by wrapping it in a {@link ShardRoutingEntry} and sending
 * {@code SHARD_STARTED_ACTION_NAME} through {@code sendShardAction} with a freshly created
 * {@link ClusterStateObserver}.
 *
 * @param shardRouting the routing of the shard that started
 * @param indexUUID    the index UUID stored in the routing entry
 * @param message      the reason stored in the routing entry
 * @param listener     the listener passed on to {@code sendShardAction}
 */
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String message, Listener listener) {
    final ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, null, logger);
    // the routing entry's failure argument is null: a started shard has no failure to report
    sendShardAction(SHARD_STARTED_ACTION_NAME, stateObserver, new ShardRoutingEntry(shardRouting, indexUUID, message, null), listener);
}
private static class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
@ -392,9 +381,9 @@ public class ShardStateAction extends AbstractComponent {
* Any other exception is communicated to the requester via
* this notification.
*
* @param e the unexpected cause of the failure on the master
* @param t the unexpected cause of the failure on the master
*/
default void onShardFailedFailure(final Exception e) {
default void onFailure(final Throwable t) {
}
}

View File

@ -563,8 +563,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
indexShard.shardId(), indexShard.state(), nodes.masterNode());
}
if (nodes.masterNode() != null) {
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started");
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started",
SHARD_STATE_ACTION_LISTENER);
}
return;
} else {
@ -645,7 +646,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
threadPool.generic().execute(() -> {
try {
if (indexShard.recoverFromStore(nodes.localNode())) {
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store", SHARD_STATE_ACTION_LISTENER);
}
} catch (Throwable t) {
handleRecoveryFailure(indexService, shardRouting, true, t);
@ -663,7 +664,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository());
if (indexShard.restoreFromRepository(indexShardRepository, nodes.localNode())) {
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), sId);
shardStateAction.shardStarted(state, shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository");
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from repository", SHARD_STATE_ACTION_LISTENER);
}
} catch (Throwable first) {
try {
@ -733,7 +734,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(clusterService.state(), shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]");
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery (replica) from node [" + state.getSourceNode() + "]", SHARD_STATE_ACTION_LISTENER);
}
@Override

View File

@ -88,9 +88,9 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
onBeforeWaitForNewMasterAndRetry.run();
super.waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
super.waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
onAfterWaitForNewMasterAndRetry.run();
}
}
@ -145,7 +145,7 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
success.set(false);
latch.countDown();
assert false;
@ -193,7 +193,7 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable e) {
success.set(false);
latch.countDown();
assert false;
@ -216,7 +216,7 @@ public class ShardStateActionTests extends ESTestCase {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger retries = new AtomicInteger();
AtomicBoolean success = new AtomicBoolean();
AtomicReference<Exception> exception = new AtomicReference<>();
AtomicReference<Throwable> throwable = new AtomicReference<>();
LongConsumer retryLoop = requestId -> {
if (randomBoolean()) {
@ -243,9 +243,9 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
success.set(false);
exception.set(e);
throwable.set(t);
latch.countDown();
assert false;
}
@ -258,7 +258,7 @@ public class ShardStateActionTests extends ESTestCase {
retryLoop.accept(capturedRequests[0].requestId);
latch.await();
assertNull(exception.get());
assertNull(throwable.get());
assertThat(retries.get(), equalTo(numberOfRetries));
assertTrue(success.get());
}
@ -280,7 +280,7 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
failure.set(true);
}
});

View File

@ -922,7 +922,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
@Override
public void onShardFailedFailure(Exception e) {
public void onFailure(Throwable t) {
success.set(false);
latch.countDown();
assert false;