From 9957bdf0ad88e3625ef42c95a80d5edc3aac02f4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 10:05:11 -0400 Subject: [PATCH] Handle primary failure handling replica response Today if the primary throws an exception while handling the replica response (e.g., because it is already closed while updating the local checkpoint for the replica), or because of a bug that causes an exception to be thrown in the replica operation listener, this exception is caught by the underlying transport handler plumbing and is translated into a response handler failure transport exception that is passed to the onFailure method of the replica operation listener. This causes the primary to turn around and fail the replica which is a disastrous and incorrect outcome as there's nothing wrong with the replica, it is the primary that is broken and deserves a paddlin'. This commit handles this situation by failing the primary. Relates #24926 --- .../replication/ReplicationOperation.java | 17 ++++-- .../ReplicationOperationTests.java | 53 ++++++++++++++++++- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index ee8270e911c..09b6a4d8220 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -185,7 +186,15 @@ public class ReplicationOperation< @Override public void onResponse(ReplicaResponse response) { successfulShards.incrementAndGet(); - primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint()); + try { + primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint()); + } catch (final AlreadyClosedException e) { + // okay, the index was deleted or this shard was never activated after a relocation; fallthrough and finish normally + } catch (final Exception e) { + // fail the primary but fall through and let the rest of operation processing complete + final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard); + primary.failShard(message, e); + } decPendingAndFinishIfNeeded(); } @@ -321,7 +330,10 @@ public class ReplicationOperation< ShardRouting routingEntry(); /** - * fail the primary, typically due to the fact that the operation has learned the primary has been demoted by the master + * Fail the primary shard. + * + * @param message the failure message + * @param exception the exception that triggered the failure */ void failShard(String message, Exception exception); @@ -335,7 +347,6 @@ public class ReplicationOperation< */ PrimaryResultT perform(RequestT request) throws Exception; - /** * Notifies the primary of a local checkpoint for the given allocation. * diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 9fcc8c24353..88cf5769a48 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; @@ -56,7 +57,9 @@ import java.util.function.Supplier; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -191,8 +194,7 @@ public class ReplicationOperationTests extends ESTestCase { assertTrue(primaryFailed.compareAndSet(false, true)); } }; - final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, - () -> finalState); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, () -> finalState); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -299,6 +301,53 @@ public class ReplicationOperationTests extends ESTestCase { } } + public void testPrimaryFailureHandlingReplicaResponse() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + final Request request = new Request(shardId); + + final ClusterState state = stateWithActivePrimary(index, true, 1, 0); + final IndexMetaData indexMetaData = state.getMetaData().index(index); + final long primaryTerm = indexMetaData.primaryTerm(0); + final ShardRouting primaryRouting = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + + final boolean fatal = randomBoolean(); + final AtomicBoolean primaryFailed = new AtomicBoolean(); + final ReplicationOperation.Primary primary = new TestPrimary(primaryRouting, primaryTerm) { + + @Override + public void failShard(String message, Exception exception) { + primaryFailed.set(true); + } + + @Override + public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { + if (primaryRouting.allocationId().getId().equals(allocationId)) { + super.updateLocalCheckpointForShard(allocationId, checkpoint); + } else { + if (fatal) { + throw new NullPointerException(); + } else { + throw new AlreadyClosedException("already closed"); + } + } + } + + }; + + final PlainActionFuture listener = new PlainActionFuture<>(); + final ReplicationOperation.Replicas replicas = new TestReplicaProxy(Collections.emptyMap()); + TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas, () -> state); + operation.execute(); + + assertThat(primaryFailed.get(), equalTo(fatal)); + final ShardInfo shardInfo = listener.actionGet().getShardInfo(); + assertThat(shardInfo.getFailed(), equalTo(0)); + assertThat(shardInfo.getFailures(), arrayWithSize(0)); + assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state).size())); + } + private Set getExpectedReplicas(ShardId shardId, ClusterState state) { Set expectedReplicas = new HashSet<>(); String localNodeId = state.nodes().getLocalNodeId();