Fix Transport Stopped Exception (#48930) (#49035)

When a node shuts down, `TransportService` moves to the stopped state and
then closes connections. If a request was sent in between, an exception
was thrown that was not retried in replication actions. Now a wrapped
`NodeClosedException` is thrown instead, which is correctly handled in
replication actions. Other usages were fixed too.

Relates #42612
This commit is contained in:
Henning Andersen 2019-11-13 18:48:05 +01:00 committed by GitHub
parent fb685adc94
commit 66f0c8900f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 260 additions and 32 deletions

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportException;
import java.io.IOException;
import java.io.PrintWriter;
@ -222,14 +221,6 @@ public final class ExceptionsHelper {
return null;
}
/**
 * Tells whether the given throwable carries a {@link TransportException} whose message is one of the two
 * fixed "transport stopped" messages: the generic "TransportService is closed stopped can't send request"
 * message, or the per-action "transport stopped, action: " message for the given action.
 *
 * @param t      the throwable to inspect; it is handed to {@code ExceptionsHelper.unwrap} to locate a
 *               {@link TransportException}
 * @param action the action name expected in the per-action message
 * @return {@code true} if a {@link TransportException} was found and its message equals one of the two
 *         messages above, {@code false} otherwise (including when the exception has no message)
 */
public static boolean isTransportStoppedForAction(final Throwable t, final String action) {
    final TransportException maybeTransport =
        (TransportException) ExceptionsHelper.unwrap(t, TransportException.class);
    if (maybeTransport == null) {
        return false;
    }
    // getMessage() may legitimately be null; compare from the constant side so that a
    // message-less exception yields false instead of a NullPointerException
    final String message = maybeTransport.getMessage();
    return "TransportService is closed stopped can't send request".equals(message)
        || ("transport stopped, action: " + action).equals(message);
}
/**
* Throws the specified exception. If null is specified then <code>true</code> is returned.
*/

View File

@ -213,9 +213,7 @@ public class ReplicationOperation<
private void onNoLongerPrimary(Exception failure) {
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
final boolean nodeIsClosing =
cause instanceof NodeClosedException
|| ExceptionsHelper.isTransportStoppedForAction(cause, "internal:cluster/shard/failure");
final boolean nodeIsClosing = cause instanceof NodeClosedException;
final String message;
if (nodeIsClosing) {
message = String.format(Locale.ROOT,

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -113,8 +114,8 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
ActionListener.wrap(
r -> {},
e -> {
if (ExceptionsHelper.isTransportStoppedForAction(e, ACTION_NAME + "[p]")) {
// we are likely shutting down
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {

View File

@ -47,6 +47,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
@ -275,8 +276,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
}
@Override
public void doRun() {
// cf. ExceptionsHelper#isTransportStoppedForAction
TransportException ex = new TransportException("transport stopped, action: " + holderToNotify.action());
TransportException ex = new SendRequestTransportException(holderToNotify.connection().getNode(),
holderToNotify.action(), new NodeClosedException(localNode));
holderToNotify.handler().handleException(ex);
}
});
@ -680,11 +681,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
/*
* If we are not started the exception handling will remove the request holder again and calls the handler to notify the
* caller. It will only notify if toStop hasn't done the work yet.
*
* Do not edit this exception message, it is currently relied upon in production code!
*/
// TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction
throw new TransportException("TransportService is closed stopped can't send request");
throw new NodeClosedException(localNode);
}
if (timeoutHandler != null) {
assert options.timeout() != null;

View File

@ -44,7 +44,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportException;
import java.util.ArrayList;
import java.util.Collections;
@ -205,12 +204,9 @@ public class ReplicationOperationTests extends ESTestCase {
if (randomBoolean()) {
shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT));
} else if (randomBoolean()) {
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
shardActionFailure = new SendRequestTransportException(
new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), ShardStateAction.SHARD_FAILED_ACTION_NAME,
new TransportException("TransportService is closed stopped can't send request"));
} else if (randomBoolean()) {
shardActionFailure = new TransportException(
"transport stopped, action: " + ShardStateAction.SHARD_FAILED_ACTION_NAME);
node, ShardStateAction.SHARD_FAILED_ACTION_NAME, new NodeClosedException(node));
} else {
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
}

View File

@ -0,0 +1,229 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
/**
 * Integration test that sends a replication action while the primary node's {@link TransportService} is
 * being stopped, and checks that the action still completes without an exception.
 *
 * The test holds a request inside a sender interceptor on the primary node, lets the node begin to stop,
 * and releases the request from an on-stop listener of the {@link MockTransportService}, so the request is
 * sent while the node is shutting down.
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(TestPlugin.class, MockTransportService.TestPlugin.class);
    }

    @Override
    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
        return Arrays.asList(TestPlugin.class);
    }

    /** Minimal replication request used by {@link TestAction}; it carries nothing beyond the shard id. */
    public static class Request extends ReplicationRequest<Request> {
        public Request(ShardId shardId) {
            super(shardId);
        }

        public Request(StreamInput in) throws IOException {
            super(in);
        }

        @Override
        public String toString() {
            return "test-request";
        }
    }

    /** Empty replication response returned by {@link TestAction}. */
    public static class Response extends ReplicationResponse {
        public Response() {
        }

        public Response(StreamInput in) throws IOException {
            super(in);
        }
    }

    /**
     * Replication action with trivial shard operations: the primary operation responds immediately with a
     * fresh {@link Response}, and the replica operation returns an empty {@code ReplicaResult}.
     */
    public static class TestAction extends TransportReplicationAction<Request, Request, Response> {
        private static final String ACTION_NAME = "internal:test-replication-action";
        private static final ActionType<Response> TYPE = new ActionType<>(ACTION_NAME, Response::new);

        @Inject
        public TestAction(Settings settings, TransportService transportService, ClusterService clusterService,
                          IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
                          ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
            super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
                indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC);
        }

        @Override
        protected Response newResponseInstance(StreamInput in) throws IOException {
            return new Response(in);
        }

        @Override
        protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
                                               ActionListener<PrimaryResult<Request, Response>> listener) {
            listener.onResponse(new PrimaryResult<>(shardRequest, new Response()));
        }

        @Override
        protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) {
            return new ReplicaResult();
        }
    }

    /**
     * Plugin that registers {@link TestAction} and installs a sender interceptor which can hold a chosen
     * outgoing action until the test releases it.
     */
    public static class TestPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
        // counted down by the interceptor once the selected action has reached sendRequest
        private final CountDownLatch actionRunningLatch = new CountDownLatch(1);
        // counted down by the test to let the held request continue
        private final CountDownLatch actionWaitLatch = new CountDownLatch(1);
        // name of the action to hold; set by the test method and read from the sender interceptor
        private volatile String testActionName;

        public TestPlugin() {
        }

        @Override
        public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
            return Arrays.asList(new ActionHandler<>(TestAction.TYPE, TestAction.class));
        }

        @Override
        public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
                                                                   ThreadContext threadContext) {
            return Arrays.asList(new TransportInterceptor() {
                @Override
                public AsyncSender interceptSender(AsyncSender sender) {
                    return new AsyncSender() {
                        @Override
                        public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action,
                                                                              TransportRequest request, TransportRequestOptions options,
                                                                              TransportResponseHandler<T> handler) {
                            // only activated on primary
                            if (action.equals(testActionName)) {
                                actionRunningLatch.countDown();
                                try {
                                    // NOTE(review): the boolean result is ignored, so after a 10s timeout the request is
                                    // sent anyway - confirm this is intended
                                    actionWaitLatch.await(10, TimeUnit.SECONDS);
                                } catch (InterruptedException e) {
                                    // keep the interrupt status visible to code further up the stack
                                    Thread.currentThread().interrupt();
                                    throw new AssertionError(e);
                                }
                            }
                            sender.sendRequest(connection, action, request, options, handler);
                        }
                    };
                }
            });
        }
    }

    /**
     * Holds a test replication request on the primary node until that node's transport service is being
     * stopped, then releases it and expects the request to finish without an exception.
     */
    public void testRetryOnStoppedTransportService() throws Exception {
        internalCluster().startMasterOnlyNodes(2);
        String primary = internalCluster().startDataOnlyNode();
        assertAcked(prepareCreate("test")
            .setSettings(Settings.builder()
                .put(indexSettings())
                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
            ));

        String replica = internalCluster().startDataOnlyNode();
        String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
        ensureGreen("test");

        TestPlugin primaryTestPlugin = getTestPlugin(primary);
        // this test only provoked an issue for the primary action, but for completeness, we pick the action randomly
        primaryTestPlugin.testActionName = TestAction.ACTION_NAME + (randomBoolean() ? "[p]" : "[r]");
        logger.info("--> Test action {}, primary {}, replica {}", primaryTestPlugin.testActionName, primary, replica);

        // holds either the response or the failure, whichever arrives first (and only once)
        AtomicReference<Object> response = new AtomicReference<>();
        CountDownLatch doneLatch = new CountDownLatch(1);
        client(coordinator).execute(TestAction.TYPE, new Request(new ShardId(resolveIndex("test"), 0)),
            ActionListener.runAfter(ActionListener.wrap(
                r -> assertTrue(response.compareAndSet(null, r)),
                e -> assertTrue(response.compareAndSet(null, e))),
                doneLatch::countDown));

        // wait until the interceptor on the primary is holding the selected action
        assertTrue(primaryTestPlugin.actionRunningLatch.await(10, TimeUnit.SECONDS));

        MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
            primary);
        // we pause node after TransportService has moved to stopped, but before closing connections, since if connections are closed
        // we would not hit the transport service closed case.
        primaryTransportService.addOnStopListener(() -> {
            primaryTestPlugin.actionWaitLatch.countDown();
            try {
                assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                // keep the interrupt status visible to code further up the stack
                Thread.currentThread().interrupt();
                throw new AssertionError(e);
            }
        });
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));

        assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
        if (response.get() instanceof Exception) {
            throw new AssertionError(response.get());
        }
    }

    /**
     * Returns the single {@link TestPlugin} instance loaded on the given node.
     *
     * @param node name of the node whose plugin instance is wanted
     */
    private TestPlugin getTestPlugin(String node) {
        PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, node);
        List<TestPlugin> testPlugins = pluginsService.filterPlugins(TestPlugin.class);
        assertThat(testPlugins, Matchers.hasSize(1));
        return testPlugins.get(0);
    }
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
@ -38,11 +39,13 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.mockito.ArgumentCaptor;
@ -212,10 +215,11 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
final Exception e = randomFrom(
new AlreadyClosedException("closed"),
new IndexShardClosedException(indexShard.shardId()),
new TransportException(randomFrom(
"failed",
"TransportService is closed stopped can't send request",
"transport stopped, action: indices:admin/seq_no/retention_lease_background_sync[p]")),
new TransportException("failed"),
new SendRequestTransportException(null, randomFrom(
"some-action",
"indices:admin/seq_no/retention_lease_background_sync[p]"
), new NodeClosedException((DiscoveryNode) null)),
new RuntimeException("failed"));
listener.onFailure(e);
if (e.getMessage().equals("failed")) {

View File

@ -92,6 +92,8 @@ public final class MockTransportService extends TransportService {
private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();
private final List<Runnable> onStopListeners = new CopyOnWriteArrayList<>();
public static class TestPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
@ -528,6 +530,16 @@ public final class MockTransportService extends TransportService {
return connection;
}
/**
 * Registers a listener that is run when this transport service is stopped, before the superclass stop
 * logic executes.
 *
 * @param listener the callback to run on stop; must not be {@code null}
 * @throws NullPointerException if {@code listener} is {@code null}
 */
public void addOnStopListener(Runnable listener) {
    // reject null here: the backing list would accept it and only fail later, while stopping
    onStopListeners.add(java.util.Objects.requireNonNull(listener, "listener"));
}
/**
 * Runs every registered on-stop listener and then delegates to the superclass stop logic.
 */
@Override
protected void doStop() {
    for (Runnable stopListener : onStopListeners) {
        stopListener.run();
    }
    super.doStop();
}
@Override
protected void doClose() throws IOException {
super.doClose();

View File

@ -546,8 +546,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
actual instanceof IndexClosedException || // If follow index is closed
actual instanceof ConnectTransportException ||
actual instanceof NodeClosedException ||
actual instanceof NoSuchRemoteClusterException ||
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed"));
actual instanceof NoSuchRemoteClusterException;
}
// These methods are protected for testing purposes: