Engine.close should only return when resources are freed (#25852)

Currently Engine.close can return immediately if the engine is already at the process of shutting down (due to a concurrent close call or an engine failure). This is a shame because some of our testing infra wants to do things like checking the index. This commit changes the logic to make sure that all calls to close wait until resources are freed. Failing the engine is still non blocking.

Fixes #25817
This commit is contained in:
Boaz Leskes 2017-07-25 08:08:44 +02:00 committed by GitHub
parent e816ef89a2
commit cd508555f9
4 changed files with 81 additions and 7 deletions

View File

@ -82,6 +82,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
@ -99,6 +100,7 @@ public abstract class Engine implements Closeable {
protected final EngineConfig engineConfig;
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
private final CountDownLatch closedLatch = new CountDownLatch(1);
protected final EventListener eventListener;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
@ -852,7 +854,7 @@ public abstract class Engine implements Closeable {
failedEngine.set((failure != null) ? failure : new IllegalStateException(reason));
try {
// we just go and close this engine - no way to recover
closeNoLock("engine failed on: [" + reason + "]");
closeNoLock("engine failed on: [" + reason + "]", closedLatch);
} finally {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed engine [{}]", reason), failure);
// we must set a failure exception, generate one if not supplied
@ -1300,8 +1302,10 @@ public abstract class Engine implements Closeable {
/**
* Method to close the engine while the write lock is held.
* Must decrement the supplied when closing work is done and resources are
* freed.
*/
protected abstract void closeNoLock(String reason);
protected abstract void closeNoLock(String reason, CountDownLatch closedLatch);
/**
* Flush the engine (committing segments to disk and truncating the
@ -1324,6 +1328,7 @@ public abstract class Engine implements Closeable {
}
}
}
awaitPendingClose();
}
@Override
@ -1332,9 +1337,18 @@ public abstract class Engine implements Closeable {
logger.debug("close now acquiring writeLock");
try (ReleasableLock lock = writeLock.acquire()) {
logger.debug("close acquired writeLock");
closeNoLock("api");
closeNoLock("api", closedLatch);
}
}
awaitPendingClose();
}
private void awaitPendingClose() {
try {
closedLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static class CommitId implements Writeable {

View File

@ -86,6 +86,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -1600,7 +1601,7 @@ public class InternalEngine extends Engine {
* is failed.
*/
@Override
protected final void closeNoLock(String reason) {
protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
if (isClosed.compareAndSet(false, true)) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
try {
@ -1627,8 +1628,12 @@ public class InternalEngine extends Engine {
} catch (Exception e) {
logger.warn("failed to rollback writer on close", e);
} finally {
try {
store.decRef();
logger.debug("engine closed [{}]", reason);
} finally {
closedLatch.countDown();
}
}
}
}

View File

@ -61,6 +61,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
@ -91,6 +92,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqN
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -2926,6 +2928,60 @@ public class InternalEngineTests extends ESTestCase {
assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(exception.get()));
}
/**
* Tests that when the the close method returns the engine is actually guaranteed to have cleaned up and that resources are closed
*/
public void testConcurrentEngineClosed() throws BrokenBarrierException, InterruptedException {
Thread[] closingThreads = new Thread[3];
CyclicBarrier barrier = new CyclicBarrier(1 + closingThreads.length + 1);
Thread failEngine = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
@Override
protected void doRun() throws Exception {
barrier.await();
engine.failEngine("test", new RuntimeException("test"));
}
});
failEngine.start();
for (int i = 0;i < closingThreads.length ; i++) {
boolean flushAndClose = randomBoolean();
closingThreads[i] = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
@Override
protected void doRun() throws Exception {
barrier.await();
if (flushAndClose) {
engine.flushAndClose();
} else {
engine.close();
}
// try to acquire the writer lock - i.e., everything is closed, we need to synchronize
// to avoid races between closing threads
synchronized (closingThreads) {
try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
// all good.
}
}
}
});
closingThreads[i].setName("closingThread_" + i);
closingThreads[i].start();
}
barrier.await();
failEngine.join();
for (Thread t : closingThreads) {
t.join();
}
}
public void testCurrentTranslogIDisCommitted() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null);

View File

@ -223,7 +223,6 @@ public class TruncateTranslogIT extends ESIntegTestCase {
expectSeqNoRecovery ? equalTo(0) : greaterThan(0));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25817")
public void testCorruptTranslogTruncationOfReplica() throws Exception {
internalCluster().startNodes(2, Settings.EMPTY);