LUCENE-9345: Separate MergeSchedulder from IndexWriter (#1451)

This change extracts the methods that are used by MergeScheduler into
a MergeSource interface. This allows IndexWriter to better ensure
locking, hide internal methods and removes the tight coupling between the two
complex classes. This will also improve future testing.
This commit is contained in:
Simon Willnauer 2020-04-24 15:02:55 +02:00 committed by GitHub
parent 5eb117f561
commit d7e0b906ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 163 additions and 106 deletions

View File

@ -135,6 +135,10 @@ API Changes
* LUCENE-9340: Deprecate SimpleBindings#add(SortField). (Alan Woodward) * LUCENE-9340: Deprecate SimpleBindings#add(SortField). (Alan Woodward)
* LUCENE-9345: MergeScheduler is now decoupled from IndexWriter. Instead it accepts a MergeSource
interface that offers the basic methods to acquire pending merges, run the merge and do accounting
around it. (Simon Willnauer)
New Features New Features
--------------------- ---------------------
(No changes) (No changes)

View File

@ -33,6 +33,7 @@ import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.RateLimiter; import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
/** A {@link MergeScheduler} that runs each merge using a /** A {@link MergeScheduler} that runs each merge using a
@ -62,7 +63,6 @@ import org.apache.lucene.util.ThreadInterruptedException;
* settings for spinning or solid state disks for such * settings for spinning or solid state disks for such
* operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}. * operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}.
*/ */
public class ConcurrentMergeScheduler extends MergeScheduler { public class ConcurrentMergeScheduler extends MergeScheduler {
/** Dynamic default for {@code maxThreadCount} and {@code maxMergeCount}, /** Dynamic default for {@code maxThreadCount} and {@code maxMergeCount},
@ -408,9 +408,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} }
} }
private synchronized void initDynamicDefaults(IndexWriter writer) throws IOException { private synchronized void initDynamicDefaults(Directory directory) throws IOException {
if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) { if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) {
boolean spins = IOUtils.spins(writer.getDirectory()); boolean spins = IOUtils.spins(directory);
// Let tests override this to help reproducing a failure on a machine that has a different // Let tests override this to help reproducing a failure on a machine that has a different
// core count than the one where the test originally failed: // core count than the one where the test originally failed:
@ -495,11 +495,14 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} }
@Override @Override
public synchronized void merge(IndexWriter writer, MergeTrigger trigger) throws IOException { void initialize(InfoStream infoStream, Directory directory) throws IOException {
super.initialize(infoStream, directory);
initDynamicDefaults(directory);
assert !Thread.holdsLock(writer); }
initDynamicDefaults(writer); @Override
public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
if (trigger == MergeTrigger.CLOSING) { if (trigger == MergeTrigger.CLOSING) {
// Disable throttling on close: // Disable throttling on close:
@ -516,18 +519,18 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
if (verbose()) { if (verbose()) {
message("now merge"); message("now merge");
message(" index: " + writer.segString()); message(" index(source): " + mergeSource.toString());
} }
// Iterate, pulling from the IndexWriter's queue of // Iterate, pulling from the IndexWriter's queue of
// pending merges, until it's empty: // pending merges, until it's empty:
while (true) { while (true) {
if (maybeStall(writer) == false) { if (maybeStall(mergeSource) == false) {
break; break;
} }
OneMerge merge = writer.getNextMerge(); OneMerge merge = mergeSource.getNextMerge();
if (merge == null) { if (merge == null) {
if (verbose()) { if (verbose()) {
message(" no more merges pending; now return"); message(" no more merges pending; now return");
@ -537,13 +540,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
boolean success = false; boolean success = false;
try { try {
if (verbose()) {
message(" consider merge " + writer.segString(merge.segments));
}
// OK to spawn a new merge thread to handle this // OK to spawn a new merge thread to handle this
// merge: // merge:
final MergeThread newMergeThread = getMergeThread(writer, merge); final MergeThread newMergeThread = getMergeThread(mergeSource, merge);
mergeThreads.add(newMergeThread); mergeThreads.add(newMergeThread);
updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter); updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter);
@ -558,7 +557,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {
writer.mergeFinish(merge); mergeSource.onMergeFinished(merge);
} }
} }
} }
@ -575,10 +574,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
* If this method wants to stall but the calling thread is a merge * If this method wants to stall but the calling thread is a merge
* thread, it should return false to tell caller not to kick off * thread, it should return false to tell caller not to kick off
* any new merges. */ * any new merges. */
protected synchronized boolean maybeStall(MergeSource mergeSource) {
protected synchronized boolean maybeStall(IndexWriter writer) {
long startStallTime = 0; long startStallTime = 0;
while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { while (mergeSource.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
// This means merging has fallen too far behind: we // This means merging has fallen too far behind: we
// have already created maxMergeCount threads, and // have already created maxMergeCount threads, and
@ -621,27 +619,27 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} }
} }
/** Does the actual merge, by calling {@link IndexWriter#merge} */ /** Does the actual merge, by calling {@link org.apache.lucene.index.MergeScheduler.MergeSource#merge} */
protected void doMerge(IndexWriter writer, OneMerge merge) throws IOException { protected void doMerge(MergeSource mergeSource, OneMerge merge) throws IOException {
writer.merge(merge); mergeSource.merge(merge);
} }
/** Create and return a new MergeThread */ /** Create and return a new MergeThread */
protected synchronized MergeThread getMergeThread(IndexWriter writer, OneMerge merge) throws IOException { protected synchronized MergeThread getMergeThread(MergeSource mergeSource, OneMerge merge) throws IOException {
final MergeThread thread = new MergeThread(writer, merge); final MergeThread thread = new MergeThread(mergeSource, merge);
thread.setDaemon(true); thread.setDaemon(true);
thread.setName("Lucene Merge Thread #" + mergeThreadCount++); thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
return thread; return thread;
} }
synchronized void runOnMergeFinished(IndexWriter writer) { synchronized void runOnMergeFinished(MergeSource mergeSource) {
// the merge call as well as the merge thread handling in the finally // the merge call as well as the merge thread handling in the finally
// block must be sync'd on CMS otherwise stalling decisions might cause // block must be sync'd on CMS otherwise stalling decisions might cause
// us to miss pending merges // us to miss pending merges
assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread"; assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";
// Let CMS run new merges if necessary: // Let CMS run new merges if necessary:
try { try {
merge(writer, MergeTrigger.MERGE_FINISHED); merge(mergeSource, MergeTrigger.MERGE_FINISHED);
} catch (AlreadyClosedException ace) { } catch (AlreadyClosedException ace) {
// OK // OK
} catch (IOException ioe) { } catch (IOException ioe) {
@ -657,13 +655,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
/** Runs a merge thread to execute a single merge, then exits. */ /** Runs a merge thread to execute a single merge, then exits. */
protected class MergeThread extends Thread implements Comparable<MergeThread> { protected class MergeThread extends Thread implements Comparable<MergeThread> {
final IndexWriter writer; final MergeSource mergeSource;
final OneMerge merge; final OneMerge merge;
final MergeRateLimiter rateLimiter; final MergeRateLimiter rateLimiter;
/** Sole constructor. */ /** Sole constructor. */
public MergeThread(IndexWriter writer, OneMerge merge) { public MergeThread(MergeSource mergeSource, OneMerge merge) {
this.writer = writer; this.mergeSource = mergeSource;
this.merge = merge; this.merge = merge;
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
} }
@ -681,19 +679,19 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
message(" merge thread: start"); message(" merge thread: start");
} }
doMerge(writer, merge); doMerge(mergeSource, merge);
if (verbose()) { if (verbose()) {
message(" merge thread: done"); message(" merge thread: done");
} }
runOnMergeFinished(writer); runOnMergeFinished(mergeSource);
} catch (Throwable exc) { } catch (Throwable exc) {
if (exc instanceof MergePolicy.MergeAbortedException) { if (exc instanceof MergePolicy.MergeAbortedException) {
// OK to ignore // OK to ignore
} else if (suppressExceptions == false) { } else if (suppressExceptions == false) {
// suppressExceptions is normally only set during // suppressExceptions is normally only set during
// testing. // testing.
handleMergeException(writer.getDirectory(), exc); handleMergeException(exc);
} }
} }
} }
@ -701,8 +699,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
/** Called when an exception is hit in a background merge /** Called when an exception is hit in a background merge
* thread */ * thread */
protected void handleMergeException(Directory dir, Throwable exc) { protected void handleMergeException(Throwable exc) {
throw new MergePolicy.MergeException(exc, dir); throw new MergePolicy.MergeException(exc);
} }
private boolean suppressExceptions; private boolean suppressExceptions;

View File

@ -299,6 +299,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
final DocumentsWriter docWriter; final DocumentsWriter docWriter;
private final EventQueue eventQueue = new EventQueue(this); private final EventQueue eventQueue = new EventQueue(this);
private final MergeScheduler.MergeSource mergeSource = new IndexWriterMergeSource(this);
static final class EventQueue implements Closeable { static final class EventQueue implements Closeable {
private volatile boolean closed; private volatile boolean closed;
@ -805,7 +806,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
analyzer = config.getAnalyzer(); analyzer = config.getAnalyzer();
mergeScheduler = config.getMergeScheduler(); mergeScheduler = config.getMergeScheduler();
mergeScheduler.setInfoStream(infoStream); mergeScheduler.initialize(infoStream, directoryOrig);
codec = config.getCodec(); codec = config.getCodec();
OpenMode mode = config.getOpenMode(); OpenMode mode = config.getOpenMode();
final boolean indexExists; final boolean indexExists;
@ -2070,7 +2071,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} }
} }
mergeScheduler.merge(this, MergeTrigger.EXPLICIT); mergeScheduler.merge(mergeSource, MergeTrigger.EXPLICIT);
if (spec != null && doWait) { if (spec != null && doWait) {
final int numMerges = spec.merges.size(); final int numMerges = spec.merges.size();
@ -2153,7 +2154,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException { final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
ensureOpen(false); ensureOpen(false);
if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) { if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) {
mergeScheduler.merge(this, trigger); mergeScheduler.merge(mergeSource, trigger);
} }
} }
@ -2220,7 +2221,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* *
* @lucene.experimental * @lucene.experimental
*/ */
public synchronized MergePolicy.OneMerge getNextMerge() { private synchronized MergePolicy.OneMerge getNextMerge() {
if (pendingMerges.size() == 0) { if (pendingMerges.size() == 0) {
return null; return null;
} else { } else {
@ -2535,7 +2536,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Give merge scheduler last chance to run, in case // Give merge scheduler last chance to run, in case
// any pending merges are waiting. We can't hold IW's lock // any pending merges are waiting. We can't hold IW's lock
// when going into merge because it can lead to deadlock. // when going into merge because it can lead to deadlock.
mergeScheduler.merge(this, MergeTrigger.CLOSING); mergeScheduler.merge(mergeSource, MergeTrigger.CLOSING);
synchronized (this) { synchronized (this) {
ensureOpen(false); ensureOpen(false);
@ -3669,7 +3670,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private synchronized void ensureValidMerge(MergePolicy.OneMerge merge) { private synchronized void ensureValidMerge(MergePolicy.OneMerge merge) {
for(SegmentCommitInfo info : merge.segments) { for(SegmentCommitInfo info : merge.segments) {
if (!segmentInfos.contains(info)) { if (!segmentInfos.contains(info)) {
throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.info.name + ") that is not in the current index " + segString(), directoryOrig); throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.info.name + ") that is not in the current index " + segString());
} }
} }
} }
@ -4050,7 +4051,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* *
* @lucene.experimental * @lucene.experimental
*/ */
public void merge(MergePolicy.OneMerge merge) throws IOException { protected void merge(MergePolicy.OneMerge merge) throws IOException {
boolean success = false; boolean success = false;
@ -4300,9 +4301,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
info.setDiagnostics(diagnostics); info.setDiagnostics(diagnostics);
} }
/** Does fininishing for a merge, which is fast but holds /** Does finishing for a merge, which is fast but holds
* the synchronized lock on IndexWriter instance. */ * the synchronized lock on IndexWriter instance. */
final synchronized void mergeFinish(MergePolicy.OneMerge merge) { private synchronized void mergeFinish(MergePolicy.OneMerge merge) {
// forceMerge, addIndexes or waitForMerges may be waiting // forceMerge, addIndexes or waitForMerges may be waiting
// on merges to finish. // on merges to finish.
@ -5333,4 +5334,43 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
this.numDocs = numDocs; this.numDocs = numDocs;
} }
} }
private static class IndexWriterMergeSource implements MergeScheduler.MergeSource {
private final IndexWriter writer;
private IndexWriterMergeSource(IndexWriter writer) {
this.writer = writer;
}
@Override
public MergePolicy.OneMerge getNextMerge() {
MergePolicy.OneMerge nextMerge = writer.getNextMerge();
if (nextMerge != null) {
if (writer.mergeScheduler.verbose()) {
writer.mergeScheduler.message(" checked out merge " + writer.segString(nextMerge.segments));
}
}
return nextMerge;
}
@Override
public void onMergeFinished(MergePolicy.OneMerge merge) {
writer.mergeFinish(merge);
}
@Override
public boolean hasPendingMerges() {
return writer.hasPendingMerges();
}
@Override
public void merge(MergePolicy.OneMerge merge) throws IOException {
assert Thread.holdsLock(writer) == false;
writer.merge(merge);
}
public String toString() {
return writer.segString();
}
}
} }

