Wait for all Rec. to Stop on Node Close (#46178) (#46237)

* Wait for all Rec. to Stop on Node Close

* The issue is in `RecoverySourceHandler#acquireStore`. If we submit the store release to the generic thread pool while it is shutting down, the future we wait on (which also runs on the generic pool) is never completed and we potentially never release the store.
* Fixed by waiting for all recoveries to end on node close so that we always have a healthy thread pool here.
* Closes #45956
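
For context, the fix boils down to a drain-before-stop pattern: track the in-flight recoveries and have the stop step block until they have all finished, so nothing is ever submitted to a thread pool that is already shutting down. Below is a minimal stand-alone sketch of that idea (the `OngoingOperations` class is hypothetical, not the Elasticsearch implementation):

    // Hypothetical sketch only -- not the Elasticsearch code. The real change tracks
    // recoveries in PeerRecoverySourceService.OngoingRecoveries (see the diff below).
    class OngoingOperations {
        private int ongoing = 0;
        private boolean stopped = false;

        // Called when a new operation starts; refuse new work once we are stopping.
        synchronized void onStart() {
            if (stopped) {
                throw new IllegalStateException("already stopping");
            }
            ongoing++;
        }

        // Called when an operation finishes; wake up awaitEmpty() on the last one.
        synchronized void onFinish() {
            ongoing--;
            if (ongoing == 0) {
                notifyAll();
            }
        }

        // Called from stop()/close(): block until every in-flight operation is done.
        synchronized void awaitEmpty() throws InterruptedException {
            stopped = true;
            while (ongoing > 0) {
                wait();
            }
        }
    }

The actual change implements the same idea with listeners: `OngoingRecoveries` keeps a list of `ActionListener<Void>` callbacks that are completed when the last recovery is removed, and `awaitEmpty()` blocks on a `PlainActionFuture` registered in that list, as shown in the PeerRecoverySourceService diff below.
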
Armin Braun 2019-09-02 18:04:37 +02:00 committed by GitHub
parent bd7a04cd55
commit 2662c1b417
4 changed files with 57 additions and 8 deletions

PeerRecoverySourceService.java

@@ -24,10 +24,13 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ChannelActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
@@ -50,7 +53,7 @@ import java.util.Set;
  * The source recovery accepts recovery requests from other peer shards and start the recovery process from this
  * source shard to the target shard.
  */
-public class PeerRecoverySourceService implements IndexEventListener {
+public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener {
     private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class);
@@ -74,6 +77,19 @@ public class PeerRecoverySourceService implements IndexEventListener {
             new StartRecoveryTransportRequestHandler());
     }
+    @Override
+    protected void doStart() {
+    }
+    @Override
+    protected void doStop() {
+        ongoingRecoveries.awaitEmpty();
+    }
+    @Override
+    protected void doClose() {
+    }
     @Override
     public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
                                        Settings indexSettings) {
@@ -118,9 +134,14 @@ public class PeerRecoverySourceService implements IndexEventListener {
     }
     final class OngoingRecoveries {
         private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
+        @Nullable
+        private List<ActionListener<Void>> emptyListeners;
         synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
+            assert lifecycle.started();
             final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
             RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
             shard.recoveryStats().incCurrentAsSource();
@@ -138,6 +159,13 @@ public class PeerRecoverySourceService implements IndexEventListener {
             if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
                 ongoingRecoveries.remove(shard);
             }
+            if (ongoingRecoveries.isEmpty()) {
+                if (emptyListeners != null) {
+                    final List<ActionListener<Void>> onEmptyListeners = emptyListeners;
+                    emptyListeners = null;
+                    ActionListener.onResponse(onEmptyListeners, null);
+                }
+            }
         }
         synchronized void cancel(IndexShard shard, String reason) {
@@ -157,6 +185,22 @@
             }
         }
+        void awaitEmpty() {
+            assert lifecycle.stoppedOrClosed();
+            final PlainActionFuture<Void> future;
+            synchronized (this) {
+                if (ongoingRecoveries.isEmpty()) {
+                    return;
+                }
+                future = new PlainActionFuture<>();
+                if (emptyListeners == null) {
+                    emptyListeners = new ArrayList<>();
+                }
+                emptyListeners.add(future);
+            }
+            FutureUtils.get(future);
+        }
         private final class ShardRecoveryContext {
             final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();

RecoverySourceHandler.java

@@ -405,13 +405,14 @@ public class RecoverySourceHandler {
         store.incRef();
         return Releasables.releaseOnce(() -> {
             final PlainActionFuture<Void> future = new PlainActionFuture<>();
-            threadPool.generic().execute(new ActionRunnable<Void>(future) {
-                @Override
-                protected void doRun() {
-                    store.decRef();
-                    listener.onResponse(null);
-                }
-            });
+            assert threadPool.generic().isShutdown() == false;
+            // TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
+            // While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
+            // below and thus make it impossible for the store release to execute which in turn would block the futures forever
+            threadPool.generic().execute(ActionRunnable.wrap(future, l -> {
+                store.decRef();
+                l.onResponse(null);
+            }));
             FutureUtils.get(future);
         });
     }

Node.java

@@ -602,6 +602,7 @@ public class Node implements Closeable {
             pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
                 .map(injector::getInstance).collect(Collectors.toList()));
             resourcesToClose.addAll(pluginLifecycleComponents);
+            resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
             this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
             client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}),
                 () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
@@ -697,6 +698,7 @@
         assert localNodeFactory.getNode() != null;
         assert transportService.getLocalNode().equals(localNodeFactory.getNode())
             : "transportService has a different local node than the factory provided";
+        injector.getInstance(PeerRecoverySourceService.class).start();
         final MetaData onDiskMetadata;
         // we load the global state here (the persistent part of the cluster state stored on disk) to
         // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
@@ -842,6 +844,7 @@
         toClose.add(injector.getInstance(IndicesService.class));
         // close filter/fielddata caches after indices
         toClose.add(injector.getInstance(IndicesStore.class));
+        toClose.add(injector.getInstance(PeerRecoverySourceService.class));
         toClose.add(() -> stopWatch.stop().start("cluster"));
         toClose.add(injector.getInstance(ClusterService.class));
         toClose.add(() -> stopWatch.stop().start("node_connections_service"));

PeerRecoverySourceServiceTests.java

@@ -43,6 +43,7 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
         StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
             getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
             SequenceNumbers.UNASSIGNED_SEQ_NO);
+        peerRecoverySourceService.start();
         RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
         DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
             () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));