If a node is disconnected we retry the recovery. However, retrying makes no sense once the node has been removed from the cluster, so this adds a cluster-state listener that cancels ongoing recoveries targeting removed nodes. Additionally, we were running the retry on the `SAME` pool, which on each retry will be the scheduler pool. Since the error path of the listener used here performs blocking operations when closing the resources used by the recovery, we cannot use the `SAME` pool: not all exceptions go through the `ActionListenerResponseHandler` threading (e.g. `NodeNotConnectedException`). Closes #57585
This commit is contained in:
parent
d579420452
commit
fe85bdbe6f
|
@ -331,6 +331,10 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
|
||||
private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
|
||||
|
||||
/**
 * Returns the {@link ClusterService} this service was created with.
 */
public ClusterService clusterService() {
    return clusterService;
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);
|
||||
|
|
|
@ -26,8 +26,12 @@ import org.elasticsearch.ResourceNotFoundException;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ChannelActionListener;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -44,17 +48,18 @@ import org.elasticsearch.transport.TransportRequestHandler;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
|
||||
* source shard to the target shard.
|
||||
*/
|
||||
public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener {
|
||||
public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener, ClusterStateListener {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class);
|
||||
|
||||
|
@ -88,11 +93,13 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
|
||||
@Override
protected void doStart() {
    // Register as a cluster-state listener so that ongoing recoveries can be
    // cancelled when their target node is removed from the cluster
    // (see clusterChanged below).
    indicesService.clusterService().addListener(this);
}
|
||||
|
||||
@Override
protected void doStop() {
    // NOTE(review): awaitEmpty presumably blocks until all ongoing source
    // recoveries have completed or been cancelled — confirm against
    // OngoingRecoveries. Unregister from cluster-state updates afterwards,
    // mirroring the addListener call in doStart.
    ongoingRecoveries.awaitEmpty();
    indicesService.clusterService().removeListener(this);
}
|
||||
|
||||
@Override
|
||||
|
@ -107,6 +114,15 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.nodesRemoved()) {
|
||||
for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
|
||||
ongoingRecoveries.cancelOnNodeLeft(removedNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
|
||||
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
||||
final IndexShard shard = indexService.getShard(request.shardId().id());
|
||||
|
@ -162,15 +178,28 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
|
||||
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
|
||||
|
||||
private final Map<DiscoveryNode, Collection<RemoteRecoveryTargetHandler>> nodeToHandlers = new HashMap<>();
|
||||
|
||||
@Nullable
|
||||
private List<ActionListener<Void>> emptyListeners;
|
||||
|
||||
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
|
||||
assert lifecycle.started();
|
||||
final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
|
||||
RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
|
||||
final Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> handlers = shardContext.addNewRecovery(request, shard);
|
||||
final RemoteRecoveryTargetHandler recoveryTargetHandler = handlers.v2();
|
||||
nodeToHandlers.computeIfAbsent(recoveryTargetHandler.targetNode(), k -> new HashSet<>()).add(recoveryTargetHandler);
|
||||
shard.recoveryStats().incCurrentAsSource();
|
||||
return handler;
|
||||
return handlers.v1();
|
||||
}
|
||||
|
||||
synchronized void cancelOnNodeLeft(DiscoveryNode node) {
|
||||
final Collection<RemoteRecoveryTargetHandler> handlers = nodeToHandlers.get(node);
|
||||
if (handlers != null) {
|
||||
for (RemoteRecoveryTargetHandler handler : handlers) {
|
||||
handler.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void reestablishRecovery(ReestablishRecoveryRequest request, IndexShard shard,
|
||||
|
@ -186,10 +215,20 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
|
||||
final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
|
||||
assert shardRecoveryContext != null : "Shard was not registered [" + shard + "]";
|
||||
boolean remove = shardRecoveryContext.recoveryHandlers.remove(handler);
|
||||
assert remove : "Handler was not registered [" + handler + "]";
|
||||
if (remove) {
|
||||
final RemoteRecoveryTargetHandler removed = shardRecoveryContext.recoveryHandlers.remove(handler);
|
||||
assert removed != null : "Handler was not registered [" + handler + "]";
|
||||
if (removed != null) {
|
||||
shard.recoveryStats().decCurrentAsSource();
|
||||
removed.cancel();
|
||||
assert nodeToHandlers.getOrDefault(removed.targetNode(), Collections.emptySet()).contains(removed)
|
||||
: "Remote recovery was not properly tracked [" + removed + "]";
|
||||
nodeToHandlers.computeIfPresent(removed.targetNode(), (k, handlersForNode) -> {
|
||||
handlersForNode.remove(removed);
|
||||
if (handlersForNode.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return handlersForNode;
|
||||
});
|
||||
}
|
||||
if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
|
||||
ongoingRecoveries.remove(shard);
|
||||
|
@ -207,7 +246,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
|
||||
if (shardRecoveryContext != null) {
|
||||
final List<Exception> failures = new ArrayList<>();
|
||||
for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers) {
|
||||
for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers.keySet()) {
|
||||
try {
|
||||
handlers.cancel(reason);
|
||||
} catch (Exception ex) {
|
||||
|
@ -237,21 +276,22 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
}
|
||||
|
||||
private final class ShardRecoveryContext {
|
||||
final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();
|
||||
final Map<RecoverySourceHandler, RemoteRecoveryTargetHandler> recoveryHandlers = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Adds recovery source handler.
|
||||
*/
|
||||
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
|
||||
for (RecoverySourceHandler existingHandler : recoveryHandlers) {
|
||||
synchronized Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> addNewRecovery(StartRecoveryRequest request,
|
||||
IndexShard shard) {
|
||||
for (RecoverySourceHandler existingHandler : recoveryHandlers.keySet()) {
|
||||
if (existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
|
||||
throw new DelayRecoveryException("recovery with same target already registered, waiting for " +
|
||||
"previous recovery attempt to be cancelled or completed");
|
||||
}
|
||||
}
|
||||
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
|
||||
recoveryHandlers.add(handler);
|
||||
return handler;
|
||||
final Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> handlers = createRecoverySourceHandler(request, shard);
|
||||
recoveryHandlers.put(handlers.v1(), handlers.v2());
|
||||
return handlers;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -259,7 +299,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
*/
|
||||
synchronized void reestablishRecovery(ReestablishRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
|
||||
RecoverySourceHandler handler = null;
|
||||
for (RecoverySourceHandler existingHandler : recoveryHandlers) {
|
||||
for (RecoverySourceHandler existingHandler : recoveryHandlers.keySet()) {
|
||||
if (existingHandler.getRequest().recoveryId() == request.recoveryId() &&
|
||||
existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
|
||||
handler = existingHandler;
|
||||
|
@ -273,14 +313,15 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
handler.addListener(listener);
|
||||
}
|
||||
|
||||
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
|
||||
private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecoverySourceHandler(StartRecoveryRequest request,
|
||||
IndexShard shard) {
|
||||
RecoverySourceHandler handler;
|
||||
final RemoteRecoveryTargetHandler recoveryTarget =
|
||||
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
|
||||
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
|
||||
handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
|
||||
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
|
||||
return handler;
|
||||
return Tuple.tuple(handler, recoveryTarget);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -99,6 +99,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
|
|||
this.retriesSupported = targetNode.getVersion().onOrAfter(Version.V_7_9_0);
|
||||
}
|
||||
|
||||
/**
 * Returns the node this recovery is targeting.
 */
public DiscoveryNode targetNode() {
    return targetNode;
}
|
||||
|
||||
@Override
|
||||
public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
|
||||
final String action = PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.indices.recovery;
|
||||
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
|
@ -32,13 +33,16 @@ import java.io.IOException;
|
|||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
|
||||
|
||||
public void testDuplicateRecoveries() throws IOException {
|
||||
IndexShard primary = newStartedShard(true);
|
||||
final IndicesService indicesService = mock(IndicesService.class);
|
||||
when(indicesService.clusterService()).thenReturn(mock(ClusterService.class));
|
||||
PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
|
||||
mock(TransportService.class), mock(IndicesService.class),
|
||||
mock(TransportService.class), indicesService,
|
||||
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
|
||||
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
|
||||
getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
|
||||
|
|
Loading…
Reference in New Issue