Handle primary failure handling replica response

Today, if the primary throws an exception while handling the replica
response (e.g., because it is already closed while updating the local
checkpoint for the replica, or because of a bug that causes an
exception to be thrown in the replica operation listener), this exception
is caught by the underlying transport handler plumbing and is translated
into a response handler failure transport exception that is passed to
the onFailure method of the replica operation listener. This causes the
primary to turn around and fail the replica, which is a disastrous and
incorrect outcome: there is nothing wrong with the replica; it is the
primary that is broken and deserves a paddlin'. This commit handles this
situation by failing the primary.

Relates #24926
This commit is contained in:
Jason Tedor 2017-05-30 10:05:11 -04:00 committed by GitHub
parent 491dc1186a
commit 9957bdf0ad
2 changed files with 65 additions and 5 deletions

View File

@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
@ -185,7 +186,15 @@ public class ReplicationOperation<
/**
 * Handles a successful replica response: counts the shard as successful, records the replica's local checkpoint on the
 * primary, and then decrements the pending count, finishing the operation if needed.
 *
 * The checkpoint update is performed only inside the try block so that an exception thrown by the primary is handled here
 * instead of escaping from this listener.
 *
 * @param response the replica response carrying the allocation id and local checkpoint of the replica
 */
@Override
public void onResponse(ReplicaResponse response) {
    successfulShards.incrementAndGet();
    try {
        primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint());
    } catch (final AlreadyClosedException ignored) {
        // okay, the index was deleted or this shard was never activated after a relocation; fallthrough and finish normally
    } catch (final Exception e) {
        // fail the primary but fall through and let the rest of operation processing complete
        final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
        primary.failShard(message, e);
    }
    decPendingAndFinishIfNeeded();
}
@ -321,7 +330,10 @@ public class ReplicationOperation<
ShardRouting routingEntry();
/**
 * Fail the primary shard. This is typically called because the operation has learned that the primary has been demoted by
 * the master; it is also called when the primary throws an unexpected exception while updating the local checkpoint for a
 * replica.
 *
 * @param message   the failure message
 * @param exception the exception that triggered the failure
 */
void failShard(String message, Exception exception);
@ -335,7 +347,6 @@ public class ReplicationOperation<
*/
PrimaryResultT perform(RequestT request) throws Exception;
/**
* Notifies the primary of a local checkpoint for the given allocation.
*

View File

@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
@ -56,7 +57,9 @@ import java.util.function.Supplier;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -191,8 +194,7 @@ public class ReplicationOperationTests extends ESTestCase {
assertTrue(primaryFailed.compareAndSet(false, true));
}
};
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy,
() -> finalState);
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, () -> finalState);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
@ -299,6 +301,53 @@ public class ReplicationOperationTests extends ESTestCase {
}
}
/**
 * Checks what happens when the primary throws while recording the local checkpoint reported by a replica. The test primary
 * throws either a {@link NullPointerException} (the "fatal" case) or an {@link AlreadyClosedException}, chosen at random.
 * Only the fatal case is expected to fail the primary, and in both cases no shard failures are expected in the result.
 */
public void testPrimaryFailureHandlingReplicaResponse() throws Exception {
    final String indexName = "test";
    final ShardId shardId = new ShardId(indexName, "_na_", 0);
    final Request request = new Request(shardId);

    final ClusterState clusterState = stateWithActivePrimary(indexName, true, 1, 0);
    final IndexMetaData metaData = clusterState.getMetaData().index(indexName);
    final long term = metaData.primaryTerm(0);
    final ShardRouting primaryShard = clusterState.getRoutingTable().shardRoutingTable(shardId).primaryShard();

    // decides which kind of exception the primary throws for non-primary allocation ids
    final boolean throwFatal = randomBoolean();
    final AtomicBoolean failShardCalled = new AtomicBoolean();
    final ReplicationOperation.Primary<Request, Request, TestPrimary.Result> primary = new TestPrimary(primaryShard, term) {

        @Override
        public void failShard(String message, Exception exception) {
            // only record that the primary was failed
            failShardCalled.set(true);
        }

        @Override
        public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
            final String primaryAllocationId = primaryShard.allocationId().getId();
            if (primaryAllocationId.equals(allocationId)) {
                // the update for the primary's own allocation id goes through untouched
                super.updateLocalCheckpointForShard(allocationId, checkpoint);
                return;
            }
            if (throwFatal) {
                throw new NullPointerException();
            }
            throw new AlreadyClosedException("already closed");
        }

    };

    final PlainActionFuture<TestPrimary.Result> future = new PlainActionFuture<>();
    final ReplicationOperation.Replicas<Request> replicasProxy = new TestReplicaProxy(Collections.emptyMap());
    final TestReplicationOperation op = new TestReplicationOperation(request, primary, future, replicasProxy, () -> clusterState);
    op.execute();

    // the primary must have been failed exactly when the unexpected exception was thrown
    assertThat(failShardCalled.get(), equalTo(throwFatal));

    final ShardInfo info = future.actionGet().getShardInfo();
    assertThat(info.getFailed(), equalTo(0));
    assertThat(info.getFailures(), arrayWithSize(0));
    final int expectedReplicaCount = getExpectedReplicas(shardId, clusterState).size();
    assertThat(info.getSuccessful(), equalTo(1 + expectedReplicaCount));
}
private Set<ShardRouting> getExpectedReplicas(ShardId shardId, ClusterState state) {
Set<ShardRouting> expectedReplicas = new HashSet<>();
String localNodeId = state.nodes().getLocalNodeId();