View File

@ -403,24 +403,14 @@ public abstract class MergePolicy {
/** Exception thrown if there are any problems while executing a merge. */ /** Exception thrown if there are any problems while executing a merge. */
public static class MergeException extends RuntimeException { public static class MergeException extends RuntimeException {
private Directory dir;
/** Create a {@code MergeException}. */ /** Create a {@code MergeException}. */
public MergeException(String message, Directory dir) { public MergeException(String message) {
super(message); super(message);
this.dir = dir;
} }
/** Create a {@code MergeException}. */ /** Create a {@code MergeException}. */
public MergeException(Throwable exc, Directory dir) { public MergeException(Throwable exc) {
super(exc); super(exc);
this.dir = dir;
}
/** Returns the {@link Directory} of the index that hit
* the exception. */
public Directory getDirectory() {
return dir;
} }
} }

View File

@ -33,15 +33,15 @@ import org.apache.lucene.util.InfoStream;
*/ */
public abstract class MergeScheduler implements Closeable { public abstract class MergeScheduler implements Closeable {
/** Sole constructor. (For invocation by subclass /** Sole constructor. (For invocation by subclass
* constructors, typically implicit.) */ * constructors, typically implicit.) */
protected MergeScheduler() { protected MergeScheduler() {
} }
/** Run the merges provided by {@link IndexWriter#getNextMerge()}. /** Run the merges provided by {@link MergeSource#getNextMerge()}.
* @param writer the {@link IndexWriter} to obtain the merges from. * @param mergeSource the {@link IndexWriter} to obtain the merges from.
* @param trigger the {@link MergeTrigger} that caused this merge to happen */ * @param trigger the {@link MergeTrigger} that caused this merge to happen */
public abstract void merge(IndexWriter writer, MergeTrigger trigger) throws IOException; public abstract void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException;
/** /**
* Wraps the incoming {@link Directory} so that we can merge-throttle it * Wraps the incoming {@link Directory} so that we can merge-throttle it
@ -60,7 +60,7 @@ public abstract class MergeScheduler implements Closeable {
protected InfoStream infoStream; protected InfoStream infoStream;
/** IndexWriter calls this on init. */ /** IndexWriter calls this on init. */
final void setInfoStream(InfoStream infoStream) { void initialize(InfoStream infoStream, Directory directory) throws IOException {
this.infoStream = infoStream; this.infoStream = infoStream;
} }
@ -85,4 +85,32 @@ public abstract class MergeScheduler implements Closeable {
protected void message(String message) { protected void message(String message) {
infoStream.message("MS", message); infoStream.message("MS", message);
} }
/**
* Provides access to new merges and executes the actual merge
* @lucene.experimental
*/
public interface MergeSource {
/**
* The {@link MergeScheduler} calls this method to retrieve the next
* merge requested by the MergePolicy
*/
MergePolicy.OneMerge getNextMerge();
/**
* Does finishing for a merge.
*/
void onMergeFinished(MergePolicy.OneMerge merge);
/**
* Expert: returns true if there are merges waiting to be scheduled.
*/
boolean hasPendingMerges();
/**
* Merges the indicated segments, replacing them in the stack with a
* single segment.
*/
void merge(MergePolicy.OneMerge merge) throws IOException;
}
} }

View File

@ -42,7 +42,7 @@ public final class NoMergeScheduler extends MergeScheduler {
public void close() {} public void close() {}
@Override @Override
public void merge(IndexWriter writer, MergeTrigger trigger) {} public void merge(MergeSource mergeSource, MergeTrigger trigger) {}
@Override @Override
public Directory wrapForMerge(OneMerge merge, Directory in) { public Directory wrapForMerge(OneMerge merge, Directory in) {

View File

@ -31,13 +31,13 @@ public class SerialMergeScheduler extends MergeScheduler {
* "synchronized" so that even if the application is using * "synchronized" so that even if the application is using
* multiple threads, only one merge may run at a time. */ * multiple threads, only one merge may run at a time. */
@Override @Override
synchronized public void merge(IndexWriter writer, MergeTrigger trigger) throws IOException { synchronized public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
while(true) { while(true) {
MergePolicy.OneMerge merge = writer.getNextMerge(); MergePolicy.OneMerge merge = mergeSource.getNextMerge();
if (merge == null) { if (merge == null) {
break; break;
} }
writer.merge(merge); mergeSource.merge(merge);
} }
} }

View File

@ -57,22 +57,22 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
private class MyMergeScheduler extends ConcurrentMergeScheduler { private class MyMergeScheduler extends ConcurrentMergeScheduler {
private class MyMergeThread extends ConcurrentMergeScheduler.MergeThread { private class MyMergeThread extends ConcurrentMergeScheduler.MergeThread {
public MyMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) { public MyMergeThread(MergeSource mergeSource, MergePolicy.OneMerge merge) {
super(writer, merge); super(mergeSource, merge);
mergeThreadCreated = true; mergeThreadCreated = true;
} }
} }
@Override @Override
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { protected MergeThread getMergeThread(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = new MyMergeThread(writer, merge); MergeThread thread = new MyMergeThread(mergeSource, merge);
thread.setDaemon(true); thread.setDaemon(true);
thread.setName("MyMergeThread"); thread.setName("MyMergeThread");
return thread; return thread;
} }
@Override @Override
protected void handleMergeException(Directory dir, Throwable t) { protected void handleMergeException(Throwable t) {
excCalled = true; excCalled = true;
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "TEST: now handleMergeException"); infoStream.message("IW", "TEST: now handleMergeException");
@ -80,9 +80,9 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
} }
@Override @Override
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
mergeCalled = true; mergeCalled = true;
super.doMerge(writer, merge); super.doMerge(mergeSource, merge);
} }
} }
@ -153,13 +153,13 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
private static class ReportingMergeScheduler extends MergeScheduler { private static class ReportingMergeScheduler extends MergeScheduler {
@Override @Override
public void merge(IndexWriter writer, MergeTrigger trigger) throws IOException { public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
OneMerge merge = null; OneMerge merge = null;
while ((merge = writer.getNextMerge()) != null) { while ((merge = mergeSource.getNextMerge()) != null) {
if (VERBOSE) { if (VERBOSE) {
System.out.println("executing merge " + merge.segString()); System.out.println("executing merge " + merge.segString());
} }
writer.merge(merge); mergeSource.merge(merge);
} }
} }

View File

@ -277,7 +277,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
@Override @Override
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
try { try {
// Stall all incoming merges until we see // Stall all incoming merges until we see
// maxMergeCount: // maxMergeCount:
@ -296,13 +296,13 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
// Then sleep a bit to give a chance for the bug // Then sleep a bit to give a chance for the bug
// (too many pending merges) to appear: // (too many pending merges) to appear:
Thread.sleep(20); Thread.sleep(20);
super.doMerge(writer, merge); super.doMerge(mergeSource, merge);
} finally { } finally {
runningMergeCount.decrementAndGet(); runningMergeCount.decrementAndGet();
} }
} catch (Throwable t) { } catch (Throwable t) {
failed.set(true); failed.set(true);
writer.mergeFinish(merge); mergeSource.onMergeFinished(merge);
throw new RuntimeException(t); throw new RuntimeException(t);
} }
} }
@ -342,10 +342,10 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
} }
@Override @Override
public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { public void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
totMergedBytes += merge.totalBytesSize(); totMergedBytes += merge.totalBytesSize();
atLeastOneMerge.countDown(); atLeastOneMerge.countDown();
super.doMerge(writer, merge); super.doMerge(mergeSource, merge);
} }
} }
@ -406,7 +406,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
final AtomicInteger runningMergeCount = new AtomicInteger(); final AtomicInteger runningMergeCount = new AtomicInteger();
@Override @Override
public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { public void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
int count = runningMergeCount.incrementAndGet(); int count = runningMergeCount.incrementAndGet();
// evil? // evil?
synchronized (this) { synchronized (this) {
@ -415,7 +415,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
} }
} }
try { try {
super.doMerge(writer, merge); super.doMerge(mergeSource, merge);
} finally { } finally {
runningMergeCount.decrementAndGet(); runningMergeCount.decrementAndGet();
} }
@ -467,7 +467,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
.setMergePolicy(new LogByteSizeMergePolicy()); .setMergePolicy(new LogByteSizeMergePolicy());
iwc.setMergeScheduler(new ConcurrentMergeScheduler() { iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
@Override @Override
protected boolean maybeStall(IndexWriter writer) { protected boolean maybeStall(MergeSource mergeSource) {
wasCalled.set(true); wasCalled.set(true);
return true; return true;
} }
@ -494,14 +494,14 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
final CountDownLatch mergeFinish = new CountDownLatch(1); final CountDownLatch mergeFinish = new CountDownLatch(1);
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
@Override @Override
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
mergeStart.countDown(); mergeStart.countDown();
try { try {
mergeFinish.await(); mergeFinish.await();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new RuntimeException(ie); throw new RuntimeException(ie);
} }
super.doMerge(writer, merge); super.doMerge(mergeSource, merge);
} }
}; };
cms.setMaxMergesAndThreads(1, 1); cms.setMaxMergesAndThreads(1, 1);

View File

@ -40,7 +40,7 @@ public class TestForceMergeForever extends LuceneTestCase {
} }
@Override @Override
public void merge(MergePolicy.OneMerge merge) throws IOException { protected void merge(MergePolicy.OneMerge merge) throws IOException {
if (merge.maxNumSegments != -1 && (first || merge.segments.size() == 1)) { if (merge.maxNumSegments != -1 && (first || merge.segments.size() == 1)) {
first = false; first = false;
if (VERBOSE) { if (VERBOSE) {

View File

@ -427,7 +427,7 @@ public class TestIndexFileDeleter extends LuceneTestCase {
if (ms instanceof ConcurrentMergeScheduler) { if (ms instanceof ConcurrentMergeScheduler) {
final ConcurrentMergeScheduler suppressFakeFail = new ConcurrentMergeScheduler() { final ConcurrentMergeScheduler suppressFakeFail = new ConcurrentMergeScheduler() {
@Override @Override
protected void handleMergeException(Directory dir, Throwable exc) { protected void handleMergeException(Throwable exc) {
// suppress only FakeIOException: // suppress only FakeIOException:
if (exc instanceof RuntimeException && exc.getMessage().equals("fake fail")) { if (exc instanceof RuntimeException && exc.getMessage().equals("fake fail")) {
// ok to ignore // ok to ignore
@ -435,7 +435,7 @@ public class TestIndexFileDeleter extends LuceneTestCase {
&& exc.getCause() != null && "fake fail".equals(exc.getCause().getMessage())) { && exc.getCause() != null && "fake fail".equals(exc.getCause().getMessage())) {
// also ok to ignore // also ok to ignore
} else { } else {
super.handleMergeException(dir, exc); super.handleMergeException( exc);
} }
} }
}; };

View File

@ -2399,7 +2399,7 @@ public class TestIndexWriter extends LuceneTestCase {
iwc.setMergeScheduler(new ConcurrentMergeScheduler() { iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
@Override @Override
public void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { public void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
mergeStarted.countDown(); mergeStarted.countDown();
try { try {
closeStarted.await(); closeStarted.await();
@ -2407,7 +2407,7 @@ public class TestIndexWriter extends LuceneTestCase {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException(ie); throw new RuntimeException(ie);
} }
super.doMerge(writer, merge); super.doMerge(mergeSource, merge);
} }
@Override @Override

View File

@ -310,10 +310,10 @@ public class TestIndexWriterMerging extends LuceneTestCase {
// merging a segment with >= 20 (maxMergeDocs) docs // merging a segment with >= 20 (maxMergeDocs) docs
private static class MyMergeScheduler extends MergeScheduler { private static class MyMergeScheduler extends MergeScheduler {
@Override @Override
synchronized public void merge(IndexWriter writer, MergeTrigger trigger) throws IOException { synchronized public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
while(true) { while(true) {
MergePolicy.OneMerge merge = writer.getNextMerge(); MergePolicy.OneMerge merge = mergeSource.getNextMerge();
if (merge == null) { if (merge == null) {
break; break;
} }
@ -323,7 +323,7 @@ public class TestIndexWriterMerging extends LuceneTestCase {
numDocs += maxDoc; numDocs += maxDoc;
assertTrue(maxDoc < 20); assertTrue(maxDoc < 20);
} }
writer.merge(merge); mergeSource.merge(merge);
assertEquals(numDocs, merge.getMergeInfo().info.maxDoc()); assertEquals(numDocs, merge.getMergeInfo().info.maxDoc());
} }
} }

View File

@ -107,7 +107,7 @@ public class TestTragicIndexWriterDeadlock extends LuceneTestCase {
CountDownLatch done = new CountDownLatch(1); CountDownLatch done = new CountDownLatch(1);
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() { ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
@Override @Override
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
// let merge takes forever, until commit thread is stalled // let merge takes forever, until commit thread is stalled
try { try {
done.await(); done.await();
@ -115,7 +115,7 @@ public class TestTragicIndexWriterDeadlock extends LuceneTestCase {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException(ie); throw new RuntimeException(ie);
} }
super.doMerge(writer, merge); super.doMerge(mergeSource, merge);
} }
@Override @Override
@ -125,7 +125,7 @@ public class TestTragicIndexWriterDeadlock extends LuceneTestCase {
} }
@Override @Override
protected void handleMergeException(Directory dir, Throwable exc) { protected void handleMergeException(Throwable exc) {
} }
}; };

View File

@ -384,7 +384,7 @@ public class TestIDVersionPostingsFormat extends LuceneTestCase {
if (ms instanceof ConcurrentMergeScheduler) { if (ms instanceof ConcurrentMergeScheduler) {
iwc.setMergeScheduler(new ConcurrentMergeScheduler() { iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
@Override @Override
protected void handleMergeException(Directory dir, Throwable exc) { protected void handleMergeException(Throwable exc) {
assertTrue(exc instanceof IllegalArgumentException); assertTrue(exc instanceof IllegalArgumentException);
} }
}); });

View File

@ -70,16 +70,16 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
final AtomicBoolean mayMerge = new AtomicBoolean(true); final AtomicBoolean mayMerge = new AtomicBoolean(true);
final MergeScheduler mergeScheduler = new SerialMergeScheduler() { final MergeScheduler mergeScheduler = new SerialMergeScheduler() {
@Override @Override
synchronized public void merge(IndexWriter writer, MergeTrigger trigger) throws IOException { synchronized public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
if (mayMerge.get() == false) { if (mayMerge.get() == false) {
MergePolicy.OneMerge merge = writer.getNextMerge(); MergePolicy.OneMerge merge = mergeSource.getNextMerge();
if (merge != null) { if (merge != null) {
System.out.println("TEST: we should not need any merging, yet merge policy returned merge " + merge); System.out.println("TEST: we should not need any merging, yet merge policy returned merge " + merge);
throw new AssertionError(); throw new AssertionError();
} }
} }
super.merge(writer, trigger); super.merge(mergeSource, trigger);
} }
}; };

View File

@ -16,19 +16,17 @@
*/ */
package org.apache.lucene.index; package org.apache.lucene.index;
import org.apache.lucene.store.Directory;
/** A {@link ConcurrentMergeScheduler} that ignores AlreadyClosedException. */ /** A {@link ConcurrentMergeScheduler} that ignores AlreadyClosedException. */
public abstract class SuppressingConcurrentMergeScheduler extends ConcurrentMergeScheduler { public abstract class SuppressingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
@Override @Override
protected void handleMergeException(Directory dir, Throwable exc) { protected void handleMergeException(Throwable exc) {
while (true) { while (true) {
if (isOK(exc)) { if (isOK(exc)) {
return; return;
} }
exc = exc.getCause(); exc = exc.getCause();
if (exc == null) { if (exc == null) {
super.handleMergeException(dir, exc); super.handleMergeException(exc);
} }
} }
} }

View File

@ -955,7 +955,7 @@ public abstract class LuceneTestCase extends Assert {
} else { } else {
cms = new ConcurrentMergeScheduler() { cms = new ConcurrentMergeScheduler() {
@Override @Override
protected synchronized boolean maybeStall(IndexWriter writer) { protected synchronized boolean maybeStall(MergeSource mergeSource) {
return true; return true;
} }
}; };

View File

@ -192,7 +192,7 @@ public class SolrIndexWriter extends IndexWriter {
// we override this method to collect metrics for merges. // we override this method to collect metrics for merges.
@Override @Override
public void merge(MergePolicy.OneMerge merge) throws IOException { protected void merge(MergePolicy.OneMerge merge) throws IOException {
String segString = merge.segString(); String segString = merge.segString();
long totalNumDocs = merge.totalNumDocs(); long totalNumDocs = merge.totalNumDocs();
runningMerges.put(segString, totalNumDocs); runningMerges.put(segString, totalNumDocs);

View File

@ -20,7 +20,6 @@ package org.apache.solr.core;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.store.Directory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -28,7 +27,7 @@ public class MockConcurrentMergeScheduler extends ConcurrentMergeScheduler {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override @Override
protected void handleMergeException(Directory dir, Throwable exc) { protected void handleMergeException(Throwable exc) {
// swallow the exception // swallow the exception
log.warn("Merge exception:", exc); log.warn("Merge exception:", exc);
} }