From fe85bdbe6f685d92de8fb51691a7da2e18716251 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Wed, 10 Jun 2020 09:41:52 +0200
Subject: [PATCH] Fix Remote Recovery Being Retried for Removed Nodes (#57608) (#57913)

If a node is disconnected we retry. It does not make sense to retry the
recovery if the node has been removed from the cluster, though.
=> added a cluster state listener that cancels the recovery for removed nodes

Also, we were running the retry on the `SAME` pool, which for each retry
will be the scheduler pool. Since the error path of the listener we use
here performs blocking operations when closing the resources used by the
recovery, we can't use the `SAME` pool: not all exceptions go through the
`ActionListenerResponseHandler` threading, e.g. `NodeNotConnectedException`.

Closes #57585
---
 .../elasticsearch/indices/IndicesService.java |  4 +
 .../recovery/PeerRecoverySourceService.java   | 75 ++++++++++++++-----
 .../recovery/RemoteRecoveryTargetHandler.java |  4 +
 .../PeerRecoverySourceServiceTests.java       |  6 +-
 4 files changed, 71 insertions(+), 18 deletions(-)
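Reviewer note, kept out of the commit message and the diff: the snippet below is a minimal standalone sketch of the cancel-on-node-left mechanism this patch introduces, condensed from the bookkeeping the diff adds to PeerRecoverySourceService.OngoingRecoveries. CancelOnNodeLeftSketch, CancellableTarget, and track() are hypothetical names used only for illustration; CancellableTarget stands in for the real RemoteRecoveryTargetHandler, while the ClusterStateListener, ClusterChangedEvent, and DiscoveryNode calls are the same ones used in the diff.

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;

    import org.elasticsearch.cluster.ClusterChangedEvent;
    import org.elasticsearch.cluster.ClusterStateListener;
    import org.elasticsearch.cluster.node.DiscoveryNode;

    public class CancelOnNodeLeftSketch implements ClusterStateListener {

        /** Hypothetical stand-in for RemoteRecoveryTargetHandler. */
        interface CancellableTarget {
            DiscoveryNode targetNode();

            void cancel();
        }

        // Same shape as the nodeToHandlers map the diff adds: ongoing recoveries
        // indexed by the node they recover to.
        private final Map<DiscoveryNode, Collection<CancellableTarget>> nodeToHandlers = new HashMap<>();

        // Called when a recovery starts, mirroring OngoingRecoveries#addNewRecovery.
        public synchronized void track(CancellableTarget target) {
            nodeToHandlers.computeIfAbsent(target.targetNode(), k -> new HashSet<>()).add(target);
        }

        @Override
        public synchronized void clusterChanged(ClusterChangedEvent event) {
            if (event.nodesRemoved() == false) {
                return;
            }
            // A node that was removed from the cluster will not answer retries, so
            // cancel its ongoing recoveries instead of retrying them.
            for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
                final Collection<CancellableTarget> handlers = nodeToHandlers.get(removedNode);
                if (handlers != null) {
                    handlers.forEach(CancellableTarget::cancel);
                }
            }
        }
    }

In the patch itself the service registers and unregisters the listener in doStart()/doStop() via indicesService.clusterService().addListener(this) and removeListener(this), and the per-node map entries are cleaned up again when a recovery finishes in OngoingRecoveries#remove.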
diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java
index 45e00e918c1..53e3db29ee4 100644
--- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -331,6 +331,10 @@ public class IndicesService extends AbstractLifecycleComponent
 
     private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
 
+    public ClusterService clusterService() {
+        return clusterService;
+    }
+
     @Override
     protected void doStop() {
         ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
index 7745c85a107..ad51dc43ebb 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -26,8 +26,12 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -44,17 +48,18 @@ import org.elasticsearch.transport.TransportRequestHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * The source recovery accepts recovery requests from other peer shards and start the recovery process from this
  * source shard to the target shard.
  */
-public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener {
+public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener, ClusterStateListener {
 
     private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class);
 
@@ -88,11 +93,13 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
 
     @Override
     protected void doStart() {
+        indicesService.clusterService().addListener(this);
     }
 
     @Override
     protected void doStop() {
         ongoingRecoveries.awaitEmpty();
+        indicesService.clusterService().removeListener(this);
     }
 
     @Override
@@ -107,6 +114,15 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
         }
     }
 
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        if (event.nodesRemoved()) {
+            for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
+                ongoingRecoveries.cancelOnNodeLeft(removedNode);
+            }
+        }
+    }
+
     private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
         final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
         final IndexShard shard = indexService.getShard(request.shardId().id());
@@ -162,15 +178,28 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
 
         private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
 
+        private final Map<DiscoveryNode, Collection<RemoteRecoveryTargetHandler>> nodeToHandlers = new HashMap<>();
+
         @Nullable
         private List<ActionListener<Void>> emptyListeners;
 
         synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
             assert lifecycle.started();
             final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
-            RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
+            final Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> handlers = shardContext.addNewRecovery(request, shard);
+            final RemoteRecoveryTargetHandler recoveryTargetHandler = handlers.v2();
+            nodeToHandlers.computeIfAbsent(recoveryTargetHandler.targetNode(), k -> new HashSet<>()).add(recoveryTargetHandler);
             shard.recoveryStats().incCurrentAsSource();
-            return handler;
+            return handlers.v1();
+        }
+
+        synchronized void cancelOnNodeLeft(DiscoveryNode node) {
+            final Collection<RemoteRecoveryTargetHandler> handlers = nodeToHandlers.get(node);
+            if (handlers != null) {
+                for (RemoteRecoveryTargetHandler handler : handlers) {
+                    handler.cancel();
+                }
+            }
         }
 
         synchronized void reestablishRecovery(ReestablishRecoveryRequest request, IndexShard shard,
@@ -186,10 +215,20 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
         synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
             final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
             assert shardRecoveryContext != null : "Shard was not registered [" + shard + "]";
-            boolean remove = shardRecoveryContext.recoveryHandlers.remove(handler);
-            assert remove : "Handler was not registered [" + handler + "]";
-            if (remove) {
+            final RemoteRecoveryTargetHandler removed = shardRecoveryContext.recoveryHandlers.remove(handler);
+            assert removed != null : "Handler was not registered [" + handler + "]";
+            if (removed != null) {
                 shard.recoveryStats().decCurrentAsSource();
+                removed.cancel();
+                assert nodeToHandlers.getOrDefault(removed.targetNode(), Collections.emptySet()).contains(removed)
+                    : "Remote recovery was not properly tracked [" + removed + "]";
+                nodeToHandlers.computeIfPresent(removed.targetNode(), (k, handlersForNode) -> {
+                    handlersForNode.remove(removed);
+                    if (handlersForNode.isEmpty()) {
+                        return null;
+                    }
+                    return handlersForNode;
+                });
             }
             if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
                 ongoingRecoveries.remove(shard);
@@ -207,7 +246,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
             final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
             if (shardRecoveryContext != null) {
                 final List<Exception> failures = new ArrayList<>();
-                for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers) {
+                for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers.keySet()) {
                     try {
                         handlers.cancel(reason);
                     } catch (Exception ex) {
@@ -237,21 +276,22 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
         }
 
         private final class ShardRecoveryContext {
-            final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();
+            final Map<RecoverySourceHandler, RemoteRecoveryTargetHandler> recoveryHandlers = new HashMap<>();
 
             /**
              * Adds recovery source handler.
             */
-            synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
-                for (RecoverySourceHandler existingHandler : recoveryHandlers) {
+            synchronized Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> addNewRecovery(StartRecoveryRequest request,
+                                                                                                  IndexShard shard) {
+                for (RecoverySourceHandler existingHandler : recoveryHandlers.keySet()) {
                     if (existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
                         throw new DelayRecoveryException("recovery with same target already registered, waiting for " +
                             "previous recovery attempt to be cancelled or completed");
                     }
                 }
-                RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
-                recoveryHandlers.add(handler);
-                return handler;
+                final Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> handlers = createRecoverySourceHandler(request, shard);
+                recoveryHandlers.put(handlers.v1(), handlers.v2());
+                return handlers;
             }
             /**
              *
@@ -259,7 +299,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
             */
            synchronized void reestablishRecovery(ReestablishRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
                RecoverySourceHandler handler = null;
-                for (RecoverySourceHandler existingHandler : recoveryHandlers) {
+                for (RecoverySourceHandler existingHandler : recoveryHandlers.keySet()) {
                     if (existingHandler.getRequest().recoveryId() == request.recoveryId()
                         && existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
                         handler = existingHandler;
@@ -273,14 +313,15 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
                 handler.addListener(listener);
             }
 
-            private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
+            private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecoverySourceHandler(StartRecoveryRequest request,
+                                                                                                          IndexShard shard) {
                 RecoverySourceHandler handler;
                 final RemoteRecoveryTargetHandler recoveryTarget = new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(),
                     transportService, request.targetNode(), recoverySettings,
                     throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
                 handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
                     Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
-                return handler;
+                return Tuple.tuple(handler, recoveryTarget);
             }
         }
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index 26f63a75abe..ed5a462d158 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -99,6 +99,10 @@
         this.retriesSupported = targetNode.getVersion().onOrAfter(Version.V_7_9_0);
     }
 
+    public DiscoveryNode targetNode() {
+        return targetNode;
+    }
+
     @Override
     public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
         final String action = PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG;
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
index 491c3974e5b..7f7a1d588d0 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -32,13 +33,16 @@ import java.io.IOException;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
 
     public void testDuplicateRecoveries() throws IOException {
         IndexShard primary = newStartedShard(true);
+        final IndicesService indicesService = mock(IndicesService.class);
+        when(indicesService.clusterService()).thenReturn(mock(ClusterService.class));
         PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
-            mock(TransportService.class), mock(IndicesService.class),
+            mock(TransportService.class), indicesService,
             new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
         StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
             getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),