diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java
index 45e00e918c1..53e3db29ee4 100644
--- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -331,6 +331,10 @@ public class IndicesService extends AbstractLifecycleComponent
 
     private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
 
+    public ClusterService clusterService() {
+        return clusterService;
+    }
+
     @Override
     protected void doStop() {
         ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
index 7745c85a107..ad51dc43ebb 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -26,8 +26,12 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -44,17 +48,18 @@ import org.elasticsearch.transport.TransportRequestHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * The source recovery accepts recovery requests from other peer shards and start the recovery process from this
  * source shard to the target shard.
 */
-public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener {
+public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener, ClusterStateListener {
 
     private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class);
 
@@ -88,11 +93,13 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
 
     @Override
     protected void doStart() {
+        indicesService.clusterService().addListener(this);
     }
 
     @Override
     protected void doStop() {
         ongoingRecoveries.awaitEmpty();
+        indicesService.clusterService().removeListener(this);
     }
 
     @Override
@@ -107,6 +114,15 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
         }
     }
 
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        if (event.nodesRemoved()) {
+            for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
+                ongoingRecoveries.cancelOnNodeLeft(removedNode);
+            }
+        }
+    }
+
     private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
         final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
         final IndexShard shard = indexService.getShard(request.shardId().id());
@@ -162,15 +178,28 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
 
         private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
 
+        private final Map<DiscoveryNode, Collection<RemoteRecoveryTargetHandler>> nodeToHandlers = new HashMap<>();
+
         @Nullable
        private List<ActionListener<Void>> emptyListeners;
 
         synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
             assert lifecycle.started();
             final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
-            RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
+            final Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> handlers = shardContext.addNewRecovery(request, shard);
+            final RemoteRecoveryTargetHandler recoveryTargetHandler = handlers.v2();
+            nodeToHandlers.computeIfAbsent(recoveryTargetHandler.targetNode(), k -> new HashSet<>()).add(recoveryTargetHandler);
             shard.recoveryStats().incCurrentAsSource();
-            return handler;
+            return handlers.v1();
+        }
+
+        synchronized void cancelOnNodeLeft(DiscoveryNode node) {
+            final Collection<RemoteRecoveryTargetHandler> handlers = nodeToHandlers.get(node);
+            if (handlers != null) {
+                for (RemoteRecoveryTargetHandler handler : handlers) {
+                    handler.cancel();
+                }
+            }
         }
 
         synchronized void reestablishRecovery(ReestablishRecoveryRequest request, IndexShard shard,
@@ -186,10 +215,20 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
         synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
             final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
             assert shardRecoveryContext != null : "Shard was not registered [" + shard + "]";
-            boolean remove = shardRecoveryContext.recoveryHandlers.remove(handler);
-            assert remove : "Handler was not registered [" + handler + "]";
-            if (remove) {
+            final RemoteRecoveryTargetHandler removed = shardRecoveryContext.recoveryHandlers.remove(handler);
+            assert removed != null : "Handler was not registered [" + handler + "]";
+            if (removed != null) {
                 shard.recoveryStats().decCurrentAsSource();
+                removed.cancel();
+                assert nodeToHandlers.getOrDefault(removed.targetNode(), Collections.emptySet()).contains(removed)
+                    : "Remote recovery was not properly tracked [" + removed + "]";
+                nodeToHandlers.computeIfPresent(removed.targetNode(), (k, handlersForNode) -> {
+                    handlersForNode.remove(removed);
+                    if (handlersForNode.isEmpty()) {
+                        return null;
+                    }
+                    return handlersForNode;
+                });
             }
             if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
                 ongoingRecoveries.remove(shard);
@@ -207,7 +246,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
             final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
             if (shardRecoveryContext != null) {
                 final List<Exception> failures = new ArrayList<>();
-                for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers) {
+                for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers.keySet()) {
                     try {
                         handlers.cancel(reason);
                     } catch (Exception ex) {
@@ -237,21 +276,22 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
         }
 
         private final class ShardRecoveryContext {
-            final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();
+            final Map<RecoverySourceHandler, RemoteRecoveryTargetHandler> recoveryHandlers = new HashMap<>();
 
             /**
              * Adds recovery source handler.
              */
-            synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
-                for (RecoverySourceHandler existingHandler : recoveryHandlers) {
+            synchronized Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> addNewRecovery(StartRecoveryRequest request,
+                                                                                                   IndexShard shard) {
+                for (RecoverySourceHandler existingHandler : recoveryHandlers.keySet()) {
                     if (existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
                         throw new DelayRecoveryException("recovery with same target already registered, waiting for " +
                             "previous recovery attempt to be cancelled or completed");
                     }
                 }
-                RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
-                recoveryHandlers.add(handler);
-                return handler;
+                final Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> handlers = createRecoverySourceHandler(request, shard);
+                recoveryHandlers.put(handlers.v1(), handlers.v2());
+                return handlers;
             }
 
             /**
@@ -259,7 +299,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
              */
             synchronized void reestablishRecovery(ReestablishRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
                 RecoverySourceHandler handler = null;
-                for (RecoverySourceHandler existingHandler : recoveryHandlers) {
+                for (RecoverySourceHandler existingHandler : recoveryHandlers.keySet()) {
                     if (existingHandler.getRequest().recoveryId() == request.recoveryId()
                         && existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
                         handler = existingHandler;
@@ -273,14 +313,15 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
                 handler.addListener(listener);
             }
 
-            private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
+            private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecoverySourceHandler(StartRecoveryRequest request,
+                                                                                                           IndexShard shard) {
                 RecoverySourceHandler handler;
                 final RemoteRecoveryTargetHandler recoveryTarget = new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(),
                     transportService, request.targetNode(), recoverySettings,
                     throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
                 handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
                     Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
-                return handler;
+                return Tuple.tuple(handler, recoveryTarget);
             }
         }
     }
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index 26f63a75abe..ed5a462d158 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -99,6 +99,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
         this.retriesSupported = targetNode.getVersion().onOrAfter(Version.V_7_9_0);
     }
 
+    public DiscoveryNode targetNode() {
+        return targetNode;
+    }
+
     @Override
     public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
         final String action = PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG;
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
index 491c3974e5b..7f7a1d588d0 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -32,13 +33,16 @@ import java.io.IOException;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
 
     public void testDuplicateRecoveries() throws IOException {
         IndexShard primary = newStartedShard(true);
+        final IndicesService indicesService = mock(IndicesService.class);
+        when(indicesService.clusterService()).thenReturn(mock(ClusterService.class));
         PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
-            mock(TransportService.class), mock(IndicesService.class),
+            mock(TransportService.class), indicesService,
             new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
         StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
             getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),