LUCENE-8962: Allow waiting for all merges in a merge spec (#1585)

This change adds infrastructure to allow straightforward waiting
on one or more merges, or on an entire merge specification. This is
a basis for LUCENE-8962.
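For orientation, a minimal sketch of what the new primitives make possible. Note that await and hasCompletedSuccessfully are package-private, so only code inside org.apache.lucene.index (like the new TestMergePolicy below) can call them; `spec` here is assumed to be an already-built MergeSpecification whose merges some other thread completes via mergeFinished(true):

    // assumed context: code in org.apache.lucene.index holding a MergeSpecification `spec`
    if (spec.await(30, TimeUnit.SECONDS)) {       // bounded wait on all merges in the spec
      for (MergePolicy.OneMerge m : spec.merges) {
        // non-blocking probe; an empty Optional would mean the merge is still running
        boolean success = m.hasCompletedSuccessfully().orElse(false);
      }
    }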
Author: Simon Willnauer, 2020-06-17 22:48:12 +02:00 (committed by GitHub)
Parent: 207efbceeb
Commit: 59efe22ac2
5 changed files with 233 additions and 24 deletions

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

@@ -2129,12 +2129,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
     ensureOpen(false);
-    if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) {
+    if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
       mergeScheduler.merge(mergeSource, trigger);
     }
   }
 
-  private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
+  private synchronized MergePolicy.MergeSpecification updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
       throws IOException {
 
     // In case infoStream was disabled on init, but then enabled at some
@@ -2144,22 +2144,21 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     assert maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || maxNumSegments > 0;
     assert trigger != null;
     if (stopMerges) {
-      return false;
+      return null;
     }
 
     // Do not start new merges if disaster struck
     if (tragedy.get() != null) {
-      return false;
+      return null;
    }
 
-    boolean newMergesFound = false;
     final MergePolicy.MergeSpecification spec;
     if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
       assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
           "Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
 
       spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
-      newMergesFound = spec != null;
-      if (newMergesFound) {
+      if (spec != null) {
         final int numMerges = spec.merges.size();
         for(int i=0;i<numMerges;i++) {
           final MergePolicy.OneMerge merge = spec.merges.get(i);
@@ -2169,14 +2168,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     } else {
       spec = mergePolicy.findMerges(trigger, segmentInfos, this);
     }
-    newMergesFound = spec != null;
-    if (newMergesFound) {
+    if (spec != null) {
       final int numMerges = spec.merges.size();
       for(int i=0;i<numMerges;i++) {
         registerMerge(spec.merges.get(i));
       }
     }
-    return newMergesFound;
+    return spec;
   }
 
   /** Expert: to be used by a {@link MergePolicy} to avoid
@@ -4289,7 +4287,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   @SuppressWarnings("try")
   private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
     final boolean drop = suppressExceptions == false;
-    try (Closeable finalizer = merge::mergeFinished) {
+    try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions == false)) {
       IOUtils.applyToAll(merge.readers, sr -> {
         final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
         // We still hold a ref so it should not have been removed:
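Taken together, the IndexWriter changes swap a lossy boolean for the MergeSpecification itself: once updatePendingMerges returns the spec, a caller can hand the merges to the scheduler and still hold a reference to exactly the merges it registered. A hypothetical internal flow (illustrative only; this caller is not part of the commit) might be:

    // hypothetical IndexWriter-internal flow enabled by the new return type
    MergePolicy.MergeSpecification spec =
        updatePendingMerges(mergePolicy, MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
    if (spec != null) {
      mergeScheduler.merge(mergeSource, MergeTrigger.EXPLICIT);
      spec.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS); // block until exactly these merges finish
    }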

lucene/core/src/java/org/apache/lucene/index/MergePolicy.java

@@ -23,7 +23,12 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +42,7 @@ import org.apache.lucene.store.MergeInfo;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.IOSupplier;
 import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * <p>Expert: a MergePolicy determines the sequence of
@@ -76,7 +82,7 @@ public abstract class MergePolicy {
    * @lucene.experimental */
   public static class OneMergeProgress {
     /** Reason for pausing the merge thread. */
-    public static enum PauseReason {
+    public enum PauseReason {
       /** Stopped (because of throughput rate set to 0, typically). */
       STOPPED,
       /** Temporarily paused because of exceeded throughput rate. */
@@ -196,6 +202,7 @@ public abstract class MergePolicy {
    *
    * @lucene.experimental */
   public static class OneMerge {
+    private final CompletableFuture<Boolean> mergeCompleted = new CompletableFuture<>();
     SegmentCommitInfo info;         // used by IndexWriter
     boolean registerDone;           // used by IndexWriter
     long mergeGen;                  // used by IndexWriter
@@ -222,7 +229,7 @@ public abstract class MergePolicy {
     volatile long mergeStartNS = -1;
 
     /** Total number of documents in segments to be merged, not accounting for deletions. */
-    public final int totalMaxDoc;
+    final int totalMaxDoc;
     Throwable error;
 
     /** Sole constructor.
@@ -233,13 +240,8 @@ public abstract class MergePolicy {
         throw new RuntimeException("segments must include at least one segment");
       }
       // clone the list, as the in list may be based off original SegmentInfos and may be modified
-      this.segments = new ArrayList<>(segments);
-      int count = 0;
-      for(SegmentCommitInfo info : segments) {
-        count += info.info.maxDoc();
-      }
-      totalMaxDoc = count;
+      this.segments = List.copyOf(segments);
+      totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
       mergeProgress = new OneMergeProgress();
     }
@@ -251,8 +253,12 @@ public abstract class MergePolicy {
       mergeProgress.setMergeThread(Thread.currentThread());
     }
 
-    /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
-    public void mergeFinished() throws IOException {
+    /** Called by {@link IndexWriter} after the merge is done and all readers have been closed.
+     * @param success true iff the merge finished successfully, i.e. was committed */
+    public void mergeFinished(boolean success) throws IOException {
+      if (mergeCompleted.complete(success) == false) {
+        throw new IllegalStateException("merge has already finished");
+      }
     }
 
     /** Wrap the reader in order to add/remove information to the merged segment. */
@@ -362,6 +368,37 @@ public abstract class MergePolicy {
     public OneMergeProgress getMergeProgress() {
       return mergeProgress;
     }
+
+    /**
+     * Waits for this merge to complete.
+     * @return true if the merge finished within the specified timeout
+     */
+    boolean await(long timeout, TimeUnit timeUnit) {
+      try {
+        mergeCompleted.get(timeout, timeUnit);
+        return true;
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      } catch (ExecutionException | TimeoutException e) {
+        return false;
+      }
+    }
+
+    /**
+     * Returns true if the merge has finished, or false if it's still running or
+     * has not been started. This method will not block.
+     */
+    boolean isDone() {
+      return mergeCompleted.isDone();
+    }
+
+    /**
+     * Returns true iff the merge completed successfully, or false if the merge failed.
+     * This method will not block; it returns an empty Optional if the merge has not finished yet.
+     */
+    Optional<Boolean> hasCompletedSuccessfully() {
+      return Optional.ofNullable(mergeCompleted.getNow(null));
+    }
   }
 
   /**
@@ -399,6 +436,22 @@ public abstract class MergePolicy {
       }
       return b.toString();
     }
+
+    /**
+     * Waits if necessary for at most the given time for all merges.
+     */
+    boolean await(long timeout, TimeUnit unit) {
+      try {
+        CompletableFuture<Void> future = CompletableFuture.allOf(merges.stream()
+            .map(m -> m.mergeCompleted).toArray(CompletableFuture<?>[]::new));
+        future.get(timeout, unit);
+        return true;
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      } catch (ExecutionException | TimeoutException e) {
+        return false;
+      }
+    }
   }
 
   /** Exception thrown if there are any problems while executing a merge. */
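The waiting machinery added to MergePolicy above is plain java.util.concurrent composition: every OneMerge owns a CompletableFuture<Boolean> that mergeFinished(boolean) completes exactly once, and MergeSpecification.await joins the per-merge futures with CompletableFuture.allOf. The same pattern, reduced to a self-contained sketch outside Lucene (the class AwaitAllDemo is made up for illustration):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class AwaitAllDemo {
      public static void main(String[] args) {
        List<CompletableFuture<Boolean>> merges =
            List.of(new CompletableFuture<>(), new CompletableFuture<>());
        // complete() succeeds only once; a second call returns false, which is
        // how OneMerge.mergeFinished detects a double invocation
        boolean first = merges.get(0).complete(true);
        boolean second = merges.get(0).complete(true);
        System.out.println("first=" + first + " second=" + second); // first=true second=false
        // allOf folds N futures into one; get(timeout) bounds the wait, like await does
        CompletableFuture<Void> all =
            CompletableFuture.allOf(merges.toArray(CompletableFuture<?>[]::new));
        try {
          all.get(10, TimeUnit.MILLISECONDS);
          System.out.println("all merges finished");
        } catch (TimeoutException | ExecutionException | InterruptedException e) {
          System.out.println("not all merges finished"); // the second future never completes
        }
      }
    }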

lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java

@@ -538,7 +538,7 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
     }
 
     @Override
-    public void mergeFinished() throws IOException {
+    public void mergeFinished(boolean success) throws IOException {
       Throwable th = null;
       for (ParallelLeafReader r : parallelReaders) {
         try {

lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java

@@ -4181,7 +4181,7 @@ public class TestIndexWriter extends LuceneTestCase {
     SetOnce<Boolean> onlyFinishOnce = new SetOnce<>();
     return new MergePolicy.OneMerge(merge.segments) {
       @Override
-      public void mergeFinished() {
+      public void mergeFinished(boolean success) {
         onlyFinishOnce.set(true);
       }
     };

lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java (new file)

@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.StringHelper;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.Version;
+
+public class TestMergePolicy extends LuceneTestCase {
+
+  public void testWaitForOneMerge() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 1 + random().nextInt(10));
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      Thread t = new Thread(() -> {
+        try {
+          for (MergePolicy.OneMerge m : ms.merges) {
+            m.mergeFinished(true);
+          }
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      });
+      t.start();
+      assertTrue(ms.await(100, TimeUnit.HOURS));
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertTrue(m.hasCompletedSuccessfully().get());
+      }
+      t.join();
+    }
+  }
+
+  public void testTimeout() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 3);
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      Thread t = new Thread(() -> {
+        try {
+          ms.merges.get(0).mergeFinished(true);
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      });
+      t.start();
+      assertFalse(ms.await(10, TimeUnit.MILLISECONDS));
+      assertFalse(ms.merges.get(1).hasCompletedSuccessfully().isPresent());
+      t.join();
+    }
+  }
+
+  public void testTimeoutLargeNumberOfMerges() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 10000);
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      AtomicInteger i = new AtomicInteger(0);
+      AtomicBoolean stop = new AtomicBoolean(false);
+      Thread t = new Thread(() -> {
+        while (stop.get() == false) {
+          try {
+            ms.merges.get(i.getAndIncrement()).mergeFinished(true);
+            Thread.sleep(1);
+          } catch (IOException | InterruptedException e) {
+            throw new AssertionError(e);
+          }
+        }
+      });
+      t.start();
+      assertFalse(ms.await(10, TimeUnit.MILLISECONDS));
+      stop.set(true);
+      t.join();
+      for (int j = 0; j < ms.merges.size(); j++) {
+        if (j < i.get()) {
+          assertTrue(ms.merges.get(j).hasCompletedSuccessfully().get());
+        } else {
+          assertFalse(ms.merges.get(j).hasCompletedSuccessfully().isPresent());
+        }
+      }
+    }
+  }
+
+  public void testFinishTwice() throws IOException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
+      MergePolicy.OneMerge oneMerge = spec.merges.get(0);
+      oneMerge.mergeFinished(true);
+      expectThrows(IllegalStateException.class, () -> oneMerge.mergeFinished(false));
+    }
+  }
+
+  public void testTotalMaxDoc() throws IOException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
+      int docs = 0;
+      MergePolicy.OneMerge oneMerge = spec.merges.get(0);
+      for (SegmentCommitInfo info : oneMerge.segments) {
+        docs += info.info.maxDoc();
+      }
+      assertEquals(docs, oneMerge.totalMaxDoc);
+    }
+  }
+
+  private static MergePolicy.MergeSpecification createRandomMergeSpecification(Directory dir, int numMerges) {
+    MergePolicy.MergeSpecification ms = new MergePolicy.MergeSpecification();
+    for (int ii = 0; ii < numMerges; ++ii) {
+      final SegmentInfo si = new SegmentInfo(
+          dir, // dir
+          Version.LATEST, // version
+          Version.LATEST, // min version
+          TestUtil.randomSimpleString(random()), // name
+          random().nextInt(1000), // maxDoc
+          random().nextBoolean(), // isCompoundFile
+          null, // codec
+          Collections.emptyMap(), // diagnostics
+          TestUtil.randomSimpleString( // id
+              random(),
+              StringHelper.ID_LENGTH,
+              StringHelper.ID_LENGTH).getBytes(StandardCharsets.US_ASCII),
+          Collections.emptyMap(), // attributes
+          null /* indexSort */);
+      final List<SegmentCommitInfo> segments = new LinkedList<SegmentCommitInfo>();
+      segments.add(new SegmentCommitInfo(si, 0, 0, 0, 0, 0, StringHelper.randomId()));
+      ms.add(new MergePolicy.OneMerge(segments));
+    }
+    return ms;
+  }
+}