LUCENE-1160: see which index Directory a MergeException came from; make CMS more easily subclassed

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@617632 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2008-02-01 20:22:25 +00:00
parent 3a24900ef5
commit 115faf7b5d
4 changed files with 167 additions and 16 deletions

View File

@ -35,14 +35,14 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
private int mergeThreadPriority = -1; private int mergeThreadPriority = -1;
private List mergeThreads = new ArrayList(); protected List mergeThreads = new ArrayList();
private int maxThreadCount = 3; private int maxThreadCount = 3;
private List exceptions = new ArrayList(); private List exceptions = new ArrayList();
private Directory dir; protected Directory dir;
private boolean closed; private boolean closed;
private IndexWriter writer; protected IndexWriter writer;
public ConcurrentMergeScheduler() { public ConcurrentMergeScheduler() {
if (allInstances != null) { if (allInstances != null) {
@ -176,11 +176,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
if (mergeThreadCount() < maxThreadCount) { if (mergeThreadCount() < maxThreadCount) {
// OK to spawn a new merge thread to handle this // OK to spawn a new merge thread to handle this
// merge: // merge:
MergeThread merger = new MergeThread(writer, merge); final MergeThread merger = getMergeThread(writer, merge);
mergeThreads.add(merger); mergeThreads.add(merger);
message(" launch new thread [" + merger.getName() + "]"); message(" launch new thread [" + merger.getName() + "]");
merger.setThreadPriority(mergeThreadPriority);
merger.setDaemon(true);
merger.start(); merger.start();
continue; continue;
} else } else
@ -190,11 +188,25 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// Too many merge threads already running, so we do // Too many merge threads already running, so we do
// this in the foreground of the calling thread // this in the foreground of the calling thread
writer.merge(merge); doMerge(merge);
} }
} }
private class MergeThread extends Thread { /** Does the actual merge, by calling {@link IndexWriter#merge} */
protected void doMerge(MergePolicy.OneMerge merge)
throws IOException {
writer.merge(merge);
}
/** Create and return a new MergeThread */
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
final MergeThread thread = new MergeThread(writer, merge);
thread.setThreadPriority(mergeThreadPriority);
thread.setDaemon(true);
return thread;
}
protected class MergeThread extends Thread {
IndexWriter writer; IndexWriter writer;
MergePolicy.OneMerge startMerge; MergePolicy.OneMerge startMerge;
@ -237,7 +249,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
while(true) { while(true) {
setRunningMerge(merge); setRunningMerge(merge);
writer.merge(merge); doMerge(merge);
// Subsequent times through the loop we do any new // Subsequent times through the loop we do any new
// merge that writer says is necessary: // merge that writer says is necessary:
@ -268,7 +280,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// suppressExceptions is normally only set during // suppressExceptions is normally only set during
// testing. // testing.
anyExceptions = true; anyExceptions = true;
throw new MergePolicy.MergeException(exc); handleMergeException(exc);
} }
} }
} finally { } finally {
@ -287,6 +299,12 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} }
} }
/** Called when an exception is hit in a background merge
* thread */
protected void handleMergeException(Throwable exc) {
throw new MergePolicy.MergeException(exc, dir);
}
static boolean anyExceptions = false; static boolean anyExceptions = false;
/** Used for testing */ /** Used for testing */
@ -297,7 +315,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// any exceptions they may produce: // any exceptions they may produce:
for(int i=0;i<count;i++) for(int i=0;i<count;i++)
((ConcurrentMergeScheduler) allInstances.get(i)).sync(); ((ConcurrentMergeScheduler) allInstances.get(i)).sync();
return anyExceptions; boolean v = anyExceptions;
anyExceptions = false;
return v;
} }
} }

View File

@ -2542,7 +2542,8 @@ public class IndexWriter {
// into an inconsistent state (where segmentInfos // into an inconsistent state (where segmentInfos
// has been written with such external segments // has been written with such external segments
// that an IndexReader would fail to load). // that an IndexReader would fail to load).
throw new MergePolicy.MergeException("segment \"" + info.name + " exists in external directory yet the MergeScheduler executed the merge in a separate thread"); throw new MergePolicy.MergeException("segment \"" + info.name + " exists in external directory yet the MergeScheduler executed the merge in a separate thread",
directory);
} }
} }
} }
@ -2882,7 +2883,7 @@ public class IndexWriter {
int first = segmentInfos.indexOf(merge.segments.info(0)); int first = segmentInfos.indexOf(merge.segments.info(0));
if (first == -1) if (first == -1)
throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current segments"); throw new MergePolicy.MergeException("could not find segment " + merge.segments.info(0).name + " in current segments", directory);
final int numSegments = segmentInfos.size(); final int numSegments = segmentInfos.size();
@ -2892,9 +2893,10 @@ public class IndexWriter {
if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) { if (first + i >= numSegments || !segmentInfos.info(first+i).equals(info)) {
if (segmentInfos.indexOf(info) == -1) if (segmentInfos.indexOf(info) == -1)
throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index"); throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the index", directory);
else else
throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle"); throw new MergePolicy.MergeException("MergePolicy selected non-contiguous segments to merge (" + merge + " vs " + segString() + "), which IndexWriter (currently) cannot handle",
directory);
} }
} }

