LUCENE-1200: prevent rare deadlock in IndexWriter.addIndexes

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@634232 13f79535-47bb-0310-9956-ffa450edef68
Michael McCandless 2008-03-06 11:52:46 +00:00
parent 5b113c8af6
commit 69f35943bd

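The deadlock being fixed is not spelled out in the commit message; reconstructed from the diff below, the shape is: addIndexes and friends used to hold the IndexWriter monitor for their entire run (method-level synchronized), including while waiting on merge work, and merge threads need that same monitor to make progress. A minimal, self-contained Java illustration of that lock shape — assumed mechanics, not Lucene's actual call graph, with a plain Object standing in for the writer monitor:

    public class DeadlockShape {
        public static void main(String[] args) throws InterruptedException {
            final Object writerMonitor = new Object();  // stands in for the IndexWriter lock

            Thread merge = new Thread(new Runnable() {
                public void run() {
                    synchronized (writerMonitor) {      // blocked: main still holds the monitor
                        System.out.println("merge ran");
                    }
                }
            });

            synchronized (writerMonitor) {
                merge.start();
                merge.join(2000);                       // "wait for the merge" while holding the
                                                        // monitor: this is the deadlock shape
                System.out.println("merge finished? " + !merge.isAlive());  // prints: false
            }
            merge.join();                               // monitor released; merge completes now
        }
    }

The patch removes the method-wide synchronized and takes short synchronized(this) blocks instead, so the monitor is never held across a wait.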

@@ -2445,7 +2445,7 @@ public class IndexWriter {
    * within the transactions, so they must be flushed before the
    * transaction is started.
    */
-  private void startTransaction() throws IOException {
+  private synchronized void startTransaction() throws IOException {
 
     if (infoStream != null)
       message("now start transaction");
@@ -2478,7 +2478,7 @@ public class IndexWriter {
    * Rolls back the transaction and restores state to where
    * we were at the start.
    */
-  private void rollbackTransaction() throws IOException {
+  private synchronized void rollbackTransaction() throws IOException {
 
     if (infoStream != null)
       message("now rollback transaction");
@@ -2513,7 +2513,7 @@ public class IndexWriter {
    * segments file and remove and pending deletions we have
    * accumulated during the transaction
    */
-  private void commitTransaction() throws IOException {
+  private synchronized void commitTransaction() throws IOException {
 
     if (infoStream != null)
       message("now commit transaction");
@@ -2682,6 +2682,10 @@ public class IndexWriter {
    * each input Directory, so it is up to the caller to
    * enforce this.
    *
+   * <p><b>NOTE:</b> while this is running, any attempts to
+   * add or delete documents (with another thread) will be
+   * paused until this method completes.
+   *
    * <p>After this completes, the index is optimized.
    *
    * <p>This method is transactional in how Exceptions are
@@ -2720,11 +2724,16 @@ public class IndexWriter {
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  public synchronized void addIndexes(Directory[] dirs)
+  public void addIndexes(Directory[] dirs)
     throws CorruptIndexException, IOException {
 
     ensureOpen();
+
+    // Do not allow add docs or deletes while we are running:
+    docWriter.pauseAllThreads();
+
     try {
       if (infoStream != null)
         message("flush at addIndexes");
       flush(true, false, true);
@@ -2734,14 +2743,17 @@ public class IndexWriter {
       startTransaction();
 
       try {
         int docCount = 0;
-        for (int i = 0; i < dirs.length; i++) {
-          SegmentInfos sis = new SegmentInfos(); // read infos from dir
-          sis.read(dirs[i]);
-          for (int j = 0; j < sis.size(); j++) {
-            final SegmentInfo info = sis.info(j);
-            docCount += info.docCount;
-            segmentInfos.addElement(info); // add each info
+        synchronized(this) {
+          for (int i = 0; i < dirs.length; i++) {
+            SegmentInfos sis = new SegmentInfos(); // read infos from dir
+            sis.read(dirs[i]);
+            for (int j = 0; j < sis.size(); j++) {
+              final SegmentInfo info = sis.info(j);
+              docCount += info.docCount;
+              segmentInfos.addElement(info); // add each info
+            }
           }
         }
@@ -2761,6 +2773,8 @@ public class IndexWriter {
     } catch (OutOfMemoryError oom) {
       hitOOM = true;
       throw oom;
+    } finally {
+      docWriter.resumeAllThreads();
     }
   }
@@ -2782,6 +2796,10 @@ public class IndexWriter {
    * each input Directory, so it is up to the caller to
    * enforce this.
    *
+   * <p><b>NOTE:</b> while this is running, any attempts to
+   * add or delete documents (with another thread) will be
+   * paused until this method completes.
+   *
    * <p>
    * This requires this index not be among those to be added, and the
    * upper bound* of those segment doc counts not exceed maxMergeDocs.
@@ -2793,11 +2811,14 @@ public class IndexWriter {
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  public synchronized void addIndexesNoOptimize(Directory[] dirs)
+  public void addIndexesNoOptimize(Directory[] dirs)
     throws CorruptIndexException, IOException {
 
     ensureOpen();
+
+    // Do not allow add docs or deletes while we are running:
+    docWriter.pauseAllThreads();
+
     try {
       if (infoStream != null)
         message("flush at addIndexesNoOptimize");
@@ -2810,18 +2831,20 @@ public class IndexWriter {
       try {
         int docCount = 0;
-        for (int i = 0; i < dirs.length; i++) {
-          if (directory == dirs[i]) {
-            // cannot add this index: segments may be deleted in merge before added
-            throw new IllegalArgumentException("Cannot add this index to itself");
-          }
+        synchronized(this) {
+          for (int i = 0; i < dirs.length; i++) {
+            if (directory == dirs[i]) {
+              // cannot add this index: segments may be deleted in merge before added
+              throw new IllegalArgumentException("Cannot add this index to itself");
+            }
 
-          SegmentInfos sis = new SegmentInfos(); // read infos from dir
-          sis.read(dirs[i]);
-          for (int j = 0; j < sis.size(); j++) {
-            SegmentInfo info = sis.info(j);
-            docCount += info.docCount;
-            segmentInfos.addElement(info); // add each info
+            SegmentInfos sis = new SegmentInfos(); // read infos from dir
+            sis.read(dirs[i]);
+            for (int j = 0; j < sis.size(); j++) {
+              SegmentInfo info = sis.info(j);
+              docCount += info.docCount;
+              segmentInfos.addElement(info); // add each info
+            }
           }
         }
@@ -2849,18 +2872,30 @@ public class IndexWriter {
     } catch (OutOfMemoryError oom) {
       hitOOM = true;
       throw oom;
+    } finally {
+      docWriter.resumeAllThreads();
     }
   }
 
   /* If any of our segments are using a directory != ours
    * then copy them over.  Currently this is only used by
    * addIndexesNoOptimize(). */
-  private synchronized void copyExternalSegments() throws CorruptIndexException, IOException {
-    final int numSegments = segmentInfos.size();
-    for(int i=0;i<numSegments;i++) {
-      SegmentInfo info = segmentInfos.info(i);
-      if (info.dir != directory) {
-        MergePolicy.OneMerge merge = new MergePolicy.OneMerge(segmentInfos.range(i, 1+i), info.getUseCompoundFile());
+  private void copyExternalSegments() throws CorruptIndexException, IOException {
+    while(true) {
+      SegmentInfo info = null;
+      MergePolicy.OneMerge merge = null;
+      synchronized(this) {
+        final int numSegments = segmentInfos.size();
+        for(int i=0;i<numSegments;i++) {
+          info = segmentInfos.info(i);
+          if (info.dir != directory) {
+            merge = new MergePolicy.OneMerge(segmentInfos.range(i, 1+i), info.getUseCompoundFile());
+            break;
+          }
+        }
+      }
+
+      if (merge != null) {
         if (registerMerge(merge)) {
           pendingMerges.remove(merge);
           runningMerges.add(merge);
@@ -2876,7 +2911,9 @@ public class IndexWriter {
           // that an IndexReader would fail to load).
           throw new MergePolicy.MergeException("segment \"" + info.name + " exists in external directory yet the MergeScheduler executed the merge in a separate thread",
                                                directory);
-      }
+      } else
+        // No more external segments
+        break;
     }
   }
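The rewritten copyExternalSegments() switches to a pick-work-then-release loop: each pass holds the monitor only long enough to find the next external segment and build its OneMerge, then executes the merge with the monitor free, exiting once no external segment remains. A self-contained sketch of that loop shape, with a hypothetical queue standing in for the scan over segmentInfos:

    import java.util.ArrayDeque;
    import java.util.Deque;

    class ExternalCopySketch {
        private final Deque<String> externalSegments = new ArrayDeque<String>();

        void copyExternalSegments() {
            while (true) {
                String segment = null;
                synchronized (this) {            // short critical section: pick work
                    if (!externalSegments.isEmpty())
                        segment = externalSegments.pollFirst();
                }
                if (segment == null)
                    break;                       // no more external segments
                copy(segment);                   // long work runs without the lock
            }
        }

        private void copy(String segment) {
            // stands in for registering and running the actual merge
            System.out.println("copying " + segment);
        }
    }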
@@ -2884,6 +2921,16 @@ public class IndexWriter {
    * <p>After this completes, the index is optimized. </p>
    * <p>The provided IndexReaders are not closed.</p>
    *
+   * <p><b>NOTE:</b> the index in each Directory must not be
+   * changed (opened by a writer) while this method is
+   * running.  This method does not acquire a write lock in
+   * each input Directory, so it is up to the caller to
+   * enforce this.
+   *
+   * <p><b>NOTE:</b> while this is running, any attempts to
+   * add or delete documents (with another thread) will be
+   * paused until this method completes.
+   *
    * <p>See {@link #addIndexes(Directory[])} for
    * details on transactional semantics, temporary free
    * space required in the Directory, and non-CFS segments
@@ -2891,10 +2938,14 @@ public class IndexWriter {
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  public synchronized void addIndexes(IndexReader[] readers)
+  public void addIndexes(IndexReader[] readers)
     throws CorruptIndexException, IOException {
 
     ensureOpen();
+
+    // Do not allow add docs or deletes while we are running:
+    docWriter.pauseAllThreads();
+
     try {
       optimize(); // start with zero or 1 seg
@@ -2905,9 +2956,11 @@ public class IndexWriter {
       IndexReader sReader = null;
       try {
-        if (segmentInfos.size() == 1){ // add existing index, if any
-          sReader = SegmentReader.get(segmentInfos.info(0));
-          merger.add(sReader);
+        synchronized(this) {
+          if (segmentInfos.size() == 1){ // add existing index, if any
+            sReader = SegmentReader.get(segmentInfos.info(0));
+            merger.add(sReader);
+          }
         }
 
         for (int i = 0; i < readers.length; i++) // add new indexes
for (int i = 0; i < readers.length; i++) // add new indexes for (int i = 0; i < readers.length; i++) // add new indexes
@ -2925,10 +2978,12 @@ public class IndexWriter {
sReader = null; sReader = null;
} }
segmentInfos.setSize(0); // pop old infos & add new synchronized(this) {
info = new SegmentInfo(mergedName, docCount, directory, false, true, segmentInfos.setSize(0); // pop old infos & add new
-1, null, false); info = new SegmentInfo(mergedName, docCount, directory, false, true,
segmentInfos.addElement(info); -1, null, false);
segmentInfos.addElement(info);
}
// Notify DocumentsWriter that the flushed count just increased // Notify DocumentsWriter that the flushed count just increased
docWriter.updateFlushedDocCount(docCount); docWriter.updateFlushedDocCount(docCount);
@@ -2959,7 +3014,9 @@ public class IndexWriter {
       try {
         merger.createCompoundFile(mergedName + ".cfs");
-        info.setUseCompoundFile(true);
+        synchronized(this) {
+          info.setUseCompoundFile(true);
+        }
       } finally {
         if (!success) {
           if (infoStream != null)
} catch (OutOfMemoryError oom) { } catch (OutOfMemoryError oom) {
hitOOM = true; hitOOM = true;
throw oom; throw oom;
} finally {
docWriter.resumeAllThreads();
} }
} }