diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index bd2367a89fe..ce21682bb17 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -253,6 +253,9 @@ Bug Fixes * LUCENE-9263: Fix wrong transformation of distance in meters to radians in Geo3DPoint. (Ignacio Vera) +* LUCENE-9164: Ensure IW processes all internal events before it closes itself on a rollback. + (Simon Willnauer, Nhat Nguyen, Dawid Weiss, Mike Mccandless) + Other --------------------- diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 563e4fe9c12..25fe1879c1a 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -297,7 +298,85 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, final FieldNumbers globalFieldNumberMap; final DocumentsWriter docWriter; - private final Queue eventQueue = new ConcurrentLinkedQueue<>(); + private final EventQueue eventQueue = new EventQueue(this); + + static final class EventQueue implements Closeable { + private volatile boolean closed; + // we use a semaphore here instead of simply synced methods to allow + // events to be processed concurrently by multiple threads such that all events + // for a certain thread are processed once the thread returns from IW + private final Semaphore permits = new Semaphore(Integer.MAX_VALUE); + private final Queue queue = new ConcurrentLinkedQueue<>(); + private final IndexWriter writer; + + EventQueue(IndexWriter writer) { + this.writer = writer; + } + + private void acquire() { + if (permits.tryAcquire() == false) { + throw new AlreadyClosedException("queue is closed"); + } + if (closed) { + permits.release(); + throw new AlreadyClosedException("queue is closed"); + } + } + + boolean add(Event event) { + acquire(); + try { + return queue.add(event); + } finally { + permits.release(); + } + } + + void processEvents() throws IOException { + acquire(); + try { + processEventsInternal(); + } finally { + permits.release(); + } + } + + private void processEventsInternal() throws IOException { + assert Integer.MAX_VALUE - permits.availablePermits() > 0 : "must acquire a permit before processing events"; + Event event; + while ((event = queue.poll()) != null) { + event.process(writer); + } + } + + @Override + public synchronized void close() throws IOException { // synced to prevent double closing + assert closed == false : "we should never close this twice"; + closed = true; + // it's possible that we close this queue while we are in a processEvents call + if (writer.getTragicException() != null) { + // we are already handling a tragic exception let's drop it all on the floor and return + queue.clear(); + } else { + // now we acquire all the permits to ensure we are the only one processing the queue + try { + permits.acquire(Integer.MAX_VALUE); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } + try { + processEventsInternal(); + } finally { + permits.release(Integer.MAX_VALUE); + } + } + } + + int availablePermits() { + return permits.availablePermits(); + } + } + final IndexFileDeleter deleter; // used by forceMerge to note those needing merging @@ -2242,6 +2321,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, docWriter.abort(); // don't sync on IW here docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes publishFlushedSegments(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources + eventQueue.close(); synchronized (this) { if (pendingCommit != null) { @@ -5073,10 +5153,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, private void processEvents(boolean triggerMerge) throws IOException { if (tragedy.get() == null) { - Event event; - while ((event = eventQueue.poll()) != null) { - event.process(this); - } + eventQueue.processEvents(); } if (triggerMerge) { maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS); @@ -5090,7 +5167,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable, * */ @FunctionalInterface - private interface Event { + interface Event { /** * Processes the event. This method is called by the {@link IndexWriter} * passed as the first argument. diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index ad2588c3a32..e3e6bbec13d 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -3774,10 +3774,58 @@ public class TestIndexWriter extends LuceneTestCase { stopped.set(true); indexer.join(); refresher.join(); + assertNull("should not consider ACE a tragedy on a closed IW", w.getTragicException()); IOUtils.close(sm, dir); } } + public void testCloseableQueue() throws IOException, InterruptedException { + try(Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig())) { + IndexWriter.EventQueue queue = new IndexWriter.EventQueue(writer); + AtomicInteger executed = new AtomicInteger(0); + + queue.add(w -> { + assertNotNull(w); + executed.incrementAndGet(); + }); + queue.add(w -> { + assertNotNull(w); + executed.incrementAndGet(); + }); + queue.processEvents(); + assertEquals(2, executed.get()); + queue.processEvents(); + assertEquals(2, executed.get()); + + queue.add(w -> { + assertNotNull(w); + executed.incrementAndGet(); + }); + queue.add(w -> { + assertNotNull(w); + executed.incrementAndGet(); + }); + + + Thread t = new Thread(() -> { + try { + queue.processEvents(); + } catch (IOException e) { + throw new AssertionError(); + } catch (AlreadyClosedException ex) { + // possible + } + }); + t.start(); + queue.close(); + t.join(); + assertEquals(4, executed.get()); + expectThrows(AlreadyClosedException.class, () -> queue.processEvents()); + expectThrows(AlreadyClosedException.class, () -> queue.add(w -> {})); + } + } + public void testRandomOperations() throws Exception { IndexWriterConfig iwc = newIndexWriterConfig(); iwc.setMergePolicy(new FilterMergePolicy(newMergePolicy()) {