View File

@ -117,7 +117,7 @@ public abstract class MergePolicy {
StringBuffer b = new StringBuffer(); StringBuffer b = new StringBuffer();
final int numSegments = segments.size(); final int numSegments = segments.size();
for(int i=0;i<numSegments;i++) { for(int i=0;i<numSegments;i++) {
if (i > 0) b.append(" "); if (i > 0) b.append(' ');
b.append(segments.info(i).segString(dir)); b.append(segments.info(i).segString(dir));
} }
if (info != null) if (info != null)
@ -159,12 +159,30 @@ public abstract class MergePolicy {
/** Exception thrown if there are any problems while /** Exception thrown if there are any problems while
* executing a merge. */ * executing a merge. */
public static class MergeException extends RuntimeException { public static class MergeException extends RuntimeException {
private Directory dir;
/** @deprecated
* Use {@link #MergePolicy.MergeException(String,Directory)} instead */
public MergeException(String message) { public MergeException(String message) {
super(message); super(message);
} }
public MergeException(String message, Directory dir) {
super(message);
this.dir = dir;
}
/** @deprecated
* Use {@link #MergePolicy.MergeException(Throwable,Directory)} instead */
public MergeException(Throwable exc) { public MergeException(Throwable exc) {
super(exc); super(exc);
} }
public MergeException(Throwable exc, Directory dir) {
super(exc);
this.dir = dir;
}
/** Returns the {@link Directory} of the index that hit
* the exception. */
public Directory getDirectory() {
return dir;
}
} }
public static class MergeAbortedException extends IOException { public static class MergeAbortedException extends IOException {

View File

@ -0,0 +1,111 @@
package org.apache.lucene;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
/**
* Holds tests cases to verify external APIs are accessible
* while not being in org.apache.lucene.index package.
*/
public class TestMergeSchedulerExternal extends LuceneTestCase {
volatile boolean mergeCalled;
volatile boolean mergeThreadCreated;
volatile boolean excCalled;
private class MyMergeException extends RuntimeException {
Directory dir;
public MyMergeException(Throwable exc, Directory dir) {
super(exc);
this.dir = dir;
}
}
private class MyMergeScheduler extends ConcurrentMergeScheduler {
private class MyMergeThread extends ConcurrentMergeScheduler.MergeThread {
public MyMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
super(writer, merge);
mergeThreadCreated = true;
}
}
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = new MyMergeThread(writer, merge);
thread.setThreadPriority(getMergeThreadPriority());
thread.setDaemon(true);
thread.setName("MyMergeThread");
return thread;
}
protected void handleMergeException(Throwable t) {
excCalled = true;
}
protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
mergeCalled = true;
super.doMerge(merge);
}
}
private static class FailOnlyOnMerge extends MockRAMDirectory.Failure {
public void eval(MockRAMDirectory dir) throws IOException {
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
if ("doMerge".equals(trace[i].getMethodName()))
throw new IOException("now failing during merge");
}
}
}
public void testSubclassConcurrentMergeScheduler() throws IOException {
MockRAMDirectory dir = new MockRAMDirectory();
dir.failOn(new FailOnlyOnMerge());
Document doc = new Document();
Field idField = new Field("id", "", Field.Store.YES, Field.Index.UN_TOKENIZED);
doc.add(idField);
IndexWriter writer = new IndexWriter(dir, true, new WhitespaceAnalyzer(), true);
MyMergeScheduler ms = new MyMergeScheduler();
writer.setMergeScheduler(ms);
writer.setMaxBufferedDocs(2);
writer.setRAMBufferSizeMB(writer.DISABLE_AUTO_FLUSH);
for(int i=0;i<20;i++)
writer.addDocument(doc);
ms.sync();
writer.close();
assertTrue(mergeThreadCreated);
assertTrue(mergeCalled);
assertTrue(excCalled);
dir.close();
assertTrue(ConcurrentMergeScheduler.anyUnhandledExceptions());
}
}