diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index e0525127ee7..e4269a375dd 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.transport.TransportException; import java.io.IOException; import java.io.PrintWriter; @@ -193,6 +194,14 @@ public final class ExceptionsHelper { return null; } + public static boolean isTransportStoppedForAction(final Throwable t, final String action) { + final TransportException maybeTransport = + (TransportException) ExceptionsHelper.unwrap(t, TransportException.class); + return maybeTransport != null + && (maybeTransport.getMessage().equals("TransportService is closed stopped can't send request") + || maybeTransport.getMessage().equals("transport stopped, action: " + action)); + } + /** * Throws the specified exception. If null if specified then true is returned. */ diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 7fdb613c38b..22e90cfc135 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -38,7 +38,6 @@ import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.transport.TransportException; import java.io.IOException; import java.util.ArrayList; @@ -205,10 +204,9 @@ public class ReplicationOperation< private void onNoLongerPrimary(Exception failure) { final Throwable cause = ExceptionsHelper.unwrapCause(failure); - final boolean nodeIsClosing = cause instanceof NodeClosedException - || (cause instanceof TransportException && - ("TransportService is closed stopped can't send request".equals(cause.getMessage()) - || "transport stopped, action: internal:cluster/shard/failure".equals(cause.getMessage()))); + final boolean nodeIsClosing = + cause instanceof NodeClosedException + || ExceptionsHelper.isTransportStoppedForAction(cause, "internal:cluster/shard/failure"); final String message; if (nodeIsClosing) { message = String.format(Locale.ROOT, diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 918ce664aea..570159cc74d 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -113,9 +113,15 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi ActionListener.wrap( r -> {}, e -> { - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { - getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); + if (ExceptionsHelper.isTransportStoppedForAction(e, ACTION_NAME + "[p]")) { + // we are likely shutting down + return; } + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { + // the shard is closed + return; + } + getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); })); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 1288f6fe16f..c8493edc979 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -274,6 +274,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran } @Override public void doRun() { + // cf. ExceptionsHelper#isTransportStoppedForAction TransportException ex = new TransportException("transport stopped, action: " + holderToNotify.action()); holderToNotify.handler().handleException(ex); } @@ -626,8 +627,13 @@ public class TransportService extends AbstractLifecycleComponent implements Tran } try { if (lifecycle.stoppedOrClosed()) { - // if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify - // the caller. It will only notify if the toStop code hasn't done the work yet. + /* + * If we are not started the exception handling will remove the request holder again and calls the handler to notify the + * caller. It will only notify if toStop hasn't done the work yet. + * + * Do not edit this exception message, it is currently relied upon in production code! + */ + // TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction throw new TransportException("TransportService is closed stopped can't send request"); } if (timeoutHandler != null) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 6ad7d5039ae..81ea56c6096 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; import org.mockito.ArgumentCaptor; @@ -204,9 +205,13 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { final Exception e = randomFrom( new AlreadyClosedException("closed"), new IndexShardClosedException(indexShard.shardId()), + new TransportException(randomFrom( + "failed", + "TransportService is closed stopped can't send request", + "transport stopped, action: indices:admin/seq_no/retention_lease_background_sync[p]")), new RuntimeException("failed")); listener.onFailure(e); - if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) { + if (e.getMessage().equals("failed")) { final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e)); final ParameterizedMessage message = captor.getValue();