Fail demoted primary shards and retry request

This commit handles the scenario where a replication action fails on a
replica shard, the primary shard attempts to fail the replica shard
but the primary shard is notified of demotion by the master. In this
scenario, the demoted primary shard must be failed, and then the
request rerouted again to the new primary shard.

Closes #16415, closes #14252
This commit is contained in:
Jason Tedor 2016-01-21 08:08:45 -05:00
parent 321c463929
commit 346ff0435a
3 changed files with 128 additions and 11 deletions

View File

@ -415,7 +415,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public static class RetryOnPrimaryException extends ElasticsearchException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
super(msg);
this(shardId, msg, null);
}
public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
super(msg, cause);
setShard(shardId);
}
@ -801,6 +805,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
* relocating copies
*/
final class ReplicationPhase extends AbstractRunnable {
private final ReplicationTask task;
private final ReplicaRequest replicaRequest;
private final Response finalResponse;
@ -982,9 +987,17 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
@Override
public void onFailure(Throwable t) {
// TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp);
public void onFailure(Throwable shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
ShardRouting primaryShard = indexShardReference.routingEntry();
String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
// we are no longer the primary, fail ourselves and start over
indexShardReference.failShard(message, shardFailedError);
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
} else {
assert false : shardFailedError;
onReplicaFailure(nodeId, exp);
}
}
}
);
@ -1070,7 +1083,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
interface IndexShardReference extends Releasable {
boolean isRelocated();
void failShard(String reason, @Nullable Throwable e);
ShardRouting routingEntry();
}
@ -1098,6 +1111,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return indexShard.state() == IndexShardState.RELOCATED;
}
@Override
public void failShard(String reason, @Nullable Throwable e) {
indexShard.failShard(reason, e);
}
@Override
public ShardRouting routingEntry() {
return indexShard.routingEntry();

View File

@ -101,7 +101,9 @@ public class ClusterStateCreationUtils {
primaryNode = newNode(0).id();
unassignedNodes.remove(primaryNode);
} else {
primaryNode = selectAndRemove(unassignedNodes);
Set<String> unassignedNodesExecludingPrimary = new HashSet<>(unassignedNodes);
unassignedNodesExecludingPrimary.remove(newNode(0).id());
primaryNode = selectAndRemove(unassignedNodesExecludingPrimary);
}
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);

View File

