LUCENE-9164: process all events before closing gracefully (#1319)

IndexWriter must process all pending events before closing the writer during rollback to prevent AlreadyClosedExceptions from being thrown during event processing which can cause the writer to be closed with a tragic event.
This commit is contained in:
Simon Willnauer 2020-03-10 20:40:20 +01:00 committed by GitHub
parent 354f07cd3d
commit 79feb93bd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 134 additions and 6 deletions

View File

@ -253,6 +253,9 @@ Bug Fixes
* LUCENE-9263: Fix wrong transformation of distance in meters to radians in Geo3DPoint. (Ignacio Vera) * LUCENE-9263: Fix wrong transformation of distance in meters to radians in Geo3DPoint. (Ignacio Vera)
* LUCENE-9164: Ensure IW processes all internal events before it closes itself on a rollback.
(Simon Willnauer, Nhat Nguyen, Dawid Weiss, Mike Mccandless)
Other Other
--------------------- ---------------------

View File

@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -297,7 +298,85 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
final FieldNumbers globalFieldNumberMap; final FieldNumbers globalFieldNumberMap;
final DocumentsWriter docWriter; final DocumentsWriter docWriter;
private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<>(); private final EventQueue eventQueue = new EventQueue(this);
static final class EventQueue implements Closeable {
private volatile boolean closed;
// we use a semaphore here instead of simply synced methods to allow
// events to be processed concurrently by multiple threads such that all events
// for a certain thread are processed once the thread returns from IW
private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
private final Queue<Event> queue = new ConcurrentLinkedQueue<>();
private final IndexWriter writer;
EventQueue(IndexWriter writer) {
this.writer = writer;
}
private void acquire() {
if (permits.tryAcquire() == false) {
throw new AlreadyClosedException("queue is closed");
}
if (closed) {
permits.release();
throw new AlreadyClosedException("queue is closed");
}
}
boolean add(Event event) {
acquire();
try {
return queue.add(event);
} finally {
permits.release();
}
}
void processEvents() throws IOException {
acquire();
try {
processEventsInternal();
} finally {
permits.release();
}
}
private void processEventsInternal() throws IOException {
assert Integer.MAX_VALUE - permits.availablePermits() > 0 : "must acquire a permit before processing events";
Event event;
while ((event = queue.poll()) != null) {
event.process(writer);
}
}
@Override
public synchronized void close() throws IOException { // synced to prevent double closing
assert closed == false : "we should never close this twice";
closed = true;
// it's possible that we close this queue while we are in a processEvents call
if (writer.getTragicException() != null) {
// we are already handling a tragic exception let's drop it all on the floor and return
queue.clear();
} else {
// now we acquire all the permits to ensure we are the only one processing the queue
try {
permits.acquire(Integer.MAX_VALUE);
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
try {
processEventsInternal();
} finally {
permits.release(Integer.MAX_VALUE);
}
}
}
int availablePermits() {
return permits.availablePermits();
}
}
final IndexFileDeleter deleter; final IndexFileDeleter deleter;
// used by forceMerge to note those needing merging // used by forceMerge to note those needing merging
@ -2242,6 +2321,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
docWriter.abort(); // don't sync on IW here docWriter.abort(); // don't sync on IW here
docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes
publishFlushedSegments(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources publishFlushedSegments(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources
eventQueue.close();
synchronized (this) { synchronized (this) {
if (pendingCommit != null) { if (pendingCommit != null) {
@ -5073,10 +5153,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private void processEvents(boolean triggerMerge) throws IOException { private void processEvents(boolean triggerMerge) throws IOException {
if (tragedy.get() == null) { if (tragedy.get() == null) {
Event event; eventQueue.processEvents();
while ((event = eventQueue.poll()) != null) {
event.process(this);
}
} }
if (triggerMerge) { if (triggerMerge) {
maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS); maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
@ -5090,7 +5167,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* *
*/ */
@FunctionalInterface @FunctionalInterface
private interface Event { interface Event {
/** /**
* Processes the event. This method is called by the {@link IndexWriter} * Processes the event. This method is called by the {@link IndexWriter}
* passed as the first argument. * passed as the first argument.

View File

@ -3774,10 +3774,58 @@ public class TestIndexWriter extends LuceneTestCase {
stopped.set(true); stopped.set(true);
indexer.join(); indexer.join();
refresher.join(); refresher.join();
assertNull("should not consider ACE a tragedy on a closed IW", w.getTragicException());
IOUtils.close(sm, dir); IOUtils.close(sm, dir);
} }
} }
public void testCloseableQueue() throws IOException, InterruptedException {
try(Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig())) {
IndexWriter.EventQueue queue = new IndexWriter.EventQueue(writer);
AtomicInteger executed = new AtomicInteger(0);
queue.add(w -> {
assertNotNull(w);
executed.incrementAndGet();
});
queue.add(w -> {
assertNotNull(w);
executed.incrementAndGet();
});
queue.processEvents();
assertEquals(2, executed.get());
queue.processEvents();
assertEquals(2, executed.get());
queue.add(w -> {
assertNotNull(w);
executed.incrementAndGet();
});
queue.add(w -> {
assertNotNull(w);
executed.incrementAndGet();
});
Thread t = new Thread(() -> {
try {
queue.processEvents();
} catch (IOException e) {
throw new AssertionError();
} catch (AlreadyClosedException ex) {
// possible
}
});
t.start();
queue.close();
t.join();
assertEquals(4, executed.get());
expectThrows(AlreadyClosedException.class, () -> queue.processEvents());
expectThrows(AlreadyClosedException.class, () -> queue.add(w -> {}));
}
}
public void testRandomOperations() throws Exception { public void testRandomOperations() throws Exception {
IndexWriterConfig iwc = newIndexWriterConfig(); IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setMergePolicy(new FilterMergePolicy(newMergePolicy()) { iwc.setMergePolicy(new FilterMergePolicy(newMergePolicy()) {