Remove timeout mechanism from ShardStateAction

This commit removes the timeout retry mechanism from ShardStateAction
allowing it to instead be handled by the general master channel retry
mechanism. The idea is that if there is a network issue, the master will
miss a ping timeout causing the channel to be closed which will expose
itself via a NodeDisconnectedException. At this point, we can just wait
for a new master and retry, as with any other master channel exception.
This commit is contained in:
Jason Tedor 2016-01-11 12:51:41 -05:00
parent f49435c78b
commit f17f9a5f36
4 changed files with 37 additions and 67 deletions

View File

@ -92,8 +92,6 @@ import java.util.function.Supplier;
*/
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ReplicationResponse> extends TransportAction<Request, Response> {
public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout";
protected final TransportService transportService;
protected final ClusterService clusterService;
protected final IndicesService indicesService;
@ -101,7 +99,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
protected final TransportRequestOptions transportOptions;
protected final MappingUpdatedAction mappingUpdatedAction;
private final TimeValue shardFailedTimeout;
final String transportReplicaAction;
final String transportPrimaryAction;
@ -133,8 +130,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
this.transportOptions = transportOptions();
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
// TODO: set a default timeout
shardFailedTimeout = settings.getAsTime(SHARD_FAILURE_TIMEOUT, null);
}
@Override
@ -608,7 +603,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
}
replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference, shardFailedTimeout);
replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
} catch (Throwable e) {
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
if (logger.isTraceEnabled()) {
@ -732,15 +727,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final AtomicInteger pending;
private final int totalShards;
private final Releasable indexShardReference;
private final TimeValue shardFailedTimeout;
public ReplicationPhase(ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId,
TransportChannel channel, Releasable indexShardReference, TimeValue shardFailedTimeout) {
TransportChannel channel, Releasable indexShardReference) {
this.replicaRequest = replicaRequest;
this.channel = channel;
this.finalResponse = finalResponse;
this.indexShardReference = indexShardReference;
this.shardFailedTimeout = shardFailedTimeout;
this.shardId = shardId;
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
@ -889,7 +882,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
indexUUID,
message,
exp,
shardFailedTimeout,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {

View File

@ -47,23 +47,26 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
public class ShardStateAction extends AbstractComponent {
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
@ -82,7 +85,9 @@ public class ShardStateAction extends AbstractComponent {
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
shardFailed(shardRouting, indexUUID, message, failure, null, listener);
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
sendShardFailed(observer, shardRoutingEntry, listener);
}
public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
@ -90,24 +95,14 @@ public class ShardStateAction extends AbstractComponent {
shardFailed(shardRouting, indexUUID, message, failure, listener);
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
TransportRequestOptions options = TransportRequestOptions.EMPTY;
if (timeout != null) {
options = TransportRequestOptions.builder().withTimeout(timeout).build();
}
sendShardFailed(observer, shardRoutingEntry, options, listener);
}
private void sendShardFailed(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) {
private void sendShardFailed(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
DiscoveryNode masterNode = observer.observedState().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} no master known to fail shard [{}]", shardRoutingEntry.getShardRouting().shardId(), shardRoutingEntry.getShardRouting());
waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener);
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
@ -115,11 +110,8 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
if (exp instanceof ReceiveTimeoutTransportException) {
logger.trace("{} timeout sending shard failure [{}] to master [{}]", exp, shardRoutingEntry.getShardRouting().getId(), shardRoutingEntry.failure, masterNode);
handleTimeout(shardRoutingEntry, observer, options, listener);
} else if (exp.getCause() instanceof NotMasterException) {
waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener);
if (isMasterChannelException(exp.getCause())) {
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
listener.onShardFailedFailure(exp);
@ -129,19 +121,17 @@ public class ShardStateAction extends AbstractComponent {
}
}
// visible for testing
protected void handleTimeout(ShardRoutingEntry shardRoutingEntry, ClusterStateObserver observer, TransportRequestOptions options, Listener listener) {
// set the observed state to the latest cluster state
observer.reset(clusterService.state());
sendShardFailed(observer, shardRoutingEntry, options, listener);
private static Set<Class<?>> MASTER_CHANNEL_EXCEPTIONS = new HashSet<>(Arrays.asList(NotMasterException.class, NodeDisconnectedException.class));
private static boolean isMasterChannelException(Throwable cause) {
return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass());
}
// visible for testing
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) {
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
sendShardFailed(observer, shardRoutingEntry, options, listener);
sendShardFailed(observer, shardRoutingEntry, listener);
}
@Override

View File

@ -489,7 +489,7 @@ public class TransportReplicationActionTests extends ESTestCase {
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(request,
new Response(),
request.shardId(), createTransportChannel(listener), reference, null);
request.shardId(), createTransportChannel(listener), reference);
assertThat(replicationPhase.totalShards(), equalTo(totalShards));
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));

View File

@ -31,13 +31,12 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
@ -45,6 +44,8 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -66,26 +67,6 @@ public class ShardStateActionTests extends ESTestCase {
super(settings, clusterService, transportService, allocationService, routingService);
}
private Runnable onBeforeTimeout;
public void setOnBeforeTimeout(Runnable onBeforeTimeout) {
this.onBeforeTimeout = onBeforeTimeout;
}
private Runnable onAfterTimeout;
public void setOnAfterTimeout(Runnable onAfterTimeout) {
this.onAfterTimeout = onAfterTimeout;
}
@Override
protected void handleTimeout(ShardRoutingEntry shardRoutingEntry, ClusterStateObserver observer, TransportRequestOptions options, Listener listener) {
onBeforeTimeout.run();
super.handleTimeout(shardRoutingEntry, observer, options, listener);
onAfterTimeout.run();
}
private Runnable onBeforeWaitForNewMasterAndRetry;
public void setOnBeforeWaitForNewMasterAndRetry(Runnable onBeforeWaitForNewMasterAndRetry) {
@ -99,9 +80,9 @@ public class ShardStateActionTests extends ESTestCase {
}
@Override
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, TransportRequestOptions options, Listener listener) {
protected void waitForNewMasterAndRetry(ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) {
onBeforeWaitForNewMasterAndRetry.run();
super.waitForNewMasterAndRetry(observer, shardRoutingEntry, options, listener);
super.waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
onAfterWaitForNewMasterAndRetry.run();
}
}
@ -121,8 +102,6 @@ public class ShardStateActionTests extends ESTestCase {
transportService.start();
this.timeout = new AtomicBoolean();
shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);
shardStateAction.setOnBeforeTimeout(() -> {});
shardStateAction.setOnAfterTimeout(() -> {});
shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {});
shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {});
}
@ -173,7 +152,7 @@ public class ShardStateActionTests extends ESTestCase {
assertTrue(success.get());
}
public void testMasterLeft() throws InterruptedException {
public void testMasterChannelException() throws InterruptedException {
final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
@ -193,13 +172,22 @@ public class ShardStateActionTests extends ESTestCase {
success.set(true);
latch.countDown();
}
@Override
public void onShardFailedFailure(Exception e) {
success.set(false);
latch.countDown();
}
});
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
transport.clear();
assertThat(capturedRequests.length, equalTo(1));
assertFalse(success.get());
transport.handleResponse(capturedRequests[0].requestId, new NotMasterException("simulated"));
List<Exception> possibleExceptions = new ArrayList<>();
possibleExceptions.add(new NotMasterException("simulated"));
possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME));
transport.handleResponse(capturedRequests[0].requestId, randomFrom(possibleExceptions));
latch.await();
assertTrue(success.get());