LUCENE-2329: fix deadlock case after flush; fix RAM balancing between deleting & indexing

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@931298 13f79535-47bb-0310-9956-ffa450edef68
Author: Michael McCandless, 2010-04-06 19:56:59 +00:00
parent b679816a70
commit d40dacc805
4 changed files with 103 additions and 33 deletions
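
The two halves of this commit work together. DocumentsWriter.clearFlushPending() now also resets bufferIsFull, and IndexWriter calls it from a finally block, so a flush that throws can no longer leave the writer permanently flagged as flush-pending (the "deadlock case after flush" of the title). Separately, the delete-buffering methods now run balanceRAM() before deciding whether to flush, so buffered deletes are weighed against the RAM budget the same way buffered documents are. The reason balanceRAM() must run outside synchronized(this) is lock ordering; below is a minimal, self-contained sketch of the kind of inversion the "we'll hit deadlock" comments in the patch guard against. The lock names are stand-ins for illustration, not Lucene's actual classes:

    // Two locks taken in opposite orders by two threads -- the shape of
    // deadlock avoided by calling balanceRAM() without holding the monitor.
    public class LockOrderDeadlock {
      static final Object writer = new Object(); // plays the DocumentsWriter monitor
      static final Object state = new Object();  // plays a per-thread state's lock

      public static void main(String[] args) {
        new Thread(() -> {
          synchronized (writer) {     // e.g. balancing entered under the writer lock
            pause();
            synchronized (state) {}   // ...then touching per-thread buffers
          }
        }).start();
        new Thread(() -> {
          synchronized (state) {      // e.g. a thread indexing under its state lock
            pause();
            synchronized (writer) {}  // ...then updating shared RAM accounting
          }
        }).start();                   // the two threads can now block forever
      }

      private static void pause() {
        try { Thread.sleep(100); } catch (InterruptedException e) {}
      }
    }

The patch's answer is to make the go/no-go decision under the monitor but release it before doing the actual balancing work.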

lucene/src/java/org/apache/lucene/index/DocumentsWriter.java

@@ -207,8 +207,6 @@ final class DocumentsWriter {
       setLength(0);
 
       // Recycle the blocks
-      final int blockCount = buffers.size();
-
       perDocAllocator.recycleByteBlocks(buffers);
       buffers.clear();
       sizeInBytes = 0;
@@ -688,6 +686,7 @@ final class DocumentsWriter {
   }
 
   synchronized void clearFlushPending() {
+    bufferIsFull = false;
     flushPending = false;
   }
@@ -912,29 +911,37 @@ final class DocumentsWriter {
       throw new AlreadyClosedException("this IndexWriter is closed");
   }
 
-  synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException {
-    waitReady(null);
-    for (int i = 0; i < terms.length; i++)
-      addDeleteTerm(terms[i], numDocsInRAM);
+  boolean bufferDeleteTerms(Term[] terms) throws IOException {
+    synchronized(this) {
+      waitReady(null);
+      for (int i = 0; i < terms.length; i++)
+        addDeleteTerm(terms[i], numDocsInRAM);
+    }
     return timeToFlushDeletes();
   }
 
-  synchronized boolean bufferDeleteTerm(Term term) throws IOException {
-    waitReady(null);
-    addDeleteTerm(term, numDocsInRAM);
+  boolean bufferDeleteTerm(Term term) throws IOException {
+    synchronized(this) {
+      waitReady(null);
+      addDeleteTerm(term, numDocsInRAM);
+    }
     return timeToFlushDeletes();
   }
 
-  synchronized boolean bufferDeleteQueries(Query[] queries) throws IOException {
-    waitReady(null);
-    for (int i = 0; i < queries.length; i++)
-      addDeleteQuery(queries[i], numDocsInRAM);
+  boolean bufferDeleteQueries(Query[] queries) throws IOException {
+    synchronized(this) {
+      waitReady(null);
+      for (int i = 0; i < queries.length; i++)
+        addDeleteQuery(queries[i], numDocsInRAM);
+    }
     return timeToFlushDeletes();
   }
 
-  synchronized boolean bufferDeleteQuery(Query query) throws IOException {
-    waitReady(null);
-    addDeleteQuery(query, numDocsInRAM);
+  boolean bufferDeleteQuery(Query query) throws IOException {
+    synchronized(this) {
+      waitReady(null);
+      addDeleteQuery(query, numDocsInRAM);
+    }
     return timeToFlushDeletes();
   }
@@ -947,7 +954,7 @@ final class DocumentsWriter {
   synchronized boolean doApplyDeletes() {
     // Very similar to deletesFull(), except we don't count
-    // numBytesAlloc, because we are checking whether
+    // numBytesUsed, because we are checking whether
     // deletes (alone) are consuming too many resources now
     // and thus should be applied. We apply deletes if RAM
     // usage is > 1/2 of our allowed RAM buffer, to prevent
@@ -960,8 +967,11 @@ final class DocumentsWriter {
             ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));
   }
 
-  synchronized private boolean timeToFlushDeletes() {
-    return (bufferIsFull || deletesFull()) && setFlushPending();
+  private boolean timeToFlushDeletes() {
+    balanceRAM();
+    synchronized(this) {
+      return (bufferIsFull || deletesFull()) && setFlushPending();
+    }
   }
 
   void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
@@ -1155,18 +1165,13 @@ final class DocumentsWriter {
     deletesInRAM.addBytesUsed(BYTES_PER_DEL_QUERY);
   }
 
-  synchronized boolean doBalanceRAM() {
-    return ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed+deletesInRAM.bytesUsed+deletesFlushed.bytesUsed >= ramBufferSize);
-  }
-
   /** Does the synchronized work to finish/flush the
    *  inverted document. */
   private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
 
-    if (doBalanceRAM())
-      // Must call this w/o holding synchronized(this) else
-      // we'll hit deadlock:
-      balanceRAM();
+    // Must call this w/o holding synchronized(this) else
+    // we'll hit deadlock:
+    balanceRAM();
 
     synchronized(this) {
@@ -1389,9 +1394,19 @@ final class DocumentsWriter {
    *  which balances the pools to match the current docs. */
   void balanceRAM() {
 
-    final long deletesRAMUsed = deletesInRAM.bytesUsed+deletesFlushed.bytesUsed;
+    final boolean doBalance;
+    final long deletesRAMUsed;
 
-    if (numBytesUsed+deletesRAMUsed > ramBufferSize) {
+    synchronized(this) {
+      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
+        return;
+      }
+
+      deletesRAMUsed = deletesInRAM.bytesUsed+deletesFlushed.bytesUsed;
+      doBalance = numBytesUsed+deletesRAMUsed >= ramBufferSize;
+    }
+
+    if (doBalance) {
 
       if (infoStream != null)
         message("  RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
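
Taken together, the DocumentsWriter changes make a delete-only workload RAM-bounded: every buffered delete funnels into timeToFlushDeletes(), which runs balanceRAM() first and only then, under the monitor, checks bufferIsFull || deletesFull(). In application terms, the sketch below shows the behavior this buys; it reuses the classes from the new test added at the bottom of this commit, but the Version constant, field name, and loop bound are placeholders, not part of the patch:

    // Sketch: with this fix, buffered deletes alone trigger RAM balancing and
    // flushing once they approach the 0.5 MB budget, instead of accumulating
    // without bound. Mirrors the setup of testIndexingThenDeleting() below.
    Directory dir = new MockRAMDirectory();
    IndexWriter w = new IndexWriter(dir,
        new IndexWriterConfig(Version.LUCENE_CURRENT,
            new WhitespaceAnalyzer(Version.LUCENE_CURRENT)).setRAMBufferSizeMB(0.5));
    for (int i = 0; i < 100000; i++) {
      w.deleteDocuments(new Term("id", Integer.toString(i)));  // buffered delete
    }
    w.close();
    dir.close();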

lucene/src/java/org/apache/lucene/index/IndexWriter.java

@@ -3796,6 +3796,7 @@ public class IndexWriter implements Closeable {
       // never hit
       return false;
     } finally {
+      docWriter.clearFlushPending();
       docWriter.resumeAllThreads();
     }
   }
@@ -4560,6 +4561,9 @@ public class IndexWriter implements Closeable {
   // Apply buffered deletes to all segments.
   private final synchronized boolean applyDeletes() throws CorruptIndexException, IOException {
     assert testPoint("startApplyDeletes");
+    if (infoStream != null) {
+      message("applyDeletes");
+    }
     flushDeletesCount++;
     SegmentInfos rollback = (SegmentInfos) segmentInfos.clone();
     boolean success = false;
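
Moving clearFlushPending() into the finally block is the "deadlock case after flush" part of the fix: paired with the DocumentsWriter change that makes clearFlushPending() also reset bufferIsFull, every exit from the flush, normal or exceptional, now restores a state in which indexing and delete threads can make progress. A simplified sketch of the shape of that error handling follows; the real flushing method carries much more logic, so treat this as the control-flow skeleton only:

    // The pattern the fix relies on: state that gates other threads is reset
    // on every exit path, so an aborted flush cannot wedge the writer.
    try {
      // ... write the new segment, apply buffered deletes ...
      return true;
    } finally {
      docWriter.clearFlushPending();   // now also resets bufferIsFull
      docWriter.resumeAllThreads();    // let paused indexing threads continue
    }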

lucene/src/java/org/apache/lucene/index/TermsHashPerField.java

@@ -71,7 +71,7 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
     fieldState = docInverterPerField.fieldState;
     this.consumer = perThread.consumer.addField(this, fieldInfo);
-    postingsArray = consumer.createPostingsArray(postingsHashSize/2);
+    initPostingsArray();
     bytesUsed(postingsArray.size * postingsArray.bytesPerPosting());
     streamCount = consumer.getStreamCount();
@@ -84,6 +84,10 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
     nextPerField = null;
   }
 
+  private void initPostingsArray() {
+    postingsArray = consumer.createPostingsArray(2);
+  }
+
   // sugar: just forwards to DW
   private void bytesUsed(long size) {
     if (perThread.termsHash.trackAllocations) {
@@ -111,10 +115,10 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
       postingsHashMask = newSize-1;
     }
 
+    // Fully free the postings array on each flush:
     if (postingsArray != null) {
-      final int startSize = postingsArray.size;
-      postingsArray = postingsArray.shrink(targetSize, false);
-      bytesUsed(postingsArray.bytesPerPosting() * (postingsArray.size - startSize));
+      bytesUsed(-postingsArray.bytesPerPosting() * postingsArray.size);
+      postingsArray = null;
     }
   }
@@ -309,6 +313,10 @@ final class TermsHashPerField extends InvertedDocConsumerPerField {
   @Override
   boolean start(Fieldable[] fields, int count) throws IOException {
     doCall = consumer.start(fields, count);
+    if (postingsArray == null) {
+      initPostingsArray();
+    }
     if (nextPerField != null)
       doNextCall = nextPerField.start(fields, count);
     return doCall || doNextCall;
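
In TermsHashPerField the parallel postings array is no longer shrunk and retained across flushes; it is freed outright, with its bytes credited back to the shared accounting, and lazily re-created on the next start(). A thread state that only buffers deletes after a flush therefore holds no postings RAM, which is part of what lets the deleting and indexing sides share the buffer fairly. A distilled sketch of the free-fully/re-init-lazily pattern is below; LazyBuffer, slots, bytesTracked, and BYTES_PER_SLOT are illustrative stand-ins, not Lucene names:

    // Distilled allocate-lazily / free-fully pattern: RAM is returned to the
    // accounting at flush and only re-charged when the field is next used.
    class LazyBuffer {
      private static final int BYTES_PER_SLOT = 4;
      private int[] slots;
      private long bytesTracked;

      void startUse() {                // like start(): re-init on first use
        if (slots == null) {
          slots = new int[2];          // minimal initial size, grown on demand
          bytesTracked += (long) slots.length * BYTES_PER_SLOT;
        }
      }

      void freeOnFlush() {             // like the shrink-on-flush path: free fully
        if (slots != null) {
          bytesTracked -= (long) slots.length * BYTES_PER_SLOT;
          slots = null;
        }
      }
    }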

lucene/src/test/org/apache/lucene/index/TestIndexWriter.java

@@ -4909,4 +4909,47 @@ public class TestIndexWriter extends LuceneTestCase {
       dir.close();
     }
   }
 
+  private static class FlushCountingIndexWriter extends IndexWriter {
+    int flushCount;
+    public FlushCountingIndexWriter(Directory dir, IndexWriterConfig iwc) throws IOException {
+      super(dir, iwc);
+    }
+    public void doAfterFlush() {
+      flushCount++;
+    }
+  }
+
+  public void testIndexingThenDeleting() throws Exception {
+    final Random r = newRandom();
+    Directory dir = new MockRAMDirectory();
+    FlushCountingIndexWriter w = new FlushCountingIndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT)).setRAMBufferSizeMB(0.5));
+    //w.setInfoStream(System.out);
+    Document doc = new Document();
+    doc.add(new Field("field", "go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20", Field.Store.NO, Field.Index.ANALYZED));
+    for(int iter=0;iter<6;iter++) {
+      int count = 0;
+
+      final boolean doIndexing = r.nextBoolean();
+      if (doIndexing) {
+        // Add docs until a flush is triggered
+        final int startFlushCount = w.flushCount;
+        while(w.flushCount == startFlushCount) {
+          w.addDocument(doc);
+          count++;
+        }
+      } else {
+        // Delete docs until a flush is triggered
+        final int startFlushCount = w.flushCount;
+        while(w.flushCount == startFlushCount) {
+          w.deleteDocuments(new Term("foo", ""+count));
+          count++;
+        }
+      }
+      assertTrue("flush happened too quickly during " + (doIndexing ? "indexing" : "deleting") + " count=" + count, count > 2500);
+    }
+    w.close();
+    dir.close();
+  }
 }