LUCENE-9473: Ensure merges are stopped during abort merges (#1772)

We need to disable merges while we wait for running merges since
IW calls timed wait on it's lock that releases the monitor for the time
being which allows new merges to be registered unless we disable them.
This commit is contained in:
Simon Willnauer 2020-08-24 09:15:42 +02:00 committed by GitHub
parent e1392c7440
commit 8480329213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 13 deletions

View File

@ -401,7 +401,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private final Set<MergePolicy.OneMerge> runningMerges = new HashSet<>(); private final Set<MergePolicy.OneMerge> runningMerges = new HashSet<>();
private final List<MergePolicy.OneMerge> mergeExceptions = new ArrayList<>(); private final List<MergePolicy.OneMerge> mergeExceptions = new ArrayList<>();
private long mergeGen; private long mergeGen;
private boolean stopMerges; // TODO make sure this is only changed once and never set back to false private Merges merges = new Merges();
private boolean didMessageState; private boolean didMessageState;
private final AtomicInteger flushCount = new AtomicInteger(); private final AtomicInteger flushCount = new AtomicInteger();
private final AtomicInteger flushDeletesCount = new AtomicInteger(); private final AtomicInteger flushDeletesCount = new AtomicInteger();
@ -2144,7 +2144,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
assert maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || maxNumSegments > 0; assert maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || maxNumSegments > 0;
assert trigger != null; assert trigger != null;
if (stopMerges) { if (merges.areEnabled() == false) {
return null; return null;
} }
@ -2261,10 +2261,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
try { try {
synchronized (this) { synchronized (this) {
// must be synced otherwise register merge might throw and exception if stopMerges // must be synced otherwise register merge might throw and exception if merges
// changes concurrently, abortMerges is synced as well // changes concurrently, abortMerges is synced as well
stopMerges = true; // this disables merges forever abortMerges(); // this disables merges forever since we are closing and can't reenable them
abortMerges();
assert mergingSegments.isEmpty() : "we aborted all merges but still have merging segments: " + mergingSegments; assert mergingSegments.isEmpty() : "we aborted all merges but still have merging segments: " + mergingSegments;
} }
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
@ -2427,7 +2426,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
synchronized (this) { synchronized (this) {
try { try {
// Abort any running merges // Abort any running merges
try {
abortMerges(); abortMerges();
assert merges.areEnabled() == false : "merges should be disabled - who enabled them?";
assert mergingSegments.isEmpty() : "found merging segments but merges are disabled: " + mergingSegments;
} finally {
// abortMerges disables all merges and we need to re-enable them here to make sure
// IW can function properly. An exception in abortMerges() might be fatal for IW but just to be sure
// lets re-enable merges anyway.
merges.enable();
}
adjustPendingNumDocs(-segmentInfos.totalMaxDoc()); adjustPendingNumDocs(-segmentInfos.totalMaxDoc());
// Remove all segments // Remove all segments
segmentInfos.clear(); segmentInfos.clear();
@ -2451,6 +2459,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
return seqNo; return seqNo;
} finally { } finally {
if (success == false) { if (success == false) {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during deleteAll"); infoStream.message("IW", "hit exception during deleteAll");
} }
@ -2469,6 +2478,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* method: when you abort a long-running merge, you lose * method: when you abort a long-running merge, you lose
* a lot of work that must later be redone. */ * a lot of work that must later be redone. */
private synchronized void abortMerges() throws IOException { private synchronized void abortMerges() throws IOException {
merges.disable();
// Abort all pending & running merges: // Abort all pending & running merges:
IOUtils.applyToAll(pendingMerges, merge -> { IOUtils.applyToAll(pendingMerges, merge -> {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
@ -2969,7 +2979,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
synchronized (this) { synchronized (this) {
ensureOpen(); ensureOpen();
assert stopMerges == false; assert merges.areEnabled();
runningAddIndexesMerges.add(merger); runningAddIndexesMerges.add(merger);
} }
try { try {
@ -2990,7 +3000,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
final MergePolicy mergePolicy = config.getMergePolicy(); final MergePolicy mergePolicy = config.getMergePolicy();
boolean useCompoundFile; boolean useCompoundFile;
synchronized(this) { // Guard segmentInfos synchronized(this) { // Guard segmentInfos
if (stopMerges) { if (merges.areEnabled() == false) {
// Safe: these files must exist // Safe: these files must exist
deleteNewFiles(infoPerCommit.files()); deleteNewFiles(infoPerCommit.files());
@ -3026,7 +3036,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
// Register the new segment // Register the new segment
synchronized(this) { synchronized(this) {
if (stopMerges) { if (merges.areEnabled() == false) {
// Safe: these files must exist // Safe: these files must exist
deleteNewFiles(infoPerCommit.files()); deleteNewFiles(infoPerCommit.files());
@ -4232,7 +4242,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} }
assert merge.segments.size() > 0; assert merge.segments.size() > 0;
if (stopMerges) { if (merges.areEnabled() == false) {
abortOneMerge(merge); abortOneMerge(merge);
throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments)); throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
} }
@ -5728,4 +5738,24 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
return writer.segString(); return writer.segString();
} }
} }
private class Merges {
private boolean mergesEnabled = true;
boolean areEnabled() {
assert Thread.holdsLock(IndexWriter.this);
return mergesEnabled;
}
void disable() {
assert Thread.holdsLock(IndexWriter.this);
mergesEnabled = false;
}
void enable() {
ensureOpen();
assert Thread.holdsLock(IndexWriter.this);
mergesEnabled = true;
}
}
} }

View File

@ -303,7 +303,8 @@ public class TestIndexWriterDelete extends LuceneTestCase {
public void testDeleteAllNoDeadLock() throws IOException, InterruptedException { public void testDeleteAllNoDeadLock() throws IOException, InterruptedException {
Directory dir = newDirectory(); Directory dir = newDirectory();
final RandomIndexWriter modifier = new RandomIndexWriter(random(), dir); final RandomIndexWriter modifier = new RandomIndexWriter(random(), dir,
newIndexWriterConfig().setMergePolicy(new MockRandomMergePolicy(random())));
int numThreads = atLeast(2); int numThreads = atLeast(2);
Thread[] threads = new Thread[numThreads]; Thread[] threads = new Thread[numThreads];
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
@ -341,7 +342,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
threads[i].start(); threads[i].start();
} }
latch.countDown(); latch.countDown();
while(!doneLatch.await(1, TimeUnit.MILLISECONDS)) { while (!doneLatch.await(1, TimeUnit.MILLISECONDS)) {
if (VERBOSE) { if (VERBOSE) {
System.out.println("\nTEST: now deleteAll"); System.out.println("\nTEST: now deleteAll");
} }