mirror of https://github.com/apache/lucene.git
LUCENE-6119: must check merge for abort even when we are not rate limiting; don't wrap rate limiter when doing addIndexes (it's not abortable); don't leak file handle when wrapping
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1650595 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0aac930251
commit
a2614fbd6d
|
@ -394,8 +394,10 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
|||
/**
|
||||
* Returns the number of merge threads that are alive. Note that this number
|
||||
* is ≤ {@link #mergeThreads} size.
|
||||
*
|
||||
* @lucene.internal
|
||||
*/
|
||||
protected synchronized int mergeThreadCount() {
|
||||
public synchronized int mergeThreadCount() {
|
||||
int count = 0;
|
||||
for (MergeThread mergeThread : mergeThreads) {
|
||||
if (mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
|
||||
|
|
|
@ -2532,7 +2532,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
|
||||
// TODO: somehow we should fix this merge so it's
|
||||
// abortable so that IW.close(false) is able to stop it
|
||||
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(mergeDirectory);
|
||||
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
|
||||
|
||||
SegmentInfo info = new SegmentInfo(directory, Version.LATEST, mergedName, -1,
|
||||
false, codec, null, StringHelper.randomId(), new HashMap<>());
|
||||
|
@ -4679,14 +4679,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
|
|||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
ensureOpen();
|
||||
|
||||
// Paranoia defense: if this trips we have a bug somewhere...
|
||||
IndexWriter.this.ensureOpen(false);
|
||||
|
||||
// This Directory is only supposed to be used during merging,
|
||||
// so all writes should have MERGE context, else there is a bug
|
||||
// somewhere that is failing to pass down the right IOContext:
|
||||
assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
|
||||
IndexOutput output = in.createOutput(name, context);
|
||||
|
||||
MergeRateLimiter rateLimiter = rateLimiters.get();
|
||||
assert rateLimiter != null;
|
||||
return new RateLimitedIndexOutput(rateLimiter, output);
|
||||
|
||||
return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -125,6 +125,10 @@ public class MergeRateLimiter extends RateLimiter {
|
|||
|
||||
/** Returns NO if no pause happened, STOPPED if pause because rate was 0.0 (merge is stopped), PAUSED if paused with a normal rate limit. */
|
||||
private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
|
||||
|
||||
// Now is a good time to abort the merge:
|
||||
checkAbort();
|
||||
|
||||
double secondsToPause = (bytes/1024./1024.) / mbPerSec;
|
||||
|
||||
// Time we should sleep until; this is purely instantaneous
|
||||
|
@ -150,9 +154,6 @@ public class MergeRateLimiter extends RateLimiter {
|
|||
int sleepMS = (int) (curPauseNS / 1000000);
|
||||
int sleepNS = (int) (curPauseNS % 1000000);
|
||||
|
||||
// Now is a good time to abort the merge:
|
||||
checkAbort();
|
||||
|
||||
double rate = mbPerSec;
|
||||
|
||||
try {
|
||||
|
|
|
@ -955,10 +955,16 @@ public class TestAddIndexes extends LuceneTestCase {
|
|||
System.out.println("TEST: now force rollback");
|
||||
}
|
||||
c.didClose = true;
|
||||
MergeScheduler ms = c.writer2.getConfig().getMergeScheduler();
|
||||
|
||||
c.writer2.rollback();
|
||||
|
||||
c.joinThreads();
|
||||
|
||||
if (ms instanceof ConcurrentMergeScheduler) {
|
||||
assertEquals(0, ((ConcurrentMergeScheduler) ms).mergeThreadCount());
|
||||
}
|
||||
|
||||
c.closeDir();
|
||||
|
||||
assertTrue(c.failures.size() == 0);
|
||||
|
|
Loading…
Reference in New Issue