LUCENE-3551: IW#nrtIsCurrent returns false-positive when full flush runs concurrently

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1197742 13f79535-47bb-0310-9956-ffa450edef68
Simon Willnauer 2011-11-04 20:16:08 +00:00
parent 3dbde146af
commit 73dec384b4
3 changed files with 307 additions and 21 deletions
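
In short: during a full flush, buffered documents and deletes are frozen into flush tickets before they are published back to the IndexWriter. A change check that only looks at documents in RAM and the delete queue can therefore briefly report "no changes", and an NRT reader's isCurrent() can flip to true while unpublished changes still exist. Below is a minimal, self-contained sketch of that window; the names are illustrative only, not Lucene's classes.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model only (hypothetical names, not Lucene's classes): during a
// full flush, buffered changes move from "in RAM" into a ticket queue before
// they are published, so a change check that ignores the ticket queue can
// briefly see "no changes".
class ChangeWindowSketch {
  static final AtomicInteger numDocsInRAM = new AtomicInteger();
  static final Queue<Object> ticketQueue = new ArrayDeque<Object>();

  // old-style check: misses changes parked in the ticket queue
  static boolean anyChangesOld() {
    return numDocsInRAM.get() != 0;
  }

  // fixed-style check: also counts unpublished flush tickets
  static boolean anyChangesFixed() {
    return numDocsInRAM.get() != 0 || !ticketQueue.isEmpty();
  }

  public static void main(String[] args) {
    numDocsInRAM.incrementAndGet();        // a document is buffered
    ticketQueue.add(new Object());         // full flush freezes it into a ticket...
    numDocsInRAM.decrementAndGet();        // ...and the RAM counter drops to zero
    System.out.println(anyChangesOld());   // false - the window in which isCurrent() lied
    System.out.println(anyChangesFixed()); // true - the ticket is still unpublished
  }
}

The changes in this commit close that window from both sides: DocumentsWriter#anyChanges() additionally consults the ticket queue and a pendingChangesInCurrentFullFlush flag, and tickets are only released after their contents have been published.
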

DocumentsWriter.java

@@ -117,7 +117,14 @@ final class DocumentsWriter {
// TODO: cut over to BytesRefHash in BufferedDeletes
volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
private final TicketQueue ticketQueue = new TicketQueue();
/*
 * We preserve changes during a full flush since the IW might not have picked
 * them up yet before we release all changes. NRT readers would otherwise
 * suddenly return true from isCurrent() while changes are in fact still being
 * committed. See also #anyChanges() & #flushAllThreads.
 */
private volatile boolean pendingChangesInCurrentFullFlush;
private Collection<String> abortedFiles; // List of files that were written before last abort()
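
The hunks further down wire this flag up as follows: flushAllThreads() records whether any changes existed at the moment the full flush started, and finishFullFlush() clears the flag only in a finally block once the flush has been released or aborted. A condensed sketch of that lifecycle, with placeholder logic rather than the real DocumentsWriter internals:

// Condensed lifecycle sketch (placeholder logic, hypothetical class):
class FullFlushFlagSketch {
  private volatile boolean pendingChangesInCurrentFullFlush;
  private volatile boolean changesBuffered; // stands in for docs/deletes in RAM

  boolean anyChanges() {
    return changesBuffered || pendingChangesInCurrentFullFlush;
  }

  void flushAllThreads() {
    synchronized (this) {
      // remember that changes existed when the full flush started
      pendingChangesInCurrentFullFlush = anyChanges();
    }
    changesBuffered = false; // flushing drains the in-RAM changes...
    // ...but anyChanges() keeps returning true until finishFullFlush() runs
  }

  void finishFullFlush(boolean success) {
    try {
      // release the flush lock on success, abort the pending flushes otherwise
    } finally {
      pendingChangesInCurrentFullFlush = false;
    }
  }
}
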
@@ -170,6 +177,7 @@ final class DocumentsWriter {
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null && !flushControl.isFullFlush()) {
synchronized (ticketQueue) {
ticketQueue.incTicketCount(); // inc the ticket count first - freezing the deletes below would otherwise open a window in which #anyChanges() misses them
// Freeze and insert the delete flush ticket in the queue
ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
applyFlushTickets();
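
The ordering in this hunk matters: the ticket count is incremented before freezeGlobalBuffer() empties the delete queue, so a concurrent anyChanges() always observes either the buffered deletes or a pending ticket, never neither. A self-contained sketch of that ordering, with illustrative names only:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative ordering sketch (hypothetical names): announce the ticket
// before the deletes are frozen out of their buffer, so a concurrent change
// check never sees an empty buffer *and* a zero ticket count.
class TicketOrderingSketch {
  static final AtomicInteger ticketCount = new AtomicInteger();
  static final Queue<String> bufferedDeletes =
      new ArrayDeque<String>(Arrays.asList("id:1"));

  static boolean anyChanges() {
    return !bufferedDeletes.isEmpty() || ticketCount.get() != 0;
  }

  public static void main(String[] args) {
    ticketCount.incrementAndGet();    // 1) announce the ticket first
    bufferedDeletes.clear();          // 2) "freeze" drains the delete buffer
    System.out.println(anyChanges()); // true at every point in time; swapping
                                      // 1) and 2) would open a "no changes" window
  }
}
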
@@ -256,9 +264,22 @@ final class DocumentsWriter {
}
boolean anyChanges() {
return numDocsInRAM.get() != 0 || anyDeletions();
if (infoStream != null) {
message("docWriter: anyChanges? numDocsInRam=" + numDocsInRAM.get()
+ " deletes=" + anyDeletions() + " hasTickets:"
+ ticketQueue.hasTickets() + " pendingChangesInFullFlush: "
+ pendingChangesInCurrentFullFlush);
}
/*
 * Changes are either in a DWPT or in the deleteQueue.
 * Yet, if we are currently flushing deletes and/or DWPTs there
 * could be a window where all changes sit in the ticket queue
 * before they are published to the IW, i.e. we need to check if
 * the ticket queue has any tickets.
 */
return numDocsInRAM.get() != 0 || anyDeletions() || ticketQueue.hasTickets() || pendingChangesInCurrentFullFlush;
}
public int getBufferedDeleteTermsSize() {
return deleteQueue.getBufferedDeleteTermsSize();
}
@@ -283,7 +304,7 @@ final class DocumentsWriter {
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
if (infoStream != null) {
message("DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
message("docWriter: DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
}
do {
// Try pick up pending threads here if possible
@@ -417,7 +438,7 @@ final class DocumentsWriter {
synchronized (ticketQueue) {
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
ticketQueue.add(ticket);
ticketQueue.incrementAndAdd(ticket);
}
// flush concurrently without locking
@@ -474,8 +495,11 @@ final class DocumentsWriter {
// Keep publishing eligible flushed segments:
final FlushTicket head = ticketQueue.peek();
if (head != null && head.canPublish()) {
ticketQueue.poll();
finishFlush(head.segment, head.frozenDeletes);
try {
finishFlush(head.segment, head.frozenDeletes);
} finally {
ticketQueue.poll();
}
} else {
break;
}
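
Two things change in this hunk: the flushed data is now published (finishFlush) before its ticket is removed, and the removal sits in a finally block so a failing publication cannot leave the head ticket stuck in the queue. A generic sketch of the publish-then-release ordering, using stand-in types rather than DocumentsWriter's own:

import java.util.ArrayDeque;
import java.util.Queue;

// Generic publish-then-release sketch (stand-in types): the head stays in the
// queue, and therefore visible to change checks, until it has actually been
// handed over; the finally block still removes it if the hand-over throws.
class PublishThenReleaseSketch {
  static void drain(Queue<String> tickets) {
    String head;
    while ((head = tickets.peek()) != null) {
      try {
        System.out.println("publish " + head); // hand the flushed data downstream
      } finally {
        tickets.poll();                        // release only after publication
      }
    }
  }

  public static void main(String[] args) {
    Queue<String> queue = new ArrayDeque<String>();
    queue.add("segment _0");
    queue.add("frozen deletes");
    drain(queue);
  }
}
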
@@ -489,7 +513,7 @@ final class DocumentsWriter {
if (newSegment == null) {
assert bufferedDeletes != null;
if (bufferedDeletes != null && bufferedDeletes.any()) {
indexWriter.bufferedDeletesStream.push(bufferedDeletes);
indexWriter.publishFrozenDeletes(bufferedDeletes);
if (infoStream != null) {
message("flush: push buffered deletes: " + bufferedDeletes);
}
@@ -535,6 +559,7 @@ final class DocumentsWriter {
// for asserts
private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
// for asserts
private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
currentFullFlushDelQueue = session;
@@ -554,6 +579,7 @@ final class DocumentsWriter {
}
synchronized (this) {
pendingChangesInCurrentFullFlush = anyChanges();
flushingDeleteQueue = deleteQueue;
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
@@ -573,11 +599,12 @@ final class DocumentsWriter {
}
// If a concurrent flush is still in flight wait for it
flushControl.waitForFlush();
if (!anythingFlushed) { // apply deletes if we did not flush any document
if (!anythingFlushed && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
if (infoStream != null) {
message(Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
synchronized (ticketQueue) {
ticketQueue.incTicketCount(); // inc the ticket count first - freezing the deletes below would otherwise open a window in which #anyChanges() misses them
ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
}
applyFlushTickets();
@@ -590,16 +617,21 @@ final class DocumentsWriter {
}
final void finishFullFlush(boolean success) {
if (infoStream != null) {
message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
}
assert setFlushingDeleteQueue(null);
if (success) {
// Release the flush lock
flushControl.finishFullFlush();
} else {
flushControl.abortFullFlushes();
try {
if (infoStream != null) {
message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
}
assert setFlushingDeleteQueue(null);
if (success) {
// Release the flush lock
flushControl.finishFullFlush();
} else {
flushControl.abortFullFlushes();
}
} finally {
pendingChangesInCurrentFullFlush = false;
}
}
static final class FlushTicket {
@@ -618,6 +650,46 @@ final class DocumentsWriter {
}
}
static final class TicketQueue {
private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
final AtomicInteger ticketCount = new AtomicInteger();
void incTicketCount() {
ticketCount.incrementAndGet();
}
public boolean hasTickets() {
assert ticketCount.get() >= 0;
return ticketCount.get() != 0;
}
void incrementAndAdd(FlushTicket ticket) {
incTicketCount();
add(ticket);
}
void add(FlushTicket ticket) {
queue.add(ticket);
}
FlushTicket peek() {
return queue.peek();
}
FlushTicket poll() {
try {
return queue.poll();
} finally {
ticketCount.decrementAndGet();
}
}
void clear() {
queue.clear();
ticketCount.set(0);
}
}
// used by the IW during close to assert all DWPTs are inactive after the final flush
boolean assertNoActiveDWPT() {
Iterator<ThreadState> activePerThreadsIterator = perThreadPool.getAllPerThreadsIterator();
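
Taken together, the new TicketQueue keeps its count nonzero from the moment a flush announces itself (incTicketCount / incrementAndAdd) until poll() runs after the ticket has been published, so hasTickets() covers the whole in-flight span that anyChanges() needs to see. A stand-alone demonstration of that invariant, not the Lucene class itself:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone demonstration of the invariant the TicketQueue above provides:
// the count is nonzero from before the work is enqueued until after it has
// been published, so hasTickets() never under-reports in-flight flushes.
class TicketQueueInvariantDemo {
  private final Queue<Runnable> queue = new ArrayDeque<Runnable>();
  private final AtomicInteger ticketCount = new AtomicInteger();

  boolean hasTickets() {
    return ticketCount.get() != 0;
  }

  void incrementAndAdd(Runnable ticket) {
    ticketCount.incrementAndGet(); // announced before the ticket is even enqueued
    queue.add(ticket);
  }

  void publishHead() {
    Runnable head = queue.peek();
    if (head == null) {
      return;
    }
    try {
      head.run();                    // publish while hasTickets() is still true
    } finally {
      queue.poll();                  // release the ticket only afterwards
      ticketCount.decrementAndGet();
    }
  }

  public static void main(String[] args) {
    final TicketQueueInvariantDemo tickets = new TicketQueueInvariantDemo();
    tickets.incrementAndAdd(new Runnable() {
      public void run() {
        System.out.println("publishing, hasTickets=" + tickets.hasTickets()); // still true here
      }
    });
    tickets.publishHead();
    System.out.println("after publish, hasTickets=" + tickets.hasTickets());  // false
  }
}
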

IndexWriter.java

@@ -2354,6 +2354,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
return newSegment;
}
synchronized void publishFrozenDeletes(FrozenBufferedDeletes packet) throws IOException {
assert packet != null && packet.any();
synchronized (bufferedDeletesStream) {
bufferedDeletesStream.push(packet);
}
}
/**
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer. NOTE: use
@@ -2984,14 +2991,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
final boolean anySegmentFlushed;
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
anySegmentFlushed = docWriter.flushAllThreads();
success = true;
flushSuccess = true;
} finally {
docWriter.finishFullFlush(success);
docWriter.finishFullFlush(flushSuccess);
}
}
success = false;
synchronized(this) {
maybeApplyDeletes(applyAllDeletes);
doAfterFlush();
@@ -4074,6 +4081,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized boolean nrtIsCurrent(SegmentInfos infos) {
//System.out.println("IW.nrtIsCurrent " + (infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any()));
ensureOpen();
if (infoStream != null) {
message("nrtIsCurrent: infoVersion matches: " + (infos.version == segmentInfos.version) + " DW changes: " + docWriter.anyChanges() + " BD changes: "+bufferedDeletesStream.any());
}
return infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any();
}
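
For NRT readers obtained from the writer, isCurrent() consults nrtIsCurrent(), so the widened docWriter.anyChanges() check is visible through the public API. A hedged usage sketch built from the same calls the new test exercises (IndexReader#isCurrent, IndexReader#openIfChanged, IndexReader#decRef); opening the writer and the initial NRT reader is left out:

import java.io.IOException;

import org.apache.lucene.index.IndexReader;

// Hedged usage sketch: refresh an NRT reader only when it reports that it is
// stale. isCurrent() on a reader obtained from the writer consults
// nrtIsCurrent() above; before this commit it could briefly claim "current"
// while a concurrent full flush still held unpublished changes.
class NrtRefreshSketch {
  static IndexReader maybeRefresh(IndexReader nrtReader) throws IOException {
    if (!nrtReader.isCurrent()) {
      IndexReader newer = IndexReader.openIfChanged(nrtReader);
      if (newer != null) {
        nrtReader.decRef(); // release the old reader once the new one is live
        nrtReader = newer;
      }
    }
    return nrtReader;
  }
}
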

TestIndexWriterNRTIsCurrent.java (new file)

@@ -0,0 +1,203 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.LuceneTestCase;
public class TestIndexWriterNRTIsCurrent extends LuceneTestCase {
public static class ReaderHolder {
volatile IndexReader reader;
volatile boolean stop = false;
}
public void testIsCurrentWithThreads() throws CorruptIndexException,
LockObtainFailedException, IOException, InterruptedException {
Directory dir = newDirectory();
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT,
new MockAnalyzer(random));
IndexWriter writer = new IndexWriter(dir, conf);
if (VERBOSE) {
writer.setInfoStream(System.out);
}
ReaderHolder holder = new ReaderHolder();
ReaderThread[] threads = new ReaderThread[atLeast(3)];
final CountDownLatch latch = new CountDownLatch(1);
WriterThread writerThread = new WriterThread(holder, writer,
atLeast(500), random, latch);
for (int i = 0; i < threads.length; i++) {
threads[i] = new ReaderThread(holder, latch);
threads[i].start();
}
writerThread.start();
writerThread.join();
boolean failed = writerThread.failed != null;
if (failed)
writerThread.failed.printStackTrace();
for (int i = 0; i < threads.length; i++) {
threads[i].join();
if (threads[i].failed != null) {
threads[i].failed.printStackTrace();
failed = true;
}
}
assertFalse(failed);
writer.close();
dir.close();
}
public static class WriterThread extends Thread {
private final ReaderHolder holder;
private final IndexWriter writer;
private final int numOps;
private final Random random;
private boolean countdown = true;
private final CountDownLatch latch;
Throwable failed;
WriterThread(ReaderHolder holder, IndexWriter writer, int numOps,
Random random, CountDownLatch latch) {
super();
this.holder = holder;
this.writer = writer;
this.numOps = numOps;
this.random = random;
this.latch = latch;
}
public void run() {
IndexReader currentReader = null;
try {
Document doc = new Document();
doc.add(new Field("id", "1", TextField.TYPE_UNSTORED));
writer.addDocument(doc);
holder.reader = currentReader = writer.getReader(true);
Term term = new Term("id");
for (int i = 0; i < numOps && !holder.stop; i++) {
float nextOp = random.nextFloat();
if (nextOp < 0.3) {
term.set("id", "1");
writer.updateDocument(term, doc);
} else if (nextOp < 0.5) {
writer.addDocument(doc);
} else {
term.set("id", "1");
writer.deleteDocuments(term);
}
if (holder.reader != currentReader) {
holder.reader = currentReader;
if (countdown) {
countdown = false;
latch.countDown();
}
}
if (random.nextBoolean()) {
writer.commit();
final IndexReader newReader = IndexReader
.openIfChanged(currentReader);
if (newReader != null) {
currentReader.decRef();
currentReader = newReader;
}
if (currentReader.numDocs() == 0) {
writer.addDocument(doc);
}
}
}
} catch (Throwable e) {
failed = e;
} finally {
holder.reader = null;
if (countdown) {
latch.countDown();
}
if (currentReader != null) {
try {
currentReader.decRef();
} catch (IOException e) {
}
}
}
if (VERBOSE) {
System.out.println("writer stopped - forced by reader: " + holder.stop);
}
}
}
public static final class ReaderThread extends Thread {
private final ReaderHolder holder;
private final CountDownLatch latch;
Throwable failed;
ReaderThread(ReaderHolder holder, CountDownLatch latch) {
super();
this.holder = holder;
this.latch = latch;
}
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
failed = e;
return;
}
IndexReader reader;
while ((reader = holder.reader) != null) {
if (reader.tryIncRef()) {
try {
boolean current = reader.isCurrent();
if (VERBOSE) {
System.out.println("Thread: " + Thread.currentThread() + " Reader: " + reader + " isCurrent:" + current);
}
assertFalse(current);
} catch (Throwable e) {
if (VERBOSE) {
System.out.println("FAILED Thread: " + Thread.currentThread() + " Reader: " + reader + " isCurrent: false");
}
failed = e;
holder.stop = true;
return;
} finally {
try {
reader.decRef();
} catch (IOException e) {
if (failed == null) {
failed = e;
}
return;
}
}
}
}
}
}
}