mirror of https://github.com/apache/lucene.git
LUCENE-3023: more cleanup, asserts and javadoc
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/realtime_search@1096780 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d631ef1a1c
commit
8fb4f5a766
|
@ -338,6 +338,11 @@ final class DocumentsWriter {
|
|||
|
||||
if (flushingDWPT != null) {
|
||||
maybeMerge |= doFlush(flushingDWPT);
|
||||
} else {
|
||||
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
|
||||
if (nextPendingFlush != null) {
|
||||
maybeMerge |= doFlush(nextPendingFlush);
|
||||
}
|
||||
}
|
||||
return maybeMerge;
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ public final class DocumentsWriterFlushControl {
|
|||
}
|
||||
}
|
||||
}
|
||||
final DocumentsWriterPerThread flushingDWPT = getFlushIfPending(perThread);
|
||||
final DocumentsWriterPerThread flushingDWPT = tryCheckoutForFlush(perThread, false);
|
||||
healthiness.updateStalled(this);
|
||||
return flushingDWPT;
|
||||
}
|
||||
|
@ -226,18 +226,6 @@ public final class DocumentsWriterFlushControl {
|
|||
return null;
|
||||
}
|
||||
|
||||
private DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) {
|
||||
if (numPending > 0) {
|
||||
final DocumentsWriterPerThread dwpt = perThread == null ? null
|
||||
: tryCheckoutForFlush(perThread, false);
|
||||
if (dwpt == null) {
|
||||
return nextPendingFlush();
|
||||
}
|
||||
return dwpt;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DocumentsWriterFlushControl [activeBytes=" + activeBytes
|
||||
|
@ -257,7 +245,7 @@ public final class DocumentsWriterFlushControl {
|
|||
while (allActiveThreads.hasNext() && numPending > 0) {
|
||||
ThreadState next = allActiveThreads.next();
|
||||
if (next.flushPending) {
|
||||
DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next, false);
|
||||
final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next, false);
|
||||
if (dwpt != null) {
|
||||
return dwpt;
|
||||
}
|
||||
|
@ -327,6 +315,7 @@ public final class DocumentsWriterFlushControl {
|
|||
if (!next.isActive()) {
|
||||
continue;
|
||||
}
|
||||
assert next.perThread.deleteQueue == flushingQueue || next.perThread.deleteQueue == documentsWriter.deleteQueue;
|
||||
if (next.perThread.deleteQueue != flushingQueue) {
|
||||
// this one is already a new DWPT
|
||||
continue;
|
||||
|
@ -346,6 +335,7 @@ public final class DocumentsWriterFlushControl {
|
|||
}
|
||||
}
|
||||
synchronized (this) {
|
||||
assert assertBlockedFlushes(flushingQueue);
|
||||
flushQueue.addAll(blockedFlushes);
|
||||
blockedFlushes.clear();
|
||||
flushQueue.addAll(toFlush);
|
||||
|
@ -357,6 +347,7 @@ public final class DocumentsWriterFlushControl {
|
|||
assert flushQueue.isEmpty();
|
||||
try {
|
||||
if (!blockedFlushes.isEmpty()) {
|
||||
assert assertBlockedFlushes(documentsWriter.deleteQueue);
|
||||
flushQueue.addAll(blockedFlushes);
|
||||
blockedFlushes.clear();
|
||||
}
|
||||
|
@ -365,6 +356,14 @@ public final class DocumentsWriterFlushControl {
|
|||
}
|
||||
}
|
||||
|
||||
boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) {
|
||||
Queue<DocumentsWriterPerThread> flushes = this.blockedFlushes;
|
||||
for (DocumentsWriterPerThread documentsWriterPerThread : flushes) {
|
||||
assert documentsWriterPerThread.deleteQueue == flushingQueue;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
synchronized void abortFullFlushes() {
|
||||
try {
|
||||
for (DocumentsWriterPerThread dwpt : flushQueue) {
|
||||
|
|
|
@ -322,6 +322,11 @@ public class DocumentsWriterPerThread {
|
|||
numDocsInRAM = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepares this DWPT for flushing. This method will freeze and return the
|
||||
* {@link DocumentsWriterDeleteQueue}s global buffer and apply all pending
|
||||
* deletes to this DWPT.
|
||||
*/
|
||||
FrozenBufferedDeletes prepareFlush() {
|
||||
assert numDocsInRAM > 0;
|
||||
final FrozenBufferedDeletes globalDeletes = deleteQueue.freezeGlobalBuffer(deleteSlice);
|
||||
|
@ -330,6 +335,7 @@ public class DocumentsWriterPerThread {
|
|||
if (deleteSlice != null) {
|
||||
// apply all deletes before we flush and release the delete slice
|
||||
deleteSlice.apply(pendingDeletes, numDocsInRAM);
|
||||
assert deleteSlice.isEmpty();
|
||||
deleteSlice = null;
|
||||
}
|
||||
return globalDeletes;
|
||||
|
@ -338,6 +344,7 @@ public class DocumentsWriterPerThread {
|
|||
/** Flush all pending docs to a new segment */
|
||||
FlushedSegment flush() throws IOException {
|
||||
assert numDocsInRAM > 0;
|
||||
assert deleteSlice == null : "all deletes must be applied in prepareFlush";
|
||||
flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
|
||||
numDocsInRAM, writer.getConfig().getTermIndexInterval(),
|
||||
fieldInfos.buildSegmentCodecs(true), pendingDeletes);
|
||||
|
|
|
@ -199,7 +199,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||
for (int x = 0; x < threads.length; x++) {
|
||||
threads[x].join();
|
||||
}
|
||||
|
||||
assertEquals(" all flushes must be due", 0, flushControl.flushBytes());
|
||||
assertEquals(numDocumentsToIndex, writer.numDocs());
|
||||
assertEquals(numDocumentsToIndex, writer.maxDoc());
|
||||
|
@ -334,6 +333,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
writer.commit();
|
||||
} catch (Throwable ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue