Refresh should not acquire readLock (#48414)

Today, we hold the engine readLock while refreshing. Although this 
choice simplifies the correctness reasoning, it can block IndexShard 
from closing if warming an external reader takes time. The current
implementation of refresh does not need to hold readLock as
ReferenceManager can handle errors correctly if the engine is closed in
midway.

This PR is a prerequisite that we need to solve #47186.
This commit is contained in:
Nhat Nguyen 2019-10-25 17:31:35 -04:00
parent 2e3db518c9
commit 1ef87c9a68
2 changed files with 46 additions and 6 deletions

View File

@ -1569,14 +1569,12 @@ public class InternalEngine extends Engine {
} }
final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException { final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
// both refresh types will result in an internal refresh but only the external will also // both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager. // pass the new reader reference to the external reader manager.
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
boolean refreshed; boolean refreshed;
try (ReleasableLock lock = readLock.acquire()) { try {
ensureOpen(); // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
if (store.tryIncRef()) { if (store.tryIncRef()) {
// increment the ref just to ensure nobody closes the store during a refresh // increment the ref just to ensure nobody closes the store during a refresh
try { try {

View File

@ -141,6 +141,7 @@ import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
import java.io.Closeable; import java.io.Closeable;
@ -5777,7 +5778,7 @@ public class InternalEngineTests extends EngineTestCase {
assertMaxSeqNoInCommitUserData(engine); assertMaxSeqNoInCommitUserData(engine);
} }
public void testRefreshAndFailEngineConcurrently() throws Exception { public void testRefreshAndCloseEngineConcurrently() throws Exception {
AtomicBoolean stopped = new AtomicBoolean(); AtomicBoolean stopped = new AtomicBoolean();
Semaphore indexedDocs = new Semaphore(0); Semaphore indexedDocs = new Semaphore(0);
Thread indexer = new Thread(() -> { Thread indexer = new Thread(() -> {
@ -5807,7 +5808,11 @@ public class InternalEngineTests extends EngineTestCase {
refresher.start(); refresher.start();
indexedDocs.acquire(randomIntBetween(1, 100)); indexedDocs.acquire(randomIntBetween(1, 100));
try { try {
engine.failEngine("test", new IOException("simulated error")); if (randomBoolean()) {
engine.failEngine("test", new IOException("simulated error"));
} else {
engine.close();
}
} finally { } finally {
stopped.set(true); stopped.set(true);
indexer.join(); indexer.join();
@ -6149,4 +6154,41 @@ public class InternalEngineTests extends EngineTestCase {
} }
} }
} }
public void testRefreshDoesNotBlockClosing() throws Exception {
final CountDownLatch refreshStarted = new CountDownLatch(1);
final CountDownLatch engineClosed = new CountDownLatch(1);
final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() {
refreshStarted.countDown();
try {
engineClosed.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
@Override
public void afterRefresh(boolean didRefresh) {
assertFalse(didRefresh);
}
};
try (Store store = createStore()) {
final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null,
refreshListener, null, null, engine.config().getCircuitBreakerService());
try (InternalEngine engine = createEngine(config)) {
if (randomBoolean()) {
engine.index(indexForDoc(createParsedDoc("id", null)));
}
threadPool.executor(ThreadPool.Names.REFRESH).execute(() ->
expectThrows(AlreadyClosedException.class,
() -> engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true)));
refreshStarted.await();
engine.close();
engineClosed.countDown();
}
}
}
} }