LUCENE-8358: Relax assertion in IW#writeSomeDocValuesUpdates

This assertion is too strict since we can see this situation if for instance
a ReadersAndUpdates instance gets written to disk concurrently and
readerpooling is off. This change also simplifies ReaderPool#getReadersByRam and
adds a test for it.
This commit is contained in:
Simon Willnauer 2018-06-15 10:44:26 +02:00
parent 47b9ca6f57
commit 772e171ac6
4 changed files with 74 additions and 18 deletions

View File

@ -29,7 +29,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -592,25 +591,27 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} }
// Sort by largest ramBytesUsed: // Sort by largest ramBytesUsed:
PriorityQueue<ReadersAndUpdates> queue = readerPool.getReadersByRam(); final List<ReadersAndUpdates> list = readerPool.getReadersByRam();
int count = 0; int count = 0;
while (ramBytesUsed > 0.5 * ramBufferSizeMB * 1024 * 1024) { for (ReadersAndUpdates rld : list) {
ReadersAndUpdates rld = queue.poll();
if (rld == null) { if (ramBytesUsed <= 0.5 * ramBufferSizeMB * 1024 * 1024) {
break; break;
} }
// We need to do before/after because not all RAM in this RAU is used by DV updates, and // We need to do before/after because not all RAM in this RAU is used by DV updates, and
// not all of those bytes can be written here: // not all of those bytes can be written here:
long bytesUsedBefore = rld.ramBytesUsed.get(); long bytesUsedBefore = rld.ramBytesUsed.get();
if (bytesUsedBefore == 0) {
continue; // nothing to do here - lets not acquire the lock
}
// Only acquire IW lock on each write, since this is a time consuming operation. This way // Only acquire IW lock on each write, since this is a time consuming operation. This way
// other threads get a chance to run in between our writes. // other threads get a chance to run in between our writes.
synchronized (this) { synchronized (this) {
// It's possible that the segment of a reader returned by readerPool#getReadersByRam // It's possible that the segment of a reader returned by readerPool#getReadersByRam
// is dropped before being processed here. If it happens, we need to skip that reader. // is dropped before being processed here. If it happens, we need to skip that reader.
// this is also best effort to free ram, there might be some other thread writing this rld concurrently
// which wins and then if readerPooling is off this rld will be dropped.
if (readerPool.get(rld.info, false) == null) { if (readerPool.get(rld.info, false) == null) {
assert segmentInfos.contains(rld.info) == false : "Segment [" + rld.info + "] is not dropped yet";
continue; continue;
} }
if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) { if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) {

View File

@ -19,19 +19,22 @@ package org.apache.lucene.index;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
@ -243,16 +246,36 @@ final class ReaderPool implements Closeable {
return any; return any;
} }
PriorityQueue<ReadersAndUpdates> getReadersByRam() { /**
// Sort by largest ramBytesUsed: * Returns a list of all currently maintained ReadersAndUpdates sorted by it's ram consumption largest to smallest.
PriorityQueue<ReadersAndUpdates> queue = new PriorityQueue<>(readerMap.size(), * This list can also contain readers that don't consume any ram at this point ie. don't have any updates buffered.
(a, b) -> Long.compare(b.ramBytesUsed.get(), a.ramBytesUsed.get())); */
synchronized (this) { synchronized List<ReadersAndUpdates> getReadersByRam() {
for (ReadersAndUpdates rld : readerMap.values()) { class RamRecordingHolder {
queue.add(rld); final ReadersAndUpdates updates;
final long ramBytesUsed;
RamRecordingHolder(ReadersAndUpdates updates) {
this.updates = updates;
this.ramBytesUsed = updates.ramBytesUsed.get();
} }
} }
return queue; final ArrayList<RamRecordingHolder> readersByRam;
synchronized (this) {
if (readerMap.isEmpty()) {
return Collections.emptyList();
}
readersByRam = new ArrayList<>(readerMap.size());
for (ReadersAndUpdates rld : readerMap.values()) {
// we have to record the ram usage once and then sort
// since the ram usage can change concurrently and that will confuse the sort or hit an assertion
// the we can acquire here is not enough we would need to lock all ReadersAndUpdates to make sure it doesn't
// change
readersByRam.add(new RamRecordingHolder(rld));
}
}
// Sort this outside of the lock by largest ramBytesUsed:
CollectionUtil.introSort(readersByRam, (a, b) -> Long.compare(b.ramBytesUsed, a.ramBytesUsed));
return Collections.unmodifiableList(readersByRam.stream().map(h -> h.updates).collect(Collectors.toList()));
} }

View File

@ -88,7 +88,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
return doc; return doc;
} }
public void testUpdatesAreFlushed() throws IOException { public void testUpdatesAreFlushed() throws IOException, InterruptedException {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false)) IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
.setRAMBufferSizeMB(0.00000001)); .setRAMBufferSizeMB(0.00000001));

View File

@ -19,6 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -30,6 +31,7 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOSupplier; import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
@ -278,4 +280,34 @@ public class TestReaderPool extends LuceneTestCase {
writer.close(); writer.close();
return writer.globalFieldNumberMap; return writer.globalFieldNumberMap;
} }
public void testGetReaderByRam() throws IOException {
Directory directory = newDirectory();
FieldInfos.FieldNumbers fieldNumbers = buildIndex(directory);
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(directory);
SegmentInfos segmentInfos = reader.segmentInfos.clone();
ReaderPool pool = new ReaderPool(directory, directory, segmentInfos, fieldNumbers, () -> 0l,
new NullInfoStream(), null, null);
assertEquals(0, pool.getReadersByRam().size());
int ord = 0;
for (SegmentCommitInfo commitInfo : segmentInfos) {
ReadersAndUpdates readersAndUpdates = pool.get(commitInfo, true);
BinaryDocValuesFieldUpdates test = new BinaryDocValuesFieldUpdates(0, "test", commitInfo.info.maxDoc());
test.add(0, new BytesRef(new byte[ord++]));
test.finish();
readersAndUpdates.addDVUpdate(test);
}
List<ReadersAndUpdates> readersByRam = pool.getReadersByRam();
assertEquals(segmentInfos.size(), readersByRam.size());
long previousRam = Long.MAX_VALUE;
for (ReadersAndUpdates rld : readersByRam) {
assertTrue("previous: " + previousRam + " now: " + rld.ramBytesUsed.get(), previousRam >= rld.ramBytesUsed.get());
previousRam = rld.ramBytesUsed.get();
rld.dropChanges();
pool.drop(rld.info);
}
IOUtils.close(pool, reader, directory);
}
} }