LUCENE-8639: Prevent new threadstates from being created while we cut over to a new delete queue (#535)

This prevents an edge case where suddenly a lot of threads start indexing
while we carry over sequence ids from the previous to the new delete queue.
We now lock creation of new thread states for a very short time until we created and assigned
a new delete queue.
This commit is contained in:
Simon Willnauer 2019-01-16 16:37:49 +01:00 committed by GitHub
parent a826649241
commit e35adf6824
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 25 deletions

View File

@ -268,6 +268,9 @@ Bug fixes:
* LUCENE-8625: int overflow in ByteBuffersDataInput.sliceBufferList. (Mulugeta Mammo,
Dawid Weiss)
* LUCENE-8639: Newly created threadstates while flushing / refreshing can cause duplicated
sequence IDs on IndexWriter. (Simon Willnauer)
New Features
* LUCENE-8026: ExitableDirectoryReader may now time out queries that run on

View File

@ -279,7 +279,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAllAbortedThread");
}
perThreadPool.clearAbort();
perThreadPool.unlockNewThreadStates();
for (ThreadState state : threadStates) {
state.unlock();
}
@ -288,7 +288,7 @@ final class DocumentsWriter implements Closeable, Accountable {
try {
deleteQueue.clear();
final int limit = perThreadPool.getMaxThreadStates();
perThreadPool.setAbort();
perThreadPool.lockNewThreadStates();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();

View File

@ -459,8 +459,7 @@ final class DocumentsWriterFlushControl implements Accountable {
}
ThreadState obtainAndLock() {
final ThreadState perThread = perThreadPool.getAndLock(Thread
.currentThread(), documentsWriter);
final ThreadState perThread = perThreadPool.getAndLock();
boolean success = false;
try {
if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
@ -490,14 +489,18 @@ final class DocumentsWriterFlushControl implements Accountable {
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
// Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
flushingQueue.maxSeqNo = seqNo+1;
perThreadPool.lockNewThreadStates(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off
try {
// Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
flushingQueue.maxSeqNo = seqNo + 1;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1);
documentsWriter.deleteQueue = newQueue;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation+1, seqNo+1);
documentsWriter.deleteQueue = newQueue;
} finally {
perThreadPool.unlockNewThreadStates();
}
}
final int limit = perThreadPool.getActiveThreadStateCount();
for (int i = 0; i < limit; i++) {

View File

@ -111,7 +111,7 @@ final class DocumentsWriterPerThreadPool {
private final List<ThreadState> freeList = new ArrayList<>();
private boolean aborted;
private int takenThreadStatePermits = 0;
/**
* Returns the active number of {@link ThreadState} instances.
@ -120,15 +120,21 @@ final class DocumentsWriterPerThreadPool {
return threadStates.size();
}
synchronized void setAbort() {
aborted = true;
synchronized void lockNewThreadStates() {
// this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0
// any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some
// point
assert takenThreadStatePermits >= 0;
takenThreadStatePermits++;
}
synchronized void clearAbort() {
aborted = false;
notifyAll();
synchronized void unlockNewThreadStates() {
assert takenThreadStatePermits > 0;
takenThreadStatePermits--;
if (takenThreadStatePermits == 0) {
notifyAll();
}
}
/**
* Returns a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code>.
@ -140,18 +146,20 @@ final class DocumentsWriterPerThreadPool {
* <code>null</code>
*/
private synchronized ThreadState newThreadState() {
while (aborted) {
assert takenThreadStatePermits >= 0;
while (takenThreadStatePermits > 0) {
// we can't create new thread-states while not all permits are available
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
throw new ThreadInterruptedException(ie);
}
}
ThreadState threadState = new ThreadState(null);
threadState.lock(); // lock so nobody else will get this ThreadState
threadStates.add(threadState);
return threadState;
}
}
DocumentsWriterPerThread reset(ThreadState threadState) {
assert threadState.isHeldByCurrentThread();
@ -168,7 +176,7 @@ final class DocumentsWriterPerThreadPool {
// of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing
/** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
ThreadState getAndLock() {
ThreadState threadState = null;
synchronized (this) {
if (freeList.isEmpty()) {

View File

@ -245,6 +245,7 @@ final class FrozenBufferedUpdates {
AtomicBoolean success = new AtomicBoolean();
long delCount;
try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
assert finalizer != null; // access the finalizer to prevent a warning
// don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
delCount = apply(segStates);
success.set(true);

View File

@ -34,11 +34,11 @@ import org.apache.lucene.util.packed.PagedMutable;
final class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
// TODO: can't this just be NumericDocValues now? avoid boxing the long value...
final static class Iterator extends DocValuesFieldUpdates.AbstractIterator {
private final AbstractPagedMutable values;
private final AbstractPagedMutable<?> values;
private final long minValue;
private long value;
Iterator(int size, long minValue, AbstractPagedMutable values, PagedMutable docs, long delGen) {
Iterator(int size, long minValue, AbstractPagedMutable<?> values, PagedMutable docs, long delGen) {
super(size, docs, delGen);
this.values = values;
this.minValue = minValue;
@ -58,7 +58,7 @@ final class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
value = values.get(idx) + minValue;
}
}
private AbstractPagedMutable values;
private AbstractPagedMutable<?> values;
private final long minValue;
NumericDocValuesFieldUpdates(long delGen, String field, int maxDoc) {

View File

@ -3657,4 +3657,38 @@ public class TestIndexWriter extends LuceneTestCase {
}
}
// see LUCENE-8639
public void testFlushWhileStartingNewThreads() throws IOException, InterruptedException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
w.addDocument(new Document());
int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
assertEquals(1, activeThreadStateCount);
CountDownLatch latch = new CountDownLatch(1);
Thread thread = new Thread(() -> {
latch.countDown();
List<Closeable> states = new ArrayList<>();
try {
for (int i = 0; i < 100; i++) {
DocumentsWriterPerThreadPool.ThreadState state = w.docWriter.perThreadPool.getAndLock();
states.add(state::unlock);
if (state.isInitialized()) {
state.dwpt.deleteQueue.getNextSequenceNumber();
} else {
w.docWriter.deleteQueue.getNextSequenceNumber();
}
}
} finally {
IOUtils.closeWhileHandlingException(states);
}
});
thread.start();
latch.await();
w.docWriter.flushControl.markForFullFlush();
thread.join();
w.docWriter.flushControl.abortFullFlushes();
w.close();
dir.close();
}
}