Use unwrapped cause to determine if node is closing (#39723)
We need to unwrap and use the actual cause when determining if the node with primary shard is shutting down because TransportService will throw a TransportException wrapped in a SendRequestTransportException. Relates #39584
This commit is contained in:
parent
1fe7cb594f
commit
b69affda6a
|
@ -204,8 +204,9 @@ public class ReplicationOperation<
|
|||
}
|
||||
|
||||
private void onNoLongerPrimary(Exception failure) {
|
||||
final boolean nodeIsClosing = failure instanceof NodeClosedException ||
|
||||
(failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage()));
|
||||
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
|
||||
final boolean nodeIsClosing = cause instanceof NodeClosedException
|
||||
|| (cause instanceof TransportException && "TransportService is closed stopped can't send request".equals(cause.getMessage()));
|
||||
final String message;
|
||||
if (nodeIsClosing) {
|
||||
message = String.format(Locale.ROOT,
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ReplicationGroup;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.SendRequestTransportException;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -203,7 +204,9 @@ public class ReplicationOperationTests extends ESTestCase {
|
|||
if (randomBoolean()) {
|
||||
shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT));
|
||||
} else if (randomBoolean()) {
|
||||
shardActionFailure = new TransportException("TransportService is closed stopped can't send request");
|
||||
shardActionFailure = new SendRequestTransportException(
|
||||
new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), "internal:cluster/shard/failure",
|
||||
new TransportException("TransportService is closed stopped can't send request"));
|
||||
} else {
|
||||
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
|
||||
}
|
||||
|
|
|
@ -71,8 +71,10 @@ import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
|
|||
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.everyItem;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.isIn;
|
||||
import static org.hamcrest.Matchers.isOneOf;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
|
@ -480,8 +482,10 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
|
|||
for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) {
|
||||
String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName();
|
||||
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
|
||||
IndexShard indexShard = indicesService.getShardOrNull(shardRouting.shardId());
|
||||
assertThat(IndexShardTestCase.getShardDocUIDs(indexShard), equalTo(ackedDocs));
|
||||
IndexShard shard = indicesService.getShardOrNull(shardRouting.shardId());
|
||||
Set<String> docs = IndexShardTestCase.getShardDocUIDs(shard);
|
||||
assertThat("shard [" + shard.routingEntry() + "] docIds [" + docs + "] vs " + " acked docIds [" + ackedDocs + "]",
|
||||
ackedDocs, everyItem(isIn(docs)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue