From b69affda6adadc6c191c2e0835343fa7d91d21c6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 6 Mar 2019 14:59:41 -0500 Subject: [PATCH] Use unwrapped cause to determine if node is closing (#39723) We need to unwrap and use the actual cause when determining if the node with primary shard is shutting down because TransportService will throw a TransportException wrapped in a SendRequestTransportException. Relates #39584 --- .../action/support/replication/ReplicationOperation.java | 5 +++-- .../support/replication/ReplicationOperationTests.java | 5 ++++- .../org/elasticsearch/discovery/ClusterDisruptionIT.java | 8 ++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 71483245ee3..c232404d5fe 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -204,8 +204,9 @@ public class ReplicationOperation< } private void onNoLongerPrimary(Exception failure) { - final boolean nodeIsClosing = failure instanceof NodeClosedException || - (failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage())); + final Throwable cause = ExceptionsHelper.unwrapCause(failure); + final boolean nodeIsClosing = cause instanceof NodeClosedException + || (cause instanceof TransportException && "TransportService is closed stopped can't send request".equals(cause.getMessage())); final String message; if (nodeIsClosing) { message = String.format(Locale.ROOT, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index adb79b1fe3b..8adb9c2f26b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.TransportException; import java.util.ArrayList; @@ -203,7 +204,9 @@ public class ReplicationOperationTests extends ESTestCase { if (randomBoolean()) { shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT)); } else if (randomBoolean()) { - shardActionFailure = new TransportException("TransportService is closed stopped can't send request"); + shardActionFailure = new SendRequestTransportException( + new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), "internal:cluster/shard/failure", + new TransportException("TransportService is closed stopped can't send request")); } else { shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead"); } diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index ac19fd68cde..acec5a583f5 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -71,8 +71,10 @@ import static org.elasticsearch.action.DocWriteResponse.Result.CREATED; import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; @@ -480,8 +482,10 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase { for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) { String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName(); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); - IndexShard indexShard = indicesService.getShardOrNull(shardRouting.shardId()); - assertThat(IndexShardTestCase.getShardDocUIDs(indexShard), equalTo(ackedDocs)); + IndexShard shard = indicesService.getShardOrNull(shardRouting.shardId()); + Set docs = IndexShardTestCase.getShardDocUIDs(shard); + assertThat("shard [" + shard.routingEntry() + "] docIds [" + docs + "] vs " + " acked docIds [" + ackedDocs + "]", + ackedDocs, everyItem(isIn(docs))); } } }