LUCENE-2095: fix thread safety issue with IW.commit

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@886257 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2009-12-02 18:59:24 +00:00
parent 892bc7f55a
commit cb325fd07f
8 changed files with 104 additions and 35 deletions

View File

@ -23,6 +23,11 @@ Bug fixes
and equals methods, cause bad things to happen when caching
BooleanQueries. (Chris Hostetter, Mike McCandless)
* LUCENE-2095: Fixes: when two threads call IndexWriter.commit() at
the same time, it's possible for commit to return control back to
one of the threads before all changes are actually committed.
(Sanne Grinovero via Mike McCandless)
New features
* LUCENE-2069: Added Unicode 4 support to CharArraySet. Due to the switch

View File

@ -42,7 +42,7 @@
<property name="Name" value="Lucene"/>
<property name="dev.version" value="3.1-dev"/>
<property name="version" value="${dev.version}"/>
<property name="compatibility.tag" value="lucene_3_0_back_compat_tests_20091125"/>
<property name="compatibility.tag" value="lucene_3_0_back_compat_tests_20091202"/>
<property name="spec.version" value="${version}"/>
<property name="year" value="2000-${current.year}"/>
<property name="final.name" value="lucene-${name}-${version}"/>

View File

@ -3378,9 +3378,14 @@ public class IndexWriter implements Closeable {
startCommit(0, commitUserData);
}
// Used only by commit, below; lock order is commitLock -> IW
private final Object commitLock = new Object();
private void commit(long sizeInBytes) throws IOException {
startCommit(sizeInBytes, null);
finishCommit();
synchronized(commitLock) {
startCommit(sizeInBytes, null);
finishCommit();
}
}
/**
@ -3430,17 +3435,26 @@ public class IndexWriter implements Closeable {
ensureOpen();
if (infoStream != null)
if (infoStream != null) {
message("commit: start");
}
if (pendingCommit == null) {
if (infoStream != null)
message("commit: now prepare");
prepareCommit(commitUserData);
} else if (infoStream != null)
message("commit: already prepared");
synchronized(commitLock) {
if (infoStream != null) {
message("commit: enter lock");
}
finishCommit();
if (pendingCommit == null) {
if (infoStream != null) {
message("commit: now prepare");
}
prepareCommit(commitUserData);
} else if (infoStream != null) {
message("commit: already prepared");
}
finishCommit();
}
}
private synchronized final void finishCommit() throws CorruptIndexException, IOException {
@ -4554,6 +4568,9 @@ public class IndexWriter implements Closeable {
assert testPoint("startStartCommit");
// TODO: as of LUCENE-2095, we can simplify this method,
// since only 1 thread can be in here at once
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
}

View File

@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.util.ThreadInterruptedException;
/**
@ -34,7 +35,7 @@ public class RAMDirectory extends Directory implements Serializable {
private static final long serialVersionUID = 1l;
HashMap<String,RAMFile> fileMap = new HashMap<String,RAMFile>();
long sizeInBytes = 0;
final AtomicLong sizeInBytes = new AtomicLong();
// *****
// Lock acquisition sequence: RAMDirectory, then RAMFile
@ -153,7 +154,7 @@ public class RAMDirectory extends Directory implements Serializable {
* RAMOutputStream.BUFFER_SIZE. */
public synchronized final long sizeInBytes() {
ensureOpen();
return sizeInBytes;
return sizeInBytes.get();
}
/** Removes an existing file in the directory.
@ -166,7 +167,7 @@ public class RAMDirectory extends Directory implements Serializable {
if (file!=null) {
fileMap.remove(name);
file.directory = null;
sizeInBytes -= file.sizeInBytes; // updates to RAMFile.sizeInBytes synchronized on directory
sizeInBytes.addAndGet(-file.sizeInBytes);
} else
throw new FileNotFoundException(name);
}
@ -179,7 +180,7 @@ public class RAMDirectory extends Directory implements Serializable {
synchronized (this) {
RAMFile existing = fileMap.get(name);
if (existing!=null) {
sizeInBytes -= existing.sizeInBytes;
sizeInBytes.addAndGet(existing.sizeInBytes);
existing.directory = null;
}
fileMap.put(name, file);

View File

@ -27,7 +27,7 @@ class RAMFile implements Serializable {
private ArrayList<byte[]> buffers = new ArrayList<byte[]>();
long length;
RAMDirectory directory;
long sizeInBytes; // Only maintained if in a directory; updates synchronized on directory
long sizeInBytes;
// This is publicly modifiable via Directory.touchFile(), so direct access not supported
private long lastModified = System.currentTimeMillis();
@ -57,16 +57,16 @@ class RAMFile implements Serializable {
this.lastModified = lastModified;
}
final synchronized byte[] addBuffer(int size) {
final byte[] addBuffer(int size) {
byte[] buffer = newBuffer(size);
if (directory!=null)
synchronized (directory) { // Ensure addition of buffer and adjustment to directory size are atomic wrt directory
buffers.add(buffer);
directory.sizeInBytes += size;
sizeInBytes += size;
}
else
synchronized(this) {
buffers.add(buffer);
sizeInBytes += size;
}
if (directory != null) {
directory.sizeInBytes.getAndAdd(size);
}
return buffer;
}
@ -88,11 +88,8 @@ class RAMFile implements Serializable {
return new byte[size];
}
// Only valid if in a directory
long getSizeInBytes() {
synchronized (directory) {
return sizeInBytes;
}
synchronized long getSizeInBytes() {
return sizeInBytes;
}
}

View File

@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.analysis.Analyzer;
@ -4610,4 +4611,56 @@ public class TestIndexWriter extends LuceneTestCase {
_TestUtil.checkIndex(dir);
dir.close();
}
// LUCENE-2095: make sure with multiple threads commit
// doesn't return until all changes are in fact in the
// index
public void testCommitThreadSafety() throws Throwable {
final int NUM_THREADS = 5;
final double RUN_SEC = 0.5;
final Directory dir = new MockRAMDirectory();
final IndexWriter w = new IndexWriter(dir, new SimpleAnalyzer(), IndexWriter.MaxFieldLength.UNLIMITED);
w.commit();
final AtomicBoolean failed = new AtomicBoolean();
Thread[] threads = new Thread[NUM_THREADS];
final long endTime = System.currentTimeMillis()+((long) (RUN_SEC*1000));
for(int i=0;i<NUM_THREADS;i++) {
final int finalI = i;
threads[i] = new Thread() {
public void run() {
try {
final Document doc = new Document();
IndexReader r = IndexReader.open(dir);
Field f = new Field("f", "", Field.Store.NO, Field.Index.NOT_ANALYZED);
doc.add(f);
int count = 0;
while(System.currentTimeMillis() < endTime && !failed.get()) {
for(int j=0;j<10;j++) {
final String s = finalI + "_" + String.valueOf(count++);
f.setValue(s);
w.addDocument(doc);
w.commit();
IndexReader r2 = r.reopen();
assertTrue(r2 != r);
r.close();
r = r2;
assertEquals("term=f:" + s, 1, r.docFreq(new Term("f", s)));
}
}
r.close();
} catch (Throwable t) {
failed.set(true);
throw new RuntimeException(t);
}
}
};
threads[i].start();
}
for(int i=0;i<NUM_THREADS;i++) {
threads[i].join();
}
w.close();
dir.close();
assertFalse(failed.get());
}
}

View File

@ -18,7 +18,6 @@ package org.apache.lucene.store;
*/
import java.io.IOException;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Iterator;
import java.util.Random;
@ -213,7 +212,7 @@ public class MockRAMDirectory extends RAMDirectory {
throw new IOException("file " + name + " already exists");
else {
if (existing!=null) {
sizeInBytes -= existing.sizeInBytes;
sizeInBytes.getAndAdd(-existing.sizeInBytes);
existing.directory = null;
}

View File

@ -97,7 +97,7 @@ public class TestRAMDirectory extends LuceneTestCase {
searcher.close();
}
private final int numThreads = 50;
private final int numThreads = 10;
private final int docsPerThread = 40;
public void testRAMDirectorySize() throws IOException, InterruptedException {
@ -125,9 +125,6 @@ public class TestRAMDirectory extends LuceneTestCase {
} catch (IOException e) {
throw new RuntimeException(e);
}
synchronized (ramDir) {
assertEquals(ramDir.sizeInBytes(), ramDir.getRecomputedSizeInBytes());
}
}
}
};