@ -84,9 +84,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.either;
@ -631,9 +633,11 @@ public class TransportReplicationActionTests extends ESTestCase {
indexShardRouting.set(primaryShard);
assertIndexShardCounter(2);
// TODO: set a default timeout
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase = action.new ReplicationPhase(task,
request, new Response(), request.shardId(), createTransportChannel(listener), reference);
AtomicReference<Throwable> error = new AtomicReference<>();
TransportChannel channel = createTransportChannel(listener, error::set);
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);
assertThat(replicationPhase.totalShards(), equalTo(totalShards));
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
@ -704,7 +708,8 @@ public class TransportReplicationActionTests extends ESTestCase {
// the shard the request was sent to and the shard to be failed should be the same
assertEquals(shardRoutingEntry.getShardRouting(), routing);
failures.add(shardFailedRequest);
if (randomBoolean()) {
int ternary = randomIntBetween(0, 2);
if (ternary == 0) {
// simulate master left and test that the shard failure is retried
int numberOfRetries = randomIntBetween(1, 4);
CapturingTransport.CapturedRequest currentRequest = shardFailedRequest;
@ -718,8 +723,19 @@ public class TransportReplicationActionTests extends ESTestCase {
}
// now simulate that the last retry succeeded
transport.handleResponse(currentRequest.requestId, TransportResponse.Empty.INSTANCE);
} else {
} else if (ternary == 1) {
// simulate the primary has been demoted
transport.handleRemoteError(shardFailedRequest.requestId, new ShardStateAction.NoLongerPrimaryShardException(shardRoutingEntry.getShardRouting().shardId(), "shard-failed-test"));
// the primary should fail itself
assertShardIsFailed();
// we should see a retry on primary exception
assertNotNull(error.get());
assertThat(error.get(), instanceOf(TransportReplicationAction.RetryOnPrimaryException.class));
return;
} else if (ternary == 2) {
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
} else {
assert false;
}
}
} else {
@ -882,14 +898,85 @@ public class TransportReplicationActionTests extends ESTestCase {
assertPhase(task, "failed");
}
public void testReroutePhaseRetriedAfterDemotedPrimary() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
boolean localPrimary = true;
clusterService.setState(state(index, localPrimary,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
Action action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void resolveRequest(MetaData metaData, String concreteIndex, Request request) {
request.setShardId(shardId);
}
};
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
reroutePhase.run();
// reroute phase should send primary action
CapturingTransport.CapturedRequest[] primaryRequests = transport.getCapturedRequestsAndClear();
assertThat(primaryRequests.length, equalTo(1));
assertThat(primaryRequests[0].action, equalTo("testAction" + (localPrimary ? "[p]" : "")));
AtomicReference<Throwable> error = new AtomicReference<>();
TransportChannel channel = createTransportChannel(listener, error::set);
// simulate primary action
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(maybeTask(), request, channel);
primaryPhase.run();
// primary action should send replica request
CapturingTransport.CapturedRequest[] replicaRequests = transport.getCapturedRequestsAndClear();
assertThat(replicaRequests.length, equalTo(1));
assertThat(replicaRequests[0].action, equalTo("testAction[r]"));
indexShardRouting.set(clusterService.state().getRoutingTable().shardRoutingTable(shardId).primaryShard());
// simulate replica failure
transport.handleRemoteError(replicaRequests[0].requestId, new Exception("exception"));
// the primary should request replica failure
CapturingTransport.CapturedRequest[] replicaFailures = transport.getCapturedRequestsAndClear();
assertThat(replicaFailures.length, equalTo(1));
assertThat(replicaFailures[0].action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME));
// simulate demoted primary
transport.handleRemoteError(replicaFailures[0].requestId, new ShardStateAction.NoLongerPrimaryShardException(shardId, "demoted"));
assertTrue(isShardFailed.get());
assertTrue(listener.isDone());
assertNotNull(error.get());
assertThat(error.get(), instanceOf(TransportReplicationAction.RetryOnPrimaryException.class));
assertThat(error.get().getMessage(), containsString("was demoted while failing replica shard"));
// reroute phase sees the retry
transport.handleRemoteError(primaryRequests[0].requestId, error.get());
// publish a new cluster state
boolean localPrimaryOnRetry = randomBoolean();
clusterService.setState(state(index, localPrimaryOnRetry,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
CapturingTransport.CapturedRequest[] primaryRetry = transport.getCapturedRequestsAndClear();
// the request should be retried
assertThat(primaryRetry.length, equalTo(1));
assertThat(primaryRetry[0].action, equalTo("testAction" + (localPrimaryOnRetry ? "[p]" : "")));
}
private void assertIndexShardCounter(int expected) {
assertThat(count.get(), equalTo(expected));
}
private void assertShardIsFailed() {
assertTrue(isShardFailed.get());
}
private final AtomicInteger count = new AtomicInteger(0);
private final AtomicBoolean isRelocated = new AtomicBoolean(false);
private final AtomicBoolean isShardFailed = new AtomicBoolean();
private final AtomicReference<ShardRouting> indexShardRouting = new AtomicReference<>();
/**
@ -903,6 +990,11 @@ public class TransportReplicationActionTests extends ESTestCase {
return isRelocated.get();
}
@Override
public void failShard(String reason, @Nullable Throwable e) {
isShardFailed.set(true);
}
@Override
public ShardRouting routingEntry() {
ShardRouting shardRouting = indexShardRouting.get();
@ -1099,6 +1191,10 @@ public class TransportReplicationActionTests extends ESTestCase {
* Transport channel that is needed for replica operation testing.
*/
public TransportChannel createTransportChannel(final PlainActionFuture<Response> listener) {
return createTransportChannel(listener, error -> {});
}
public TransportChannel createTransportChannel(final PlainActionFuture<Response> listener, Consumer<Throwable> consumer) {
return new TransportChannel() {
@Override
@ -1123,6 +1219,7 @@ public class TransportReplicationActionTests extends ESTestCase {
@Override
public void sendResponse(Throwable error) throws IOException {
consumer.accept(error);
listener.onFailure(error);
}