Do not release safe commit with CancellableThreads (#59182)
We are leaking a FileChannel in #39585 if we release a safe commit with CancellableThreads. Although it is a bug in Lucene where we do not close a FileChannel if we failed to create a NIOFSIndexInput, I think it's safer if we release a safe commit using the generic thread pool instead. Closes #39585 Relates #45409
This commit is contained in:
parent
cc3c8be0f1
commit
6a0f7411e2
|
@ -40,6 +40,7 @@ import org.elasticsearch.action.support.ThreadedActionListener;
|
||||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
import org.elasticsearch.common.CheckedRunnable;
|
||||||
import org.elasticsearch.common.StopWatch;
|
import org.elasticsearch.common.StopWatch;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
@ -84,6 +85,7 @@ import java.util.Locale;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -230,7 +232,7 @@ public class RecoverySourceHandler {
|
||||||
} else {
|
} else {
|
||||||
final Engine.IndexCommitRef safeCommitRef;
|
final Engine.IndexCommitRef safeCommitRef;
|
||||||
try {
|
try {
|
||||||
safeCommitRef = shard.acquireSafeIndexCommit();
|
safeCommitRef = acquireSafeCommit(shard);
|
||||||
resources.add(safeCommitRef);
|
resources.add(safeCommitRef);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
|
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
|
||||||
|
@ -401,15 +403,32 @@ public class RecoverySourceHandler {
|
||||||
*/
|
*/
|
||||||
private Releasable acquireStore(Store store) {
|
private Releasable acquireStore(Store store) {
|
||||||
store.incRef();
|
store.incRef();
|
||||||
return Releasables.releaseOnce(() -> {
|
return Releasables.releaseOnce(() -> runWithGenericThreadPool(store::decRef));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Releasing a safe commit can access some commit files. It's better not to use {@link CancellableThreads} to interact
|
||||||
|
* with the file systems due to interrupt (see {@link org.apache.lucene.store.NIOFSDirectory} javadocs for more detail).
|
||||||
|
* This method acquires a safe commit and wraps it to make sure that it will be released using the generic thread pool.
|
||||||
|
*/
|
||||||
|
private Engine.IndexCommitRef acquireSafeCommit(IndexShard shard) {
|
||||||
|
final Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit();
|
||||||
|
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
|
return new Engine.IndexCommitRef(commitRef.getIndexCommit(), () -> {
|
||||||
|
if (closed.compareAndSet(false, true)) {
|
||||||
|
runWithGenericThreadPool(commitRef::close);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runWithGenericThreadPool(CheckedRunnable<Exception> task) {
|
||||||
final PlainActionFuture<Void> future = new PlainActionFuture<>();
|
final PlainActionFuture<Void> future = new PlainActionFuture<>();
|
||||||
assert threadPool.generic().isShutdown() == false;
|
assert threadPool.generic().isShutdown() == false;
|
||||||
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
|
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
|
||||||
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
|
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
|
||||||
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
|
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
|
||||||
threadPool.generic().execute(ActionRunnable.run(future, store::decRef));
|
threadPool.generic().execute(ActionRunnable.run(future, task));
|
||||||
FutureUtils.get(future);
|
FutureUtils.get(future);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class SendFileResult {
|
static final class SendFileResult {
|
||||||
|
|
Loading…
Reference in New Issue