From 346ff0435ac2252d2c7e1dc4d95aefa4788f78f4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 21 Jan 2016 08:08:45 -0500 Subject: [PATCH] Fail demoted primary shards and retry request This commit handles the scenario where a replication action fails on a replica shard and the primary shard attempts to fail that replica shard, but is instead notified by the master that it has been demoted. In this scenario, the demoted primary shard must be failed, and the request must then be rerouted to the new primary shard. Closes #16415, closes #14252 --- .../TransportReplicationAction.java | 28 ++++- .../ClusterStateCreationUtils.java | 4 +- .../TransportReplicationActionTests.java | 107 +++++++++++++++++- 3 files changed, 128 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index b425768b18a..1d22207723b 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -415,7 +415,11 @@ public abstract class TransportReplicationAction unassignedNodesExecludingPrimary = new HashSet<>(unassignedNodes); + unassignedNodesExecludingPrimary.remove(newNode(0).id()); + primaryNode = selectAndRemove(unassignedNodesExecludingPrimary); } if (primaryState == ShardRoutingState.RELOCATING) { relocatingNode = selectAndRemove(unassignedNodes); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index d2b0c1a0356..7cf0a4f3b50 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ 
b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -84,9 +84,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.either; @@ -631,9 +633,11 @@ public class TransportReplicationActionTests extends ESTestCase { indexShardRouting.set(primaryShard); assertIndexShardCounter(2); - // TODO: set a default timeout - TransportReplicationAction.ReplicationPhase replicationPhase = action.new ReplicationPhase(task, - request, new Response(), request.shardId(), createTransportChannel(listener), reference); + AtomicReference error = new AtomicReference<>(); + + TransportChannel channel = createTransportChannel(listener, error::set); + TransportReplicationAction.ReplicationPhase replicationPhase = + action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference); assertThat(replicationPhase.totalShards(), equalTo(totalShards)); assertThat(replicationPhase.pending(), equalTo(assignedReplicas)); @@ -704,7 +708,8 @@ public class TransportReplicationActionTests extends ESTestCase { // the shard the request was sent to and the shard to be failed should be the same assertEquals(shardRoutingEntry.getShardRouting(), routing); failures.add(shardFailedRequest); - if (randomBoolean()) { + int ternary = randomIntBetween(0, 2); + if (ternary == 0) { // simulate master left and test that the shard failure is retried int numberOfRetries = 
randomIntBetween(1, 4); CapturingTransport.CapturedRequest currentRequest = shardFailedRequest; @@ -718,8 +723,19 @@ public class TransportReplicationActionTests extends ESTestCase { } // now simulate that the last retry succeeded transport.handleResponse(currentRequest.requestId, TransportResponse.Empty.INSTANCE); - } else { + } else if (ternary == 1) { + // simulate the primary has been demoted + transport.handleRemoteError(shardFailedRequest.requestId, new ShardStateAction.NoLongerPrimaryShardException(shardRoutingEntry.getShardRouting().shardId(), "shard-failed-test")); + // the primary should fail itself + assertShardIsFailed(); + // we should see a retry on primary exception + assertNotNull(error.get()); + assertThat(error.get(), instanceOf(TransportReplicationAction.RetryOnPrimaryException.class)); + return; + } else if (ternary == 2) { transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + } else { + assert false; } } } else { @@ -882,14 +898,85 @@ public class TransportReplicationActionTests extends ESTestCase { assertPhase(task, "failed"); } + public void testReroutePhaseRetriedAfterDemotedPrimary() { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + boolean localPrimary = true; + clusterService.setState(state(index, localPrimary, + ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + Action action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { + @Override + protected void resolveRequest(MetaData metaData, String concreteIndex, Request request) { + request.setShardId(shardId); + } + }; + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + + TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener); + reroutePhase.run(); + + // reroute phase should send primary action + CapturingTransport.CapturedRequest[] primaryRequests = 
transport.getCapturedRequestsAndClear(); + assertThat(primaryRequests.length, equalTo(1)); + assertThat(primaryRequests[0].action, equalTo("testAction" + (localPrimary ? "[p]" : ""))); + AtomicReference error = new AtomicReference<>(); + TransportChannel channel = createTransportChannel(listener, error::set); + + // simulate primary action + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(maybeTask(), request, channel); + primaryPhase.run(); + + // primary action should send replica request + CapturingTransport.CapturedRequest[] replicaRequests = transport.getCapturedRequestsAndClear(); + assertThat(replicaRequests.length, equalTo(1)); + assertThat(replicaRequests[0].action, equalTo("testAction[r]")); + indexShardRouting.set(clusterService.state().getRoutingTable().shardRoutingTable(shardId).primaryShard()); + + // simulate replica failure + transport.handleRemoteError(replicaRequests[0].requestId, new Exception("exception")); + + // the primary should request replica failure + CapturingTransport.CapturedRequest[] replicaFailures = transport.getCapturedRequestsAndClear(); + assertThat(replicaFailures.length, equalTo(1)); + assertThat(replicaFailures[0].action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME)); + + // simulate demoted primary + transport.handleRemoteError(replicaFailures[0].requestId, new ShardStateAction.NoLongerPrimaryShardException(shardId, "demoted")); + assertTrue(isShardFailed.get()); + assertTrue(listener.isDone()); + assertNotNull(error.get()); + assertThat(error.get(), instanceOf(TransportReplicationAction.RetryOnPrimaryException.class)); + assertThat(error.get().getMessage(), containsString("was demoted while failing replica shard")); + + // reroute phase sees the retry + transport.handleRemoteError(primaryRequests[0].requestId, error.get()); + + // publish a new cluster state + boolean localPrimaryOnRetry = randomBoolean(); + clusterService.setState(state(index, localPrimaryOnRetry, + 
ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + CapturingTransport.CapturedRequest[] primaryRetry = transport.getCapturedRequestsAndClear(); + + // the request should be retried + assertThat(primaryRetry.length, equalTo(1)); + assertThat(primaryRetry[0].action, equalTo("testAction" + (localPrimaryOnRetry ? "[p]" : ""))); + } + private void assertIndexShardCounter(int expected) { assertThat(count.get(), equalTo(expected)); } + private void assertShardIsFailed() { + assertTrue(isShardFailed.get()); + } + private final AtomicInteger count = new AtomicInteger(0); private final AtomicBoolean isRelocated = new AtomicBoolean(false); + private final AtomicBoolean isShardFailed = new AtomicBoolean(); + private final AtomicReference indexShardRouting = new AtomicReference<>(); /** @@ -903,6 +990,11 @@ public class TransportReplicationActionTests extends ESTestCase { return isRelocated.get(); } + @Override + public void failShard(String reason, @Nullable Throwable e) { + isShardFailed.set(true); + } + @Override public ShardRouting routingEntry() { ShardRouting shardRouting = indexShardRouting.get(); @@ -1099,6 +1191,10 @@ public class TransportReplicationActionTests extends ESTestCase { * Transport channel that is needed for replica operation testing. */ public TransportChannel createTransportChannel(final PlainActionFuture listener) { + return createTransportChannel(listener, error -> {}); + } + + public TransportChannel createTransportChannel(final PlainActionFuture listener, Consumer consumer) { return new TransportChannel() { @Override @@ -1123,6 +1219,7 @@ public class TransportReplicationActionTests extends ESTestCase { @Override public void sendResponse(Throwable error) throws IOException { + consumer.accept(error); listener.onFailure(error); }