Centrally handle channel failures when failing a shard

This commit moves the handling of channel failures when failing a shard
to o.e.c.a.s.ShardStateAction. This means that shard failure requests
that time out, that are sent while there is no known master, or whose
master leaves the cluster after the request is sent will now be retried
from there. The listener for a shard failed request will now only be
notified upon successful completion of the shard failed request, or
when a catastrophic non-channel failure occurs.
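As a caller-side illustration (a minimal sketch assembled from the
TransportReplicationAction changes below, not compilable on its own;
nodeId, shardRouting, indexUUID, failure and shardFailedTimeout are
assumed to be in scope), the listener now only has two outcomes to
handle, because channel-level failures are retried inside
ShardStateAction:

// Sketch only: caller-side usage of the new ShardStateAction contract.
// The surrounding variables mirror the fields used in TransportReplicationAction below.
shardStateAction.shardFailed(
    shardRouting,                 // the shard to mark as failed
    indexUUID,                    // guards against acting on a stale index
    "failed to perform the replica action on node " + nodeId,
    failure,                      // the underlying replica/channel exception
    shardFailedTimeout,           // per-attempt timeout; timeouts are retried internally
    new ShardStateAction.Listener() {
        @Override
        public void onSuccess() {
            // the master has acknowledged and applied the shard-failed entry
            onReplicaFailure(nodeId, failure);
        }

        @Override
        public void onShardFailedFailure(Exception e) {
            // only catastrophic non-channel failures surface here; timeouts,
            // a missing master, and master fail-overs never reach the listener
            onReplicaFailure(nodeId, failure);
        }
    });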
Jason Tedor 2016-01-08 16:39:14 -05:00
parent 58c2a3b687
commit f49435c78b
4 changed files with 234 additions and 191 deletions

org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -31,8 +31,6 @@ import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -44,7 +42,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
@@ -67,7 +64,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportChannelResponseHandler;
import org.elasticsearch.transport.TransportException;
@@ -886,26 +882,31 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
if (ignoreReplicaException(exp)) {
onReplicaFailure(nodeId, exp);
} else {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
ReplicationFailedShardStateListener listener = new ReplicationFailedShardStateListener(observer, shard, exp, message, nodeId);
shardFailed(observer.observedState(), shard, exp, message, listener);
}
}
}
);
}
private void shardFailed(ClusterState clusterState, ShardRouting shard, TransportException exp, String message, ShardStateAction.Listener listener) {
logger.warn("{} {}", exp, shardId, message);
shardStateAction.shardFailed(
clusterState,
shard,
indexUUID,
message,
exp,
shardFailedTimeout,
listener);
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onReplicaFailure(nodeId, exp);
}
@Override
public void onShardFailedFailure(Exception e) {
// TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp);
}
}
);
}
}
}
);
}
void onReplicaFailure(String nodeId, @Nullable Throwable e) {
@@ -972,90 +973,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
}
}
public class ReplicationFailedShardStateListener implements ShardStateAction.Listener {
private final ClusterStateObserver observer;
private final ShardRouting shard;
private final TransportException exp;
private final String message;
private final String nodeId;
public ReplicationFailedShardStateListener(
ClusterStateObserver observer, ShardRouting shard, TransportException exp,
String message,
String nodeId) {
this.observer = observer;
this.shard = shard;
this.exp = exp;
this.message = message;
this.nodeId = nodeId;
}
@Override
public void onSuccess() {
// TODO: validate the cluster state and retry?
onReplicaFailure(nodeId, exp);
}
@Override
public void onShardFailedNoMaster() {
waitForNewMasterAndRetry();
}
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
if (e instanceof ReceiveTimeoutTransportException) {
logger.trace("timeout sending shard failure to master [{}]", e, master);
// TODO: recheck the cluster state and retry indefinitely?
onReplicaFailure(nodeId, exp);
} else if (e.getCause() instanceof NotMasterException) {
waitForNewMasterAndRetry();
}
}
private void waitForNewMasterAndRetry() {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
retry(state);
}
@Override
public void onClusterServiceClose() {
logger.error("{} node closed while handling failed shard [{}]", exp, shard.shardId(), shard);
forceFinishAsFailed(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
// we wait indefinitely for a new master
assert false;
}
}, MasterNodeChangePredicate.INSTANCE);
}
private void retry(ClusterState clusterState) {
if (!isFailed(shard, clusterState)) {
shardFailed(clusterState, shard, exp, message, this);
} else {
// the shard has already been failed, so just signal replica failure
onReplicaFailure(nodeId, exp);
}
}
private boolean isFailed(ShardRouting shardRouting, ClusterState clusterState) {
// verify that the shard we requested to fail is no longer in the cluster state
RoutingNode routingNode = clusterState.getRoutingNodes().node(shardRouting.currentNodeId());
if (routingNode == null) {
// the node left
return true;
} else {
// the same shard is gone
ShardRouting sr = routingNode.get(shardRouting.getId());
return sr == null || !sr.isSameAllocation(shardRouting);
}
}
}
}
/**

org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -22,9 +22,11 @@ package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -42,8 +44,10 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@@ -64,38 +68,44 @@ public class ShardStateAction extends AbstractComponent {
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
private final TransportService transportService;
private final ClusterService clusterService;
@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
}
public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
shardFailed(clusterState, shardRouting, indexUUID, message, failure, null, listener);
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
shardFailed(shardRouting, indexUUID, message, failure, null, listener);
}
public void resendShardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message);
shardFailed(clusterState, shardRouting, indexUUID, message, failure, listener);
shardFailed(shardRouting, indexUUID, message, failure, listener);
}
public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
DiscoveryNode masterNode = clusterState.nodes().masterNode();
if (masterNode == null) {
logger.warn("{} no master known to fail shard [{}]", shardRouting.shardId(), shardRouting);
listener.onShardFailedNoMaster();
return;
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
TransportRequestOptions options = TransportRequestOptions.EMPTY;
if (timeout != null) {
options = TransportRequestOptions.builder().withTimeout(timeout).build();
}
sendShardFailed(observer, shardRoutingEntry, options, listener);
}
private void sendShardFailed(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) {
DiscoveryNode masterNode = observer.observedState().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} no master known to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), shardRoutingEntry.getShardRouting());
waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener);
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
@@ -105,11 +115,48 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.shardRouting.shardId(), masterNode, shardRoutingEntry);
listener.onShardFailedFailure(masterNode, exp);
if (exp instanceof ReceiveTimeoutTransportException) {
logger.trace("{} timeout sending shard failure [{}] to master [{}]", exp, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.failure, masterNode);
handleTimeout(shardRoutingEntry, observer, options, listener);
} else if (exp.getCause() instanceof NotMasterException) {
waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener);
} else {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
listener.onShardFailedFailure(exp);
}
}
});
}
}
// visible for testing
protected void handleTimeout(ShardRoutingEntry shardRoutingEntry, ClusterStateObserver observer, TransportRequestOptions options, Listener listener) {
// set the observed state to the latest cluster state
observer.reset(clusterService.state());
sendShardFailed(observer, shardRoutingEntry, options, listener);
}
// visible for testing
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
sendShardFailed(observer, shardRoutingEntry, options, listener);
}
@Override
public void onClusterServiceClose() {
logger.error("{} node closed while handling failed shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.getShardRouting());
listener.onShardFailedFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
// we wait indefinitely for a new master
assert false;
}
}, MasterNodeChangePredicate.INSTANCE);
}
private static class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private final ClusterService clusterService;
@@ -334,10 +381,7 @@ public class ShardStateAction extends AbstractComponent {
default void onSuccess() {
}
default void onShardFailedNoMaster() {
}
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {
default void onShardFailedFailure(final Exception e) {
}
}
}

org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -458,7 +458,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (!indexService.hasShard(shardId) && shardRouting.started()) {
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(event.state(), shardRouting, indexMetaData.getIndexUUID(),
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as started, but shard has previous failed. resending shard failure.", null, SHARD_STATE_ACTION_LISTENER);
}
} else {
@@ -590,7 +590,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (!indexService.hasShard(shardId)) {
if (failedShards.containsKey(shardRouting.shardId())) {
if (nodes.masterNode() != null) {
shardStateAction.resendShardFailed(state, shardRouting, indexMetaData.getIndexUUID(),
shardStateAction.resendShardFailed(shardRouting, indexMetaData.getIndexUUID(),
"master " + nodes.masterNode() + " marked shard as initializing, but shard is marked as failed, resend shard failure", null, SHARD_STATE_ACTION_LISTENER);
}
return;
@@ -788,7 +788,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message);
failedShards.put(shardRouting.shardId(), new FailedShard(shardRouting.version()));
shardStateAction.shardFailed(clusterService.state(), shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
shardStateAction.shardFailed(shardRouting, indexUUID, message, failure, SHARD_STATE_ACTION_LISTENER);
} catch (Throwable e1) {
logger.warn("[{}][{}] failed to mark shard as failed (because of [{}])", e1, shardRouting.getIndex(), shardRouting.getId(), message);
}

org/elasticsearch/cluster/action/shard/ShardStateActionTests.java

@@ -20,20 +20,25 @@
package org.elasticsearch.cluster.action.shard;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
@@ -50,11 +55,57 @@ import static org.hamcrest.CoreMatchers.equalTo;
public class ShardStateActionTests extends ESTestCase {
private static ThreadPool THREAD_POOL;
private ShardStateAction shardStateAction;
private AtomicBoolean timeout;
private TestShardStateAction shardStateAction;
private CapturingTransport transport;
private TransportService transportService;
private TestClusterService clusterService;
private static class TestShardStateAction extends ShardStateAction {
public TestShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, AllocationService allocationService, RoutingService routingService) {
super(settings, clusterService, transportService, allocationService, routingService);
}
private Runnable onBeforeTimeout;
public void setOnBeforeTimeout(Runnable onBeforeTimeout) {
this.onBeforeTimeout = onBeforeTimeout;
}
private Runnable onAfterTimeout;
public void setOnAfterTimeout(Runnable onAfterTimeout) {
this.onAfterTimeout = onAfterTimeout;
}
@Override
protected void handleTimeout(ShardRoutingEntry shardRoutingEntry, ClusterStateObserver observer, TransportRequestOptions options, Listener listener) {
onBeforeTimeout.run();
super.handleTimeout(shardRoutingEntry, observer, options, listener);
onAfterTimeout.run();
}
private Runnable onBeforeWaitForNewMasterAndRetry;
public void setOnBeforeWaitForNewMasterAndRetry(Runnable onBeforeWaitForNewMasterAndRetry) {
this.onBeforeWaitForNewMasterAndRetry = onBeforeWaitForNewMasterAndRetry;
}
private Runnable onAfterWaitForNewMasterAndRetry;
public void setOnAfterWaitForNewMasterAndRetry(Runnable onAfterWaitForNewMasterAndRetry) {
this.onAfterWaitForNewMasterAndRetry = onAfterWaitForNewMasterAndRetry;
}
@Override
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) {
onBeforeWaitForNewMasterAndRetry.run();
super.waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener);
onAfterWaitForNewMasterAndRetry.run();
}
}
@BeforeClass
public static void startThreadPool() {
THREAD_POOL = new ThreadPool("ShardStateActionTest");
@@ -68,7 +119,12 @@ public class ShardStateActionTests extends ESTestCase {
clusterService = new TestClusterService(THREAD_POOL);
transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);
this.timeout = new AtomicBoolean();
shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);
shardStateAction.setOnBeforeTimeout(() -> {});
shardStateAction.setOnAfterTimeout(() -> {});
shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {});
shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {});
}
@Override
@@ -84,36 +140,72 @@ public class ShardStateActionTests extends ESTestCase {
THREAD_POOL = null;
}
public void testNoMaster() {
public void testNoMaster() throws InterruptedException {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().nodes());
builder.masterNodeId(null);
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder));
DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
noMasterBuilder.masterNodeId(null);
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(noMasterBuilder));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean noMaster = new AtomicBoolean();
assert !noMaster.get();
AtomicBoolean retried = new AtomicBoolean();
AtomicBoolean success = new AtomicBoolean();
shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
setUpMasterRetryVerification(noMaster, retried, latch);
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onShardFailedNoMaster() {
noMaster.set(true);
}
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
public void onSuccess() {
success.set(true);
latch.countDown();
}
});
latch.await();
assertTrue(noMaster.get());
assertTrue(retried.get());
assertTrue(success.get());
}
public void testFailure() {
public void testMasterLeft() throws InterruptedException {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean noMaster = new AtomicBoolean();
AtomicBoolean retried = new AtomicBoolean();
AtomicBoolean success = new AtomicBoolean();
setUpMasterRetryVerification(noMaster, retried, latch);
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
latch.countDown();
}
});
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
transport.clear();
assertThat(capturedRequests.length, equalTo(1));
assertFalse(success.get());
transport.handleResponse(capturedRequests[0].requestId, new NotMasterException("simulated"));
latch.await();
assertTrue(success.get());
}
public void testUnhandledFailure() {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
@@ -121,59 +213,22 @@ public class ShardStateActionTests extends ESTestCase {
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean failure = new AtomicBoolean();
assert !failure.get();
shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onShardFailedNoMaster() {
}
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
public void onShardFailedFailure(Exception e) {
failure.set(true);
}
});
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
assert !failure.get();
assertFalse(failure.get());
transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated"));
assertTrue(failure.get());
}
public void testTimeout() throws InterruptedException {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
AtomicBoolean progress = new AtomicBoolean();
AtomicBoolean timedOut = new AtomicBoolean();
TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS);
CountDownLatch latch = new CountDownLatch(1);
shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() {
@Override
public void onShardFailedFailure(DiscoveryNode master, TransportException e) {
if (e instanceof ReceiveTimeoutTransportException) {
assertFalse(progress.get());
timedOut.set(true);
}
latch.countDown();
}
});
latch.await();
progress.set(true);
assertTrue(timedOut.get());
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
}
private ShardRouting getRandomShardRouting(String index) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();
@@ -182,6 +237,33 @@ public class ShardStateActionTests extends ESTestCase {
return shardRouting;
}
private void setUpMasterRetryVerification(AtomicBoolean noMaster, AtomicBoolean retried, CountDownLatch latch) {
shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {
DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id());
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder));
});
shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(noMaster, retried, latch));
}
private void verifyRetry(AtomicBoolean invoked, AtomicBoolean retried, CountDownLatch latch) {
invoked.set(true);
// assert a retry request was sent
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
transport.clear();
retried.set(capturedRequests.length == 1);
if (retried.get()) {
// finish the request
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
} else {
// there failed to be a retry request
// release the driver thread to fail the test
latch.countDown();
}
}
private Throwable getSimulatedFailure() {
return new CorruptIndexException("simulated", (String) null);
}