diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java
index 2bc1d590106..005f3ff55f2 100644
--- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java
+++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java
@@ -29,7 +29,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.transport.TransportException;
import java.io.IOException;
import java.io.PrintWriter;
@@ -222,14 +221,6 @@ public final class ExceptionsHelper {
return null;
}
- public static boolean isTransportStoppedForAction(final Throwable t, final String action) {
- final TransportException maybeTransport =
- (TransportException) ExceptionsHelper.unwrap(t, TransportException.class);
- return maybeTransport != null
- && (maybeTransport.getMessage().equals("TransportService is closed stopped can't send request")
- || maybeTransport.getMessage().equals("transport stopped, action: " + action));
- }
-
/**
* Throws the specified exception. If null if specified then true
is returned.
*/
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
index 36f31af27a3..c1a03fbc574 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
@@ -213,9 +213,7 @@ public class ReplicationOperation<
private void onNoLongerPrimary(Exception failure) {
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
- final boolean nodeIsClosing =
- cause instanceof NodeClosedException
- || ExceptionsHelper.isTransportStoppedForAction(cause, "internal:cluster/shard/failure");
+ final boolean nodeIsClosing = cause instanceof NodeClosedException;
final String message;
if (nodeIsClosing) {
message = String.format(Locale.ROOT,
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
index 111317132fd..dd10199bbc5 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
@@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -113,8 +114,8 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
ActionListener.wrap(
r -> {},
e -> {
- if (ExceptionsHelper.isTransportStoppedForAction(e, ACTION_NAME + "[p]")) {
- // we are likely shutting down
+ if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
+ // node shutting down
return;
}
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java
index 6a88a0110fb..7726f34f21a 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -47,6 +47,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
@@ -275,8 +276,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
}
@Override
public void doRun() {
- // cf. ExceptionsHelper#isTransportStoppedForAction
- TransportException ex = new TransportException("transport stopped, action: " + holderToNotify.action());
+ TransportException ex = new SendRequestTransportException(holderToNotify.connection().getNode(),
+ holderToNotify.action(), new NodeClosedException(localNode));
holderToNotify.handler().handleException(ex);
}
});
@@ -680,11 +681,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
/*
* If we are not started the exception handling will remove the request holder again and calls the handler to notify the
* caller. It will only notify if toStop hasn't done the work yet.
- *
- * Do not edit this exception message, it is currently relied upon in production code!
*/
- // TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction
- throw new TransportException("TransportService is closed stopped can't send request");
+ throw new NodeClosedException(localNode);
}
if (timeoutHandler != null) {
assert options.timeout() != null;
diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
index 9f86d190a64..3038153a3d5 100644
--- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
@@ -44,7 +44,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.SendRequestTransportException;
-import org.elasticsearch.transport.TransportException;
import java.util.ArrayList;
import java.util.Collections;
@@ -205,12 +204,9 @@ public class ReplicationOperationTests extends ESTestCase {
if (randomBoolean()) {
shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT));
} else if (randomBoolean()) {
+ DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
shardActionFailure = new SendRequestTransportException(
- new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), ShardStateAction.SHARD_FAILED_ACTION_NAME,
- new TransportException("TransportService is closed stopped can't send request"));
- } else if (randomBoolean()) {
- shardActionFailure = new TransportException(
- "transport stopped, action: " + ShardStateAction.SHARD_FAILED_ACTION_NAME);
+ node, ShardStateAction.SHARD_FAILED_ACTION_NAME, new NodeClosedException(node));
} else {
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
}
diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java
new file mode 100644
index 00000000000..baf2e5e4008
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.support.replication;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.cluster.action.shard.ShardStateAction;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.ActionPlugin;
+import org.elasticsearch.plugins.NetworkPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportInterceptor;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
+import org.hamcrest.Matchers;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCase {
+
+ @Override
+ protected Collection> nodePlugins() {
+ return Arrays.asList(TestPlugin.class, MockTransportService.TestPlugin.class);
+ }
+
+ @Override
+ protected Collection> transportClientPlugins() {
+ return Arrays.asList(TestPlugin.class);
+ }
+
+ public static class Request extends ReplicationRequest {
+ public Request(ShardId shardId) {
+ super(shardId);
+ }
+
+ public Request(StreamInput in) throws IOException {
+ super(in);
+ }
+
+ @Override
+ public String toString() {
+ return "test-request";
+ }
+ }
+
+ public static class Response extends ReplicationResponse {
+ public Response() {
+ }
+
+ public Response(StreamInput in) throws IOException {
+ super(in);
+ }
+ }
+
+ public static class TestAction extends TransportReplicationAction {
+ private static final String ACTION_NAME = "internal:test-replication-action";
+ private static final ActionType TYPE = new ActionType<>(ACTION_NAME, Response::new);
+
+ @Inject
+ public TestAction(Settings settings, TransportService transportService, ClusterService clusterService,
+ IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
+ ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+ super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
+ indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC);
+ }
+
+ @Override
+ protected Response newResponseInstance(StreamInput in) throws IOException {
+ return new Response(in);
+ }
+
+ @Override
+ protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
+ ActionListener> listener) {
+ listener.onResponse(new PrimaryResult<>(shardRequest, new Response()));
+ }
+
+ @Override
+ protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) {
+ return new ReplicaResult();
+ }
+ }
+
+ public static class TestPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
+ private CountDownLatch actionRunningLatch = new CountDownLatch(1);
+ private CountDownLatch actionWaitLatch = new CountDownLatch(1);
+ private volatile String testActionName;
+
+ public TestPlugin() {
+ }
+
+ @Override
+ public List> getActions() {
+ return Arrays.asList(new ActionHandler<>(TestAction.TYPE, TestAction.class));
+ }
+
+ @Override
+ public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
+ ThreadContext threadContext) {
+ return Arrays.asList(new TransportInterceptor() {
+ @Override
+ public AsyncSender interceptSender(AsyncSender sender) {
+ return new AsyncSender() {
+ @Override
+ public void sendRequest(Transport.Connection connection, String action,
+ TransportRequest request, TransportRequestOptions options,
+ TransportResponseHandler handler) {
+ // only activated on primary
+ if (action.equals(testActionName)) {
+ actionRunningLatch.countDown();
+ try {
+ actionWaitLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ }
+ sender.sendRequest(connection, action, request, options, handler);
+ }
+ };
+ }
+ });
+ }
+ }
+
+ public void testRetryOnStoppedTransportService() throws Exception {
+ internalCluster().startMasterOnlyNodes(2);
+ String primary = internalCluster().startDataOnlyNode();
+ assertAcked(prepareCreate("test")
+ .setSettings(Settings.builder()
+ .put(indexSettings())
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+ ));
+
+ String replica = internalCluster().startDataOnlyNode();
+ String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
+ ensureGreen("test");
+
+ TestPlugin primaryTestPlugin = getTestPlugin(primary);
+ // this test only provoked an issue for the primary action, but for completeness, we pick the action randomly
+ primaryTestPlugin.testActionName = TestAction.ACTION_NAME + (randomBoolean() ? "[p]" : "[r]");
+ logger.info("--> Test action {}, primary {}, replica {}", primaryTestPlugin.testActionName, primary, replica);
+
+ AtomicReference