From 772e171ac6e70c96295f65749d0d15339133b8a6 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Fri, 15 Jun 2018 10:44:26 +0200
Subject: [PATCH] LUCENE-8358: Relax assertion in IW#writeSomeDocValuesUpdates

This assertion is too strict: we can hit this situation if, for instance, a
ReadersAndUpdates instance gets written to disk concurrently while reader
pooling is off. This change also simplifies ReaderPool#getReadersByRam and
adds a test for it.
---
 .../org/apache/lucene/index/IndexWriter.java  | 17 ++++----
 .../org/apache/lucene/index/ReaderPool.java   | 41 +++++++++++++++----
 .../index/TestBinaryDocValuesUpdates.java     |  2 +-
 .../apache/lucene/index/TestReaderPool.java   | 32 +++++++++++++++
 4 files changed, 74 insertions(+), 18 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index c4c9e61f4f5..509eadbb215 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -29,7 +29,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -592,25 +591,27 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     }
 
     // Sort by largest ramBytesUsed:
-    PriorityQueue<ReadersAndUpdates> queue = readerPool.getReadersByRam();
+    final List<ReadersAndUpdates> list = readerPool.getReadersByRam();
     int count = 0;
-    while (ramBytesUsed > 0.5 * ramBufferSizeMB * 1024 * 1024) {
-      ReadersAndUpdates rld = queue.poll();
-      if (rld == null) {
+    for (ReadersAndUpdates rld : list) {
+
+      if (ramBytesUsed <= 0.5 * ramBufferSizeMB * 1024 * 1024) {
         break;
       }
-
       // We need to do before/after because not all RAM in this RAU is used by DV updates, and
       // not all of those bytes can be written here:
       long bytesUsedBefore = rld.ramBytesUsed.get();
-
+      if (bytesUsedBefore == 0) {
+        continue; // nothing to do here - let's not acquire the lock
+      }
       // Only acquire IW lock on each write, since this is a time consuming operation. This way
       // other threads get a chance to run in between our writes.
       synchronized (this) {
         // It's possible that the segment of a reader returned by readerPool#getReadersByRam
         // is dropped before being processed here. If it happens, we need to skip that reader.
+        // This is also best effort to free RAM: some other thread might be writing this rld
+        // concurrently and win; if reader pooling is off, this rld will then be dropped.
         if (readerPool.get(rld.info, false) == null) {
-          assert segmentInfos.contains(rld.info) == false : "Segment [" + rld.info + "] is not dropped yet";
           continue;
         }
         if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) {
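For context on the hunk above: the rewritten loop walks a list pre-sorted by RAM usage, largest first; it stops as soon as the writer is back under budget, skips entries with nothing buffered before taking any lock, and re-checks membership under the lock because another thread may have written and dropped the entry in the meantime. Below is a minimal, self-contained sketch of that shape; every name in it (BudgetedWriter, ramBySegment, writeSome) is hypothetical, not Lucene API.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the loop's shape; not the Lucene implementation.
class BudgetedWriter {
  private final Map<String, Long> ramBySegment = new ConcurrentHashMap<>();

  void writeSome(List<String> sortedBySizeDesc, long totalBytes, long budgetBytes) {
    for (String segment : sortedBySizeDesc) {
      if (totalBytes <= budgetBytes) {
        break; // list is sorted largest first, nothing bigger remains
      }
      Long before = ramBySegment.get(segment);
      if (before == null || before == 0) {
        continue; // nothing buffered: avoid acquiring the lock at all
      }
      synchronized (this) {
        // Re-check under the lock: another thread may have written and
        // dropped this entry concurrently (best effort, so just skip it).
        if (!ramBySegment.containsKey(segment)) {
          continue;
        }
        totalBytes -= write(segment);
      }
    }
  }

  private long write(String segment) {
    Long freed = ramBySegment.remove(segment);
    return freed == null ? 0 : freed;
  }
}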
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReaderPool.java b/lucene/core/src/java/org/apache/lucene/index/ReaderPool.java
index 5f62c3724d6..980f4a19924 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReaderPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReaderPool.java
@@ -19,19 +19,22 @@ package org.apache.lucene.index;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 
@@ -243,16 +246,36 @@ final class ReaderPool implements Closeable {
     return any;
   }
 
-  PriorityQueue<ReadersAndUpdates> getReadersByRam() {
-    // Sort by largest ramBytesUsed:
-    PriorityQueue<ReadersAndUpdates> queue = new PriorityQueue<>(readerMap.size(),
-        (a, b) -> Long.compare(b.ramBytesUsed.get(), a.ramBytesUsed.get()));
-    synchronized (this) {
-      for (ReadersAndUpdates rld : readerMap.values()) {
-        queue.add(rld);
+  /**
+   * Returns a list of all currently maintained ReadersAndUpdates, sorted by their RAM consumption
+   * from largest to smallest. The list can also contain readers that don't consume any RAM at this
+   * point, i.e. readers that don't have any updates buffered.
+   */
+  synchronized List<ReadersAndUpdates> getReadersByRam() {
+    class RamRecordingHolder {
+      final ReadersAndUpdates updates;
+      final long ramBytesUsed;
+      RamRecordingHolder(ReadersAndUpdates updates) {
+        this.updates = updates;
+        this.ramBytesUsed = updates.ramBytesUsed.get();
       }
     }
-    return queue;
+    final ArrayList<RamRecordingHolder> readersByRam;
+    synchronized (this) {
+      if (readerMap.isEmpty()) {
+        return Collections.emptyList();
+      }
+      readersByRam = new ArrayList<>(readerMap.size());
+      for (ReadersAndUpdates rld : readerMap.values()) {
+        // We have to record the RAM usage once and then sort, since it can change concurrently,
+        // which would confuse the sort or hit an assertion. The lock we can acquire here is not
+        // enough; we would need to lock all ReadersAndUpdates to make sure it doesn't change.
+        readersByRam.add(new RamRecordingHolder(rld));
+      }
+    }
+    // Sort this outside of the lock by largest ramBytesUsed:
+    CollectionUtil.introSort(readersByRam, (a, b) -> Long.compare(b.ramBytesUsed, a.ramBytesUsed));
+    return Collections.unmodifiableList(readersByRam.stream().map(h -> h.updates).collect(Collectors.toList()));
   }
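The heart of the new getReadersByRam above is snapshot-then-sort: read each reader's volatile RAM figure exactly once into an immutable holder, then sort on the recorded value so concurrent updates cannot destabilize the comparator. A toy version of the same technique, with a hypothetical Tracked class standing in for ReadersAndUpdates:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

// Illustrative stand-ins only; not Lucene API.
class SnapshotSort {
  static class Tracked {
    final AtomicLong ramBytesUsed = new AtomicLong();
  }

  static class Holder {
    final Tracked item;
    final long ramSnapshot; // read once, immutable from here on
    Holder(Tracked item) {
      this.item = item;
      this.ramSnapshot = item.ramBytesUsed.get();
    }
  }

  static List<Tracked> byRamDescending(List<Tracked> items) {
    List<Holder> holders = new ArrayList<>(items.size());
    for (Tracked t : items) {
      holders.add(new Holder(t)); // record the mutable counter exactly once
    }
    // Sorting on the recorded snapshot cannot violate the comparator contract,
    // even if ramBytesUsed keeps changing concurrently while we sort.
    holders.sort((a, b) -> Long.compare(b.ramSnapshot, a.ramSnapshot));
    return Collections.unmodifiableList(
        holders.stream().map(h -> h.item).collect(Collectors.toList()));
  }
}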
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java
index 80ae80423e7..e9942e49b2f 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java
@@ -88,7 +88,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
     return doc;
   }
 
-  public void testUpdatesAreFlushed() throws IOException {
+  public void testUpdatesAreFlushed() throws IOException, InterruptedException {
     Directory dir = newDirectory();
     IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
         .setRAMBufferSizeMB(0.00000001));
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestReaderPool.java b/lucene/core/src/test/org/apache/lucene/index/TestReaderPool.java
index 36070b59d85..7a858839c23 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestReaderPool.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestReaderPool.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -30,6 +31,7 @@ import org.apache.lucene.document.StringField;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOSupplier;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.LuceneTestCase;
@@ -278,4 +280,34 @@ public class TestReaderPool extends LuceneTestCase {
     writer.close();
     return writer.globalFieldNumberMap;
   }
+
+  public void testGetReaderByRam() throws IOException {
+    Directory directory = newDirectory();
+    FieldInfos.FieldNumbers fieldNumbers = buildIndex(directory);
+    StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(directory);
+    SegmentInfos segmentInfos = reader.segmentInfos.clone();
+    ReaderPool pool = new ReaderPool(directory, directory, segmentInfos, fieldNumbers, () -> 0L,
+        new NullInfoStream(), null, null);
+    assertEquals(0, pool.getReadersByRam().size());
+
+    int ord = 0;
+    for (SegmentCommitInfo commitInfo : segmentInfos) {
+      ReadersAndUpdates readersAndUpdates = pool.get(commitInfo, true);
+      BinaryDocValuesFieldUpdates test = new BinaryDocValuesFieldUpdates(0, "test", commitInfo.info.maxDoc());
+      test.add(0, new BytesRef(new byte[ord++]));
+      test.finish();
+      readersAndUpdates.addDVUpdate(test);
+    }
+
+    List<ReadersAndUpdates> readersByRam = pool.getReadersByRam();
+    assertEquals(segmentInfos.size(), readersByRam.size());
+    long previousRam = Long.MAX_VALUE;
+    for (ReadersAndUpdates rld : readersByRam) {
+      assertTrue("previous: " + previousRam + " now: " + rld.ramBytesUsed.get(), previousRam >= rld.ramBytesUsed.get());
+      previousRam = rld.ramBytesUsed.get();
+      rld.dropChanges();
+      pool.drop(rld.info);
+    }
+    IOUtils.close(pool, reader, directory);
+  }
 }
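The new test asserts that getReadersByRam returns readers in descending RAM order. As a side note on why the holder indirection matters at all: sorting directly on a live, concurrently mutated counter can make java.util.TimSort detect an inconsistent comparator and throw. A hypothetical stand-alone demo follows; it is not part of this patch, and the exception is racy, so it is not guaranteed on every run.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class MutatingKeySortDemo {
  public static void main(String[] args) throws InterruptedException {
    List<AtomicLong> values = new ArrayList<>();
    for (int i = 0; i < 100_000; i++) {
      values.add(new AtomicLong(i));
    }
    // Mutate the keys while the sort below is running.
    Thread mutator = new Thread(() -> {
      for (AtomicLong v : values) {
        v.set(System.nanoTime());
      }
    });
    mutator.start();
    try {
      // Comparing live values: TimSort may see different results for the same
      // pair across comparisons and throw
      // "Comparison method violates its general contract!".
      values.sort((a, b) -> Long.compare(b.get(), a.get()));
      System.out.println("sort happened to succeed this run");
    } catch (IllegalArgumentException e) {
      System.out.println("sort failed: " + e.getMessage());
    }
    mutator.join();
  }
}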