LUCENE-3023: make sure we don't miss a new initialized DWPT when flushing all DWPT

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/realtime_search@1097156 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2011-04-27 15:47:45 +00:00
parent f89d7b4ee0
commit 0e7dfa2bcd
5 changed files with 20 additions and 20 deletions

View File

@ -73,14 +73,19 @@ final class DocumentsWriterDeleteQueue {
/* only acquired to update the global deletes */ /* only acquired to update the global deletes */
private final ReentrantLock globalBufferLock = new ReentrantLock(); private final ReentrantLock globalBufferLock = new ReentrantLock();
long generation; final long generation;
DocumentsWriterDeleteQueue() { DocumentsWriterDeleteQueue() {
this(new BufferedDeletes(false)); this(0);
} }
DocumentsWriterDeleteQueue(BufferedDeletes globalBufferedDeletes) { DocumentsWriterDeleteQueue(long generation) {
this(new BufferedDeletes(false), generation);
}
DocumentsWriterDeleteQueue(BufferedDeletes globalBufferedDeletes,long generation) {
this.globalBufferedDeletes = globalBufferedDeletes; this.globalBufferedDeletes = globalBufferedDeletes;
this.generation = generation;
/* /*
* we use a sentinel instance as our initial tail. No slice will ever try to * we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted. * apply this tail since the head is always omitted.

View File

@ -303,8 +303,7 @@ public final class DocumentsWriterFlushControl {
flushingQueue = documentsWriter.deleteQueue; flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until // Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush // we do another full flush
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(); DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
newQueue.generation = flushingQueue.generation + 1;
documentsWriter.deleteQueue = newQueue; documentsWriter.deleteQueue = newQueue;
} }
final Iterator<ThreadState> allActiveThreads = perThreadPool final Iterator<ThreadState> allActiveThreads = perThreadPool

View File

@ -158,19 +158,19 @@ public abstract class DocumentsWriterPerThreadPool {
/** /**
* Returns a new {@link ThreadState} iff any new state is available otherwise * Returns a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code>. * <code>null</code>.
* <p>
* NOTE: the returned {@link ThreadState} is already locked iff non-
* <code>null</code>.
* *
* @param lock
* <code>true</code> iff the new {@link ThreadState} should be locked
* before published otherwise <code>false</code>.
* @return a new {@link ThreadState} iff any new state is available otherwise * @return a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code> * <code>null</code>
*/ */
public synchronized ThreadState newThreadState(boolean lock) { public synchronized ThreadState newThreadState() {
if (numThreadStatesActive < perThreads.length) { if (numThreadStatesActive < perThreads.length) {
final ThreadState threadState = perThreads[numThreadStatesActive]; final ThreadState threadState = perThreads[numThreadStatesActive];
threadState.lock(); threadState.lock(); // lock so nobody else will get this ThreadState
threadState.perThread.initialize();
numThreadStatesActive++; // increment will publish the ThreadState numThreadStatesActive++; // increment will publish the ThreadState
threadState.perThread.initialize();
return threadState; return threadState;
} }
return null; return null;

View File

@ -54,7 +54,7 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
// Find the state that has minimum number of threads waiting // Find the state that has minimum number of threads waiting
minThreadState = minContendedThreadState(); minThreadState = minContendedThreadState();
if (minThreadState == null || minThreadState.hasQueuedThreads()) { if (minThreadState == null || minThreadState.hasQueuedThreads()) {
final ThreadState newState = newThreadState(true); final ThreadState newState = newThreadState(); // state is already locked if non-null
if (newState != null) { if (newState != null) {
assert newState.isHeldByCurrentThread(); assert newState.isHeldByCurrentThread();
threadBindings.put(requestingThread, newState); threadBindings.put(requestingThread, newState);

View File

@ -33,8 +33,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
public class TestDocumentsWriterDeleteQueue extends LuceneTestCase { public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
public void testUpdateDelteSlices() { public void testUpdateDelteSlices() {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue( DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
new BufferedDeletes(false));
final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER; final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER;
Integer[] ids = new Integer[size]; Integer[] ids = new Integer[size];
for (int i = 0; i < ids.length; i++) { for (int i = 0; i < ids.length; i++) {
@ -88,8 +87,7 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
} }
public void testClear() { public void testClear() {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue( DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
new BufferedDeletes(false));
Term template = new Term("id"); Term template = new Term("id");
assertFalse(queue.anyChanges()); assertFalse(queue.anyChanges());
queue.clear(); queue.clear();
@ -117,8 +115,7 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
} }
public void testAnyChanges() { public void testAnyChanges() {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue( DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
new BufferedDeletes(false));
Term template = new Term("id"); Term template = new Term("id");
final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER; final int size = 200 + random.nextInt(500) * RANDOM_MULTIPLIER;
int termsSinceFreeze = 0; int termsSinceFreeze = 0;
@ -146,8 +143,7 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
} }
public void testStressDeleteQueue() throws InterruptedException { public void testStressDeleteQueue() throws InterruptedException {
DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue( DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
new BufferedDeletes(false));
Set<Term> uniqueValues = new HashSet<Term>(); Set<Term> uniqueValues = new HashSet<Term>();
final int size = 10000 + random.nextInt(500) * RANDOM_MULTIPLIER; final int size = 10000 + random.nextInt(500) * RANDOM_MULTIPLIER;
Integer[] ids = new Integer[size]; Integer[] ids = new Integer[size];