From 1ef87c9a689e34c6621c374bc26d30bb0ad0c757 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 25 Oct 2019 17:31:35 -0400 Subject: [PATCH] Refresh should not acquire readLock (#48414) Today, we hold the engine readLock while refreshing. Although this choice simplifies the correctness reasoning, it can block IndexShard from closing if warming an external reader takes time. The current implementation of refresh does not need to hold readLock as ReferenceManager can handle errors correctly if the engine is closed in midway. This PR is a prerequisite that we need to solve #47186. --- .../index/engine/InternalEngine.java | 6 +-- .../index/engine/InternalEngineTests.java | 46 ++++++++++++++++++- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2613e7ad330..8b4bc32dc74 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1569,14 +1569,12 @@ public class InternalEngine extends Engine { } final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException { - // we obtain a read lock here, since we don't want a flush to happen while we are refreshing - // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); boolean refreshed; - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); + try { + // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way. if (store.tryIncRef()) { // increment the ref just to ensure nobody closes the store during a refresh try { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7a9d10d3418..4cd36e01405 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -141,6 +141,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; import java.io.Closeable; @@ -5777,7 +5778,7 @@ public class InternalEngineTests extends EngineTestCase { assertMaxSeqNoInCommitUserData(engine); } - public void testRefreshAndFailEngineConcurrently() throws Exception { + public void testRefreshAndCloseEngineConcurrently() throws Exception { AtomicBoolean stopped = new AtomicBoolean(); Semaphore indexedDocs = new Semaphore(0); Thread indexer = new Thread(() -> { @@ -5807,7 +5808,11 @@ public class InternalEngineTests extends EngineTestCase { refresher.start(); indexedDocs.acquire(randomIntBetween(1, 100)); try { - engine.failEngine("test", new IOException("simulated error")); + if (randomBoolean()) { + engine.failEngine("test", new IOException("simulated error")); + } else { + engine.close(); + } } finally { stopped.set(true); indexer.join(); @@ -6149,4 +6154,41 @@ public class InternalEngineTests extends EngineTestCase { } } } + + public void testRefreshDoesNotBlockClosing() throws Exception { + final CountDownLatch refreshStarted = new CountDownLatch(1); + final CountDownLatch engineClosed = new CountDownLatch(1); + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { + + @Override + public void beforeRefresh() { + refreshStarted.countDown(); + try { + engineClosed.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + + @Override + public void afterRefresh(boolean didRefresh) { + assertFalse(didRefresh); + } + }; + try (Store store = createStore()) { + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, + refreshListener, null, null, engine.config().getCircuitBreakerService()); + try (InternalEngine engine = createEngine(config)) { + if (randomBoolean()) { + engine.index(indexForDoc(createParsedDoc("id", null))); + } + threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> + expectThrows(AlreadyClosedException.class, + () -> engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true))); + refreshStarted.await(); + engine.close(); + engineClosed.countDown(); + } + } + } }