LUCENE-992: move buffered deletes into DocumentsWriter so IndexWriter.updateDocument is atomic

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@574260 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2007-09-10 14:33:51 +00:00
parent bd493d6c80
commit e7179ea866
5 changed files with 381 additions and 125 deletions

CHANGES.txt

@@ -83,6 +83,9 @@ Bug fixes
13. LUCENE-991: The explain() method of BoostingTermQuery had errors when no payloads were present on a document. (Peter Keegan via Grant Ingersoll)
14. LUCENE-992: Fixed IndexWriter.updateDocument to be atomic again
(this was broken by LUCENE-843). (Ning Li via Mike McCandless)
New features
1. LUCENE-906: Elision filter for French.
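The fix above restores the contract that updateDocument is an atomic delete-then-add: a reader sees either the old version of the document or the new one, never both and never neither. A minimal usage sketch (not part of this commit; assumes an existing Directory named directory):

    // updateDocument buffers the delete term and the new document under the
    // same DocumentsWriter lock, so the two are applied as one operation.
    IndexWriter writer = new IndexWriter(directory, new SimpleAnalyzer(), false);
    Document doc = new Document();
    doc.add(new Field("id", "42", Field.Store.YES, Field.Index.UN_TOKENIZED));
    doc.add(new Field("contents", "updated text", Field.Store.NO, Field.Index.TOKENIZED));
    writer.updateDocument(new Term("id", "42"), doc);
    writer.close();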

DocumentsWriter.java

@@ -129,6 +129,16 @@ final class DocumentsWriter {
private PrintStream infoStream;
// This Hashmap buffers delete terms in ram before they
// are applied. The key is delete term; the value is
// number of buffered documents the term applies to.
private HashMap bufferedDeleteTerms = new HashMap();
private int numBufferedDeleteTerms = 0;
// The max number of delete terms that can be buffered before
// they must be flushed to disk.
private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
// How much RAM we can use before flushing. This is 0 if
// we are flushing by doc count instead.
private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);
@@ -265,8 +275,8 @@ final class DocumentsWriter {
   /** Called if we hit an exception when adding docs,
    *  flushing, etc.  This resets our state, discarding any
-   *  docs added since last flush. */
-  void abort() throws IOException {
+   *  docs added since last flush. */
+  synchronized void abort() throws IOException {
// Forcefully remove waiting ThreadStates from line
for(int i=0;i<numWaiting;i++)
@@ -275,6 +285,9 @@ final class DocumentsWriter {
pauseAllThreads();
bufferedDeleteTerms.clear();
numBufferedDeleteTerms = 0;
try {
// Discard pending norms:
@@ -2063,8 +2076,10 @@ final class DocumentsWriter {
   /** Returns a free (idle) ThreadState that may be used for
    *  indexing this one document.  This call also pauses if a
-   *  flush is pending. */
-  synchronized ThreadState getThreadState(Document doc) throws IOException {
+   *  flush is pending.  If delTerm is non-null then we
+   *  buffer this deleted term after the thread state has
+   *  been acquired. */
+  synchronized ThreadState getThreadState(Document doc, Term delTerm) throws IOException {
// First, find a thread state. If this thread already
// has affinity to a specific ThreadState, use that one
@@ -2134,6 +2149,9 @@ final class DocumentsWriter {
}
}
if (delTerm != null)
addDeleteTerm(delTerm, state.docID);
return state;
}
@@ -2141,9 +2159,19 @@
* flush. */
boolean addDocument(Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
return updateDocument(doc, analyzer, null);
}
boolean updateDocument(Term t, Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
return updateDocument(doc, analyzer, t);
}
boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
throws CorruptIndexException, IOException {
// This call is synchronized but fast
-    final ThreadState state = getThreadState(doc);
+    final ThreadState state = getThreadState(doc, delTerm);
boolean success = false;
try {
// This call is not synchronized and does all the work
@@ -2157,7 +2185,96 @@
abort();
}
}
-    return state.doFlushAfter;
+    return state.doFlushAfter || timeToFlushDeletes();
}
synchronized int getNumBufferedDeleteTerms() {
return numBufferedDeleteTerms;
}
synchronized HashMap getBufferedDeleteTerms() {
return bufferedDeleteTerms;
}
// Reset buffered deletes.
synchronized void clearBufferedDeleteTerms() {
bufferedDeleteTerms.clear();
numBufferedDeleteTerms = 0;
}
synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException {
while(pauseThreads != 0 || flushPending)
try {
wait();
} catch (InterruptedException e) {}
for (int i = 0; i < terms.length; i++)
addDeleteTerm(terms[i], numDocsInRAM);
return timeToFlushDeletes();
}
synchronized boolean bufferDeleteTerm(Term term) throws IOException {
while(pauseThreads != 0 || flushPending)
try {
wait();
} catch (InterruptedException e) {}
addDeleteTerm(term, numDocsInRAM);
return timeToFlushDeletes();
}
synchronized private boolean timeToFlushDeletes() {
return numBufferedDeleteTerms >= maxBufferedDeleteTerms && setFlushPending();
}
void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
if (maxBufferedDeleteTerms < 1)
throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1");
this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
}
int getMaxBufferedDeleteTerms() {
return maxBufferedDeleteTerms;
}
synchronized boolean hasDeletes() {
return bufferedDeleteTerms.size() > 0;
}
// Number of documents a delete term applies to.
static class Num {
private int num;
Num(int num) {
this.num = num;
}
int getNum() {
return num;
}
void setNum(int num) {
// Only record the new number if it's greater than the
// current one. This is important because if multiple
// threads are replacing the same doc at nearly the
// same time, it's possible that one thread that got a
// higher docID is scheduled before the other
// threads.
if (num > this.num)
this.num = num;
}
}
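The setNum comment above is the heart of the concurrency argument: whichever thread buffered the higher docID wins, so the delete term always covers every document buffered before the latest update. A tiny illustrative snippet (hypothetical; assumes same-package access to the package-private Num class):

    // Thread B acquired docID 7 and buffered the term first; thread A, which
    // got the lower docID 5, is scheduled afterwards and must not regress it.
    DocumentsWriter.Num num = new DocumentsWriter.Num(7);
    num.setNum(5);                    // ignored, since 5 < 7
    System.out.println(num.getNum()); // prints 7: the delete still covers docs 0..6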
// Buffer a term in bufferedDeleteTerms, which records the
// current number of documents buffered in ram so that the
// delete term will be applied to those documents as well
// as the disk segments.
synchronized private void addDeleteTerm(Term term, int docCount) {
Num num = (Num) bufferedDeleteTerms.get(term);
if (num == null) {
bufferedDeleteTerms.put(term, new Num(docCount));
} else {
num.setNum(docCount);
}
numBufferedDeleteTerms++;
}
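Together, bufferDeleteTerm(s) and addDeleteTerm implement a simple boundary scheme: a delete term applies to all docs already on disk plus only those in-RAM docs buffered before it. A self-contained sketch of that scheme (hypothetical class, mirroring the logic above with plain types):

    import java.util.HashMap;

    public class BufferedDeletesSketch {
      private final HashMap deleteTerms = new HashMap(); // term -> Integer docCount
      private int numDocsInRAM = 0;

      void addDocument() { numDocsInRAM++; }

      void bufferDeleteTerm(String term) {
        Integer prev = (Integer) deleteTerms.get(term);
        if (prev == null || numDocsInRAM > prev.intValue()) // max-only, like Num.setNum
          deleteTerms.put(term, new Integer(numDocsInRAM));
      }

      public static void main(String[] args) {
        BufferedDeletesSketch w = new BufferedDeletesSketch();
        w.addDocument();               // in-RAM docID 0
        w.bufferDeleteTerm("id:7");    // recorded as 1: applies only to docID 0
        w.addDocument();               // in-RAM docID 1, buffered later: unaffected
        System.out.println(w.deleteTerms); // prints {id:7=1}
      }
    }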
/** Does the synchronized work to finish/flush the

IndexWriter.java

@@ -247,16 +247,6 @@ public class IndexWriter {
private int termIndexInterval = DEFAULT_TERM_INDEX_INTERVAL;
-  // The max number of delete terms that can be buffered before
-  // they must be flushed to disk.
-  private int maxBufferedDeleteTerms = DEFAULT_MAX_BUFFERED_DELETE_TERMS;
-  // This Hashmap buffers delete terms in ram before they are applied.
-  // The key is delete term; the value is number of ram
-  // segments the term applies to.
-  private HashMap bufferedDeleteTerms = new HashMap();
-  private int numBufferedDeleteTerms = 0;
/** Use compound file setting. Defaults to true, minimizing the number of
* files used. Setting this to false may improve indexing performance, but
* may also cause file handle problems.
@@ -773,9 +763,7 @@
*/
public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
ensureOpen();
-    if (maxBufferedDeleteTerms < 1)
-      throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1");
-    this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
+    docWriter.setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);
}
/**
@@ -785,7 +773,7 @@
*/
public int getMaxBufferedDeleteTerms() {
ensureOpen();
-    return maxBufferedDeleteTerms;
+    return docWriter.getMaxBufferedDeleteTerms();
}
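Both accessors now simply delegate to DocumentsWriter, which owns the delete buffer and its flush trigger. A brief usage sketch (illustrative values; assumes an existing Directory named directory):

    // Once 1000 delete terms are buffered, the next delete triggers a flush
    // (deletes are also flushed on close and on an explicit flush).
    IndexWriter writer = new IndexWriter(directory, new SimpleAnalyzer(), false);
    writer.setMaxBufferedDeleteTerms(1000);
    writer.deleteDocuments(new Term("id", "42"));
    writer.close();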
/** Determines how often segment indices are merged by addDocument(). With
@@ -1134,10 +1122,11 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
-  public synchronized void deleteDocuments(Term term) throws CorruptIndexException, IOException {
+  public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
-    bufferDeleteTerm(term);
-    maybeFlush();
+    boolean doFlush = docWriter.bufferDeleteTerm(term);
+    if (doFlush)
+      flush(true, false);
}
/**
@@ -1148,12 +1137,11 @@
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
-  public synchronized void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
+  public void deleteDocuments(Term[] terms) throws CorruptIndexException, IOException {
     ensureOpen();
-    for (int i = 0; i < terms.length; i++) {
-      bufferDeleteTerm(terms[i]);
-    }
-    maybeFlush();
+    boolean doFlush = docWriter.bufferDeleteTerms(terms);
+    if (doFlush)
+      flush(true, false);
}
/**
@@ -1189,20 +1177,15 @@
public void updateDocument(Term term, Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
ensureOpen();
-    synchronized (this) {
-      bufferDeleteTerm(term);
-    }
-    boolean success = false;
+    boolean doFlush = false;
     try {
-      success = docWriter.addDocument(doc, analyzer);
+      doFlush = docWriter.updateDocument(term, doc, analyzer);
     } catch (IOException ioe) {
       deleter.refresh();
       throw ioe;
     }
-    if (success)
+    if (doFlush)
       flush(true, false);
-    else
-      maybeFlush();
}
// for test purpose
@@ -1357,7 +1340,7 @@
*/
private void startTransaction() throws IOException {
-    assert numBufferedDeleteTerms == 0 :
+    assert docWriter.getNumBufferedDeleteTerms() == 0 :
"calling startTransaction with buffered delete terms not supported";
assert docWriter.getNumDocsInRAM() == 0 :
"calling startTransaction with buffered documents not supported";
@@ -1462,9 +1445,6 @@
deleter.checkpoint(segmentInfos, false);
deleter.refresh();
-    bufferedDeleteTerms.clear();
-    numBufferedDeleteTerms = 0;
commitPending = false;
docWriter.abort();
close();
@@ -1845,20 +1825,6 @@
throws IOException {
}
-  /**
-   * Used internally to trigger a flush if the number of
-   * buffered added documents or buffered deleted terms are
-   * large enough.
-   */
-  protected final synchronized void maybeFlush() throws CorruptIndexException, IOException {
-    // We only check for flush due to number of buffered
-    // delete terms, because triggering of a flush due to
-    // too many added documents is handled by
-    // DocumentsWriter
-    if (numBufferedDeleteTerms >= maxBufferedDeleteTerms && docWriter.setFlushPending())
-      flush(true, false);
-  }
/**
* Flush all in-memory buffered updates (adds and deletes)
* to the Directory.
@@ -1908,7 +1874,7 @@
// when they are full or writer is being closed. We
// have to fix the "applyDeletesSelectively" logic to
// apply to more than just the last flushed segment
-    boolean flushDeletes = bufferedDeleteTerms.size() > 0;
+    boolean flushDeletes = docWriter.hasDeletes();
if (infoStream != null)
infoStream.println(" flush: flushDocs=" + flushDocs +
@@ -1938,9 +1904,6 @@
SegmentInfos rollback = null;
-    HashMap saveBufferedDeleteTerms = null;
-    int saveNumBufferedDeleteTerms = 0;
if (flushDeletes)
rollback = (SegmentInfos) segmentInfos.clone();
@@ -1975,9 +1938,9 @@
// buffer deletes longer and then flush them to
// multiple flushed segments, when
// autoCommit=false
-          saveBufferedDeleteTerms = bufferedDeleteTerms;
-          saveNumBufferedDeleteTerms = numBufferedDeleteTerms;
-          applyDeletes(flushDocs);
+          int delCount = applyDeletes(flushDocs);
+          if (infoStream != null)
+            infoStream.println("flushed " + delCount + " deleted documents");
doAfterFlush();
}
@@ -1992,11 +1955,6 @@
segmentInfos.clear();
segmentInfos.addAll(rollback);
-        if (saveBufferedDeleteTerms != null) {
-          numBufferedDeleteTerms = saveNumBufferedDeleteTerms;
-          bufferedDeleteTerms = saveBufferedDeleteTerms;
-        }
} else {
// Remove segment we added, if any:
if (newSegment != null &&
@@ -2319,11 +2277,14 @@
// flushedNewSegment is true then a new segment was just
// created and flushed from the ram segments, so we will
// selectively apply the deletes to that new segment.
-  private final void applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
+  private final int applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
+    final HashMap bufferedDeleteTerms = docWriter.getBufferedDeleteTerms();
+    int delCount = 0;
if (bufferedDeleteTerms.size() > 0) {
if (infoStream != null)
infoStream.println("flush " + numBufferedDeleteTerms + " buffered deleted terms on "
infoStream.println("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on "
+ segmentInfos.size() + " segments.");
if (flushedNewSegment) {
@@ -2337,7 +2298,7 @@
// Apply delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
// the documents buffered before it, not those buffered after it.
-          applyDeletesSelectively(bufferedDeleteTerms, reader);
+          delCount += applyDeletesSelectively(bufferedDeleteTerms, reader);
} finally {
if (reader != null) {
try {
@@ -2361,7 +2322,7 @@
// Apply delete terms to disk segments
// except the one just flushed from ram.
-          applyDeletes(bufferedDeleteTerms, reader);
+          delCount += applyDeletes(bufferedDeleteTerms, reader);
} finally {
if (reader != null) {
try {
@@ -2374,15 +2335,10 @@
}
// Clean up bufferedDeleteTerms.
-      // Rollbacks of buffered deletes are based on restoring the old
-      // map, so don't modify this one.  Rare enough that the gc
-      // overhead is almost certainly lower than the alternate, which
-      // would be clone to support rollback.
-      bufferedDeleteTerms = new HashMap();
-      numBufferedDeleteTerms = 0;
+      docWriter.clearBufferedDeleteTerms();
}
return delCount;
}
private final boolean checkNonDecreasingLevels(int start) {
@@ -2410,59 +2366,28 @@
// For test purposes.
final synchronized int getBufferedDeleteTermsSize() {
-    return bufferedDeleteTerms.size();
+    return docWriter.getBufferedDeleteTerms().size();
}
// For test purposes.
final synchronized int getNumBufferedDeleteTerms() {
-    return numBufferedDeleteTerms;
-  }
-  // Number of ram segments a delete term applies to.
-  private static class Num {
-    private int num;
-    Num(int num) {
-      this.num = num;
-    }
-    int getNum() {
-      return num;
-    }
-    void setNum(int num) {
-      this.num = num;
-    }
-  }
-  // Buffer a term in bufferedDeleteTerms, which records the
-  // current number of documents buffered in ram so that the
-  // delete term will be applied to those ram segments as
-  // well as the disk segments.
-  private void bufferDeleteTerm(Term term) {
-    Num num = (Num) bufferedDeleteTerms.get(term);
-    int numDoc = docWriter.getNumDocsInRAM();
-    if (num == null) {
-      bufferedDeleteTerms.put(term, new Num(numDoc));
-    } else {
-      num.setNum(numDoc);
-    }
-    numBufferedDeleteTerms++;
+    return docWriter.getNumBufferedDeleteTerms();
}
// Apply buffered delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
// the documents buffered before it, not those buffered after it.
-  private final void applyDeletesSelectively(HashMap deleteTerms,
+  private final int applyDeletesSelectively(HashMap deleteTerms,
IndexReader reader) throws CorruptIndexException, IOException {
Iterator iter = deleteTerms.entrySet().iterator();
int delCount = 0;
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
Term term = (Term) entry.getKey();
TermDocs docs = reader.termDocs(term);
if (docs != null) {
-        int num = ((Num) entry.getValue()).getNum();
+        int num = ((DocumentsWriter.Num) entry.getValue()).getNum();
try {
while (docs.next()) {
int doc = docs.doc();
@@ -2470,21 +2395,37 @@
break;
}
reader.deleteDocument(doc);
delCount++;
}
} finally {
docs.close();
}
}
}
return delCount;
}
// Apply buffered delete terms to this reader.
-  private final void applyDeletes(HashMap deleteTerms, IndexReader reader)
+  private final int applyDeletes(HashMap deleteTerms, IndexReader reader)
throws CorruptIndexException, IOException {
Iterator iter = deleteTerms.entrySet().iterator();
int delCount = 0;
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
-      reader.deleteDocuments((Term) entry.getKey());
+      delCount += reader.deleteDocuments((Term) entry.getKey());
}
return delCount;
}
public synchronized String segString() {
StringBuffer buffer = new StringBuffer();
for(int i = 0; i < segmentInfos.size(); i++) {
if (i > 0) {
buffer.append(' ');
}
buffer.append(segmentInfos.info(i).name + ":" + segmentInfos.info(i).docCount);
}
return buffer.toString();
}
}

TestAtomicUpdate.java

@@ -0,0 +1,184 @@
package org.apache.lucene.index;
/**
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.util.*;
import org.apache.lucene.store.*;
import org.apache.lucene.document.*;
import org.apache.lucene.analysis.*;
import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.queryParser.*;
import org.apache.lucene.util._TestUtil;
import junit.framework.TestCase;
import java.util.Random;
import java.io.File;
public class TestAtomicUpdate extends TestCase {
private static final Analyzer ANALYZER = new SimpleAnalyzer();
private static final Random RANDOM = new Random();
private static abstract class TimedThread extends Thread {
boolean failed;
int count;
private static int RUN_TIME_SEC = 3;
private TimedThread[] allThreads;
abstract public void doWork() throws Throwable;
TimedThread(TimedThread[] threads) {
this.allThreads = threads;
}
public void run() {
final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
count = 0;
try {
while(System.currentTimeMillis() < stopTime && !anyErrors()) {
doWork();
count++;
}
} catch (Throwable e) {
e.printStackTrace(System.out);
failed = true;
}
}
private boolean anyErrors() {
for(int i=0;i<allThreads.length;i++)
if (allThreads[i] != null && allThreads[i].failed)
return true;
return false;
}
}
private static class IndexerThread extends TimedThread {
IndexWriter writer;
public int count;
public IndexerThread(IndexWriter writer, TimedThread[] threads) {
super(threads);
this.writer = writer;
}
public void doWork() throws Exception {
// Update all 100 docs...
for(int i=0; i<100; i++) {
Document d = new Document();
int n = RANDOM.nextInt();
d.add(new Field("id", Integer.toString(i), Field.Store.YES, Field.Index.UN_TOKENIZED));
d.add(new Field("contents", English.intToEnglish(i+10*count), Field.Store.NO, Field.Index.TOKENIZED));
writer.updateDocument(new Term("id", Integer.toString(i)), d);
}
}
}
private static class SearcherThread extends TimedThread {
private Directory directory;
public SearcherThread(Directory directory, TimedThread[] threads) {
super(threads);
this.directory = directory;
}
public void doWork() throws Throwable {
IndexReader r = IndexReader.open(directory);
try {
assertEquals(100, r.numDocs());
} catch (Throwable t) {
throw t;
}
r.close();
}
}
  /*
    Run two indexer threads and two searcher threads against a
    single index as a stress test.
  */
public void runTest(Directory directory) throws Exception {
TimedThread[] threads = new TimedThread[4];
IndexWriter writer = new IndexWriter(directory, ANALYZER, true);
// Establish a base index of 100 docs:
for(int i=0;i<100;i++) {
Document d = new Document();
d.add(new Field("id", Integer.toString(i), Field.Store.YES, Field.Index.UN_TOKENIZED));
d.add(new Field("contents", English.intToEnglish(i), Field.Store.NO, Field.Index.TOKENIZED));
writer.addDocument(d);
}
IndexerThread indexerThread = new IndexerThread(writer, threads);
threads[0] = indexerThread;
indexerThread.start();
IndexerThread indexerThread2 = new IndexerThread(writer, threads);
threads[1] = indexerThread2;
indexerThread2.start();
SearcherThread searcherThread1 = new SearcherThread(directory, threads);
threads[2] = searcherThread1;
searcherThread1.start();
SearcherThread searcherThread2 = new SearcherThread(directory, threads);
threads[3] = searcherThread2;
searcherThread2.start();
indexerThread.join();
indexerThread2.join();
searcherThread1.join();
searcherThread2.join();
writer.close();
assertTrue("hit unexpected exception in indexer", !indexerThread.failed);
assertTrue("hit unexpected exception in indexer2", !indexerThread2.failed);
assertTrue("hit unexpected exception in search1", !searcherThread1.failed);
assertTrue("hit unexpected exception in search2", !searcherThread2.failed);
//System.out.println(" Writer: " + indexerThread.count + " iterations");
//System.out.println("Searcher 1: " + searcherThread1.count + " searchers created");
//System.out.println("Searcher 2: " + searcherThread2.count + " searchers created");
}
/*
Run above stress test against RAMDirectory and then
FSDirectory.
*/
public void testAtomicUpdates() throws Exception {
Directory directory;
// First in a RAM directory:
directory = new MockRAMDirectory();
runTest(directory);
directory.close();
// Second in an FSDirectory:
String tempDir = System.getProperty("java.io.tmpdir");
File dirPath = new File(tempDir, "lucene.test.atomic");
directory = FSDirectory.getDirectory(dirPath);
runTest(directory);
directory.close();
_TestUtil.rmDir(dirPath);
}
}

TestIndexWriterDelete.java

@@ -436,6 +436,7 @@ public class TestIndexWriterDelete extends TestCase {
catch (IOException e) {
if (debug) {
System.out.println(" hit IOException: " + e);
e.printStackTrace(System.out);
}
err = e;
if (1 == x) {
@@ -503,10 +504,20 @@
}
int result2 = hits.length();
if (success) {
-        if (result2 != END_COUNT) {
+        if (x == 0 && result2 != END_COUNT) {
fail(testName
+ ": method did not throw exception but hits.length for search on term 'aaa' is "
+ result2 + " instead of expected " + END_COUNT);
} else if (x == 1 && result2 != START_COUNT && result2 != END_COUNT) {
// It's possible that the first exception was
// "recoverable" wrt pending deletes, in which
// case the pending deletes are retained and
// then re-flushing (with plenty of disk
// space) will succeed in flushing the
// deletes:
fail(testName
+ ": method did not throw exception but hits.length for search on term 'aaa' is "
+ result2 + " instead of expected " + START_COUNT + " or " + END_COUNT);
}
} else {
// On hitting exception we still may have added
@@ -515,7 +526,7 @@
err.printStackTrace();
fail(testName
+ ": method did throw exception but hits.length for search on term 'aaa' is "
+ result2 + " instead of expected " + START_COUNT);
+ result2 + " instead of expected " + START_COUNT + " or " + END_COUNT);
}
}
@@ -535,10 +546,8 @@
}
}
-  // This test tests that buffered deletes are not lost due to i/o
-  // errors occurring after the buffered deletes have been flushed but
-  // before the segmentInfos have been successfully written
+  // This test tests that buffered deletes are cleared when
+  // an Exception is hit during flush.
public void testErrorAfterApplyDeletes() throws IOException {
MockRAMDirectory.Failure failure = new MockRAMDirectory.Failure() {
@@ -662,9 +671,11 @@
hitCount = getHitCount(dir, term);
-      // If we haven't lost the delete the hit count will be zero
-      assertEquals(0, hitCount);
+      // If the delete was not cleared then hit count will
+      // be 0.  With autoCommit=false, we hit the exception
+      // on creating the compound file, so the delete was
+      // flushed successfully.
+      assertEquals(autoCommit ? 1:0, hitCount);
if (autoCommit) {
modifier.close();