From cd508555f9f17121d70bf32b77e39852269bc1ea Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 25 Jul 2017 08:08:44 +0200 Subject: [PATCH] Engine.close should only return when resources are freed (#25852) Currently Engine.close can return immediately if the engine is already at the process of shutting down (due to a concurrent close call or an engine failure). This is a shame because some of our testing infra wants to do things like checking the index. This commit changes the logic to make sure that all calls to close wait until resources are freed. Failing the engine is still non blocking. Fixes #25817 --- .../elasticsearch/index/engine/Engine.java | 20 ++++++- .../index/engine/InternalEngine.java | 11 +++- .../index/engine/InternalEngineTests.java | 56 +++++++++++++++++++ .../index/translog/TruncateTranslogIT.java | 1 - 4 files changed, 81 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 71fe528c6f2..daf1aa6bd0b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -82,6 +82,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -99,6 +100,7 @@ public abstract class Engine implements Closeable { protected final EngineConfig engineConfig; protected final Store store; protected final AtomicBoolean isClosed = new AtomicBoolean(false); + private final CountDownLatch closedLatch = new CountDownLatch(1); protected final EventListener eventListener; protected final ReentrantLock failEngineLock = new ReentrantLock(); protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); @@ -852,7 +854,7 @@ public abstract class Engine implements Closeable { failedEngine.set((failure != null) ? failure : new IllegalStateException(reason)); try { // we just go and close this engine - no way to recover - closeNoLock("engine failed on: [" + reason + "]"); + closeNoLock("engine failed on: [" + reason + "]", closedLatch); } finally { logger.warn((Supplier) () -> new ParameterizedMessage("failed engine [{}]", reason), failure); // we must set a failure exception, generate one if not supplied @@ -1300,8 +1302,10 @@ public abstract class Engine implements Closeable { /** * Method to close the engine while the write lock is held. + * Must decrement the supplied when closing work is done and resources are + * freed. */ - protected abstract void closeNoLock(String reason); + protected abstract void closeNoLock(String reason, CountDownLatch closedLatch); /** * Flush the engine (committing segments to disk and truncating the @@ -1324,6 +1328,7 @@ public abstract class Engine implements Closeable { } } } + awaitPendingClose(); } @Override @@ -1332,9 +1337,18 @@ public abstract class Engine implements Closeable { logger.debug("close now acquiring writeLock"); try (ReleasableLock lock = writeLock.acquire()) { logger.debug("close acquired writeLock"); - closeNoLock("api"); + closeNoLock("api", closedLatch); } } + awaitPendingClose(); + } + + private void awaitPendingClose() { + try { + closedLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } public static class CommitId implements Writeable { diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 14f3a0e749b..bd11afdced9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -86,6 +86,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -1600,7 +1601,7 @@ public class InternalEngine extends Engine { * is failed. */ @Override - protected final void closeNoLock(String reason) { + protected final void closeNoLock(String reason, CountDownLatch closedLatch) { if (isClosed.compareAndSet(false, true)) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { @@ -1627,8 +1628,12 @@ public class InternalEngine extends Engine { } catch (Exception e) { logger.warn("failed to rollback writer on close", e); } finally { - store.decRef(); - logger.debug("engine closed [{}]", reason); + try { + store.decRef(); + logger.debug("engine closed [{}]", reason); + } finally { + closedLatch.countDown(); + } } } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ef8d71968de..0cb7dd00aa5 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -61,6 +61,7 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.Lock; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -91,6 +92,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqN import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; @@ -2926,6 +2928,60 @@ public class InternalEngineTests extends ESTestCase { assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(exception.get())); } + /** + * Tests that when the the close method returns the engine is actually guaranteed to have cleaned up and that resources are closed + */ + public void testConcurrentEngineClosed() throws BrokenBarrierException, InterruptedException { + Thread[] closingThreads = new Thread[3]; + CyclicBarrier barrier = new CyclicBarrier(1 + closingThreads.length + 1); + Thread failEngine = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + engine.failEngine("test", new RuntimeException("test")); + } + }); + failEngine.start(); + for (int i = 0;i < closingThreads.length ; i++) { + boolean flushAndClose = randomBoolean(); + closingThreads[i] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + if (flushAndClose) { + engine.flushAndClose(); + } else { + engine.close(); + } + // try to acquire the writer lock - i.e., everything is closed, we need to synchronize + // to avoid races between closing threads + synchronized (closingThreads) { + try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + // all good. + } + } + } + }); + closingThreads[i].setName("closingThread_" + i); + closingThreads[i].start(); + } + barrier.await(); + failEngine.join(); + for (Thread t : closingThreads) { + t.join(); + } + } + public void testCurrentTranslogIDisCommitted() throws IOException { try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java index 681c1a1ec45..60434d95e62 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -223,7 +223,6 @@ public class TruncateTranslogIT extends ESIntegTestCase { expectSeqNoRecovery ? equalTo(0) : greaterThan(0)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25817") public void testCorruptTranslogTruncationOfReplica() throws Exception { internalCluster().startNodes(2, Settings.EMPTY);