diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 71fe528c6f2..daf1aa6bd0b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -82,6 +82,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -99,6 +100,7 @@ public abstract class Engine implements Closeable { protected final EngineConfig engineConfig; protected final Store store; protected final AtomicBoolean isClosed = new AtomicBoolean(false); + private final CountDownLatch closedLatch = new CountDownLatch(1); protected final EventListener eventListener; protected final ReentrantLock failEngineLock = new ReentrantLock(); protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); @@ -852,7 +854,7 @@ public abstract class Engine implements Closeable { failedEngine.set((failure != null) ? failure : new IllegalStateException(reason)); try { // we just go and close this engine - no way to recover - closeNoLock("engine failed on: [" + reason + "]"); + closeNoLock("engine failed on: [" + reason + "]", closedLatch); } finally { logger.warn((Supplier) () -> new ParameterizedMessage("failed engine [{}]", reason), failure); // we must set a failure exception, generate one if not supplied @@ -1300,8 +1302,10 @@ public abstract class Engine implements Closeable { /** * Method to close the engine while the write lock is held. + * Must decrement the supplied when closing work is done and resources are + * freed. */ - protected abstract void closeNoLock(String reason); + protected abstract void closeNoLock(String reason, CountDownLatch closedLatch); /** * Flush the engine (committing segments to disk and truncating the @@ -1324,6 +1328,7 @@ public abstract class Engine implements Closeable { } } } + awaitPendingClose(); } @Override @@ -1332,9 +1337,18 @@ public abstract class Engine implements Closeable { logger.debug("close now acquiring writeLock"); try (ReleasableLock lock = writeLock.acquire()) { logger.debug("close acquired writeLock"); - closeNoLock("api"); + closeNoLock("api", closedLatch); } } + awaitPendingClose(); + } + + private void awaitPendingClose() { + try { + closedLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } public static class CommitId implements Writeable { diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 14f3a0e749b..bd11afdced9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -86,6 +86,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -1600,7 +1601,7 @@ public class InternalEngine extends Engine { * is failed. */ @Override - protected final void closeNoLock(String reason) { + protected final void closeNoLock(String reason, CountDownLatch closedLatch) { if (isClosed.compareAndSet(false, true)) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { @@ -1627,8 +1628,12 @@ public class InternalEngine extends Engine { } catch (Exception e) { logger.warn("failed to rollback writer on close", e); } finally { - store.decRef(); - logger.debug("engine closed [{}]", reason); + try { + store.decRef(); + logger.debug("engine closed [{}]", reason); + } finally { + closedLatch.countDown(); + } } } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ef8d71968de..0cb7dd00aa5 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -61,6 +61,7 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.Lock; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -91,6 +92,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqN import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; @@ -2926,6 +2928,60 @@ public class InternalEngineTests extends ESTestCase { assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(exception.get())); } + /** + * Tests that when the the close method returns the engine is actually guaranteed to have cleaned up and that resources are closed + */ + public void testConcurrentEngineClosed() throws BrokenBarrierException, InterruptedException { + Thread[] closingThreads = new Thread[3]; + CyclicBarrier barrier = new CyclicBarrier(1 + closingThreads.length + 1); + Thread failEngine = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + engine.failEngine("test", new RuntimeException("test")); + } + }); + failEngine.start(); + for (int i = 0;i < closingThreads.length ; i++) { + boolean flushAndClose = randomBoolean(); + closingThreads[i] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + if (flushAndClose) { + engine.flushAndClose(); + } else { + engine.close(); + } + // try to acquire the writer lock - i.e., everything is closed, we need to synchronize + // to avoid races between closing threads + synchronized (closingThreads) { + try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + // all good. + } + } + } + }); + closingThreads[i].setName("closingThread_" + i); + closingThreads[i].start(); + } + barrier.await(); + failEngine.join(); + for (Thread t : closingThreads) { + t.join(); + } + } + public void testCurrentTranslogIDisCommitted() throws IOException { try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java index 681c1a1ec45..60434d95e62 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -223,7 +223,6 @@ public class TruncateTranslogIT extends ESIntegTestCase { expectSeqNoRecovery ? equalTo(0) : greaterThan(0)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25817") public void testCorruptTranslogTruncationOfReplica() throws Exception { internalCluster().startNodes(2, Settings.EMPTY);