LUCENE-8571: Don't block on FrozenBufferedUpdates#apply during IW#processEvents

While indexing, we try to apply frozen delete packages concurrently
on indexing threads if necessary. This is done in an opaque way via
IndexWriter#processEvents. Yet, when we commit or refresh, we have to
ensure that all frozen update packages have been applied before we return.
Today we execute the apply method in a blocking fashion, which is unnecessary
in an IndexWriter#processEvents context: we block indexing threads even
though they could simply continue, since the package is already being applied.
We also might wait in BufferedUpdatesStream when we apply all necessary updates,
where we could continue with other work instead of waiting.
This change also applies the packages that are not currently being applied
first, in order to avoid unnecessary blocking.
Simon Willnauer 2018-11-21 10:22:41 +01:00
parent 08dd681f0f
commit 5f8855ee0b
3 changed files with 160 additions and 122 deletions
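The core of the change is a try-lock/force-lock split: opportunistic callers skip a packet that another thread is already applying, while callers that must guarantee completion block on the same lock. A minimal standalone sketch of that pattern (the names mirror the diff below, but this is an illustration, not the actual Lucene code):

    import java.util.concurrent.locks.ReentrantLock;

    class Packet {
      private final ReentrantLock applyLock = new ReentrantLock();
      private boolean applied; // guarded by applyLock

      /** Non-blocking: applies the packet unless another thread is already applying it. */
      boolean tryApply() {
        if (applyLock.tryLock()) {
          try {
            forceApply();
            return true;
          } finally {
            applyLock.unlock();
          }
        }
        return false; // another thread holds the lock and is applying the packet right now
      }

      /** Blocking: returns only once the packet has been applied, applying it if needed. */
      void forceApply() {
        applyLock.lock();
        try {
          if (applied) {
            return; // already done, possibly by a concurrent thread
          }
          // ... resolve deletes/updates against the index here ...
          applied = true;
        } finally {
          applyLock.unlock();
        }
      }
    }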

lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java

@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -222,14 +223,22 @@ final class BufferedUpdatesStream implements Accountable {
       infoStream.message("BD", "waitApply: " + waitFor.size() + " packets: " + waitFor);
     }
+    ArrayList<FrozenBufferedUpdates> pendingPackets = new ArrayList<>();
     long totalDelCount = 0;
     for (FrozenBufferedUpdates packet : waitFor) {
       // Frozen packets are now resolved, concurrently, by the indexing threads that
       // create them, by adding a DocumentsWriter.ResolveUpdatesEvent to the events queue,
       // but if we get here and the packet is not yet resolved, we resolve it now ourselves:
-      packet.apply(writer);
+      if (packet.tryApply(writer) == false) {
+        // if somebody else is currently applying it - move on to the next one and force apply below
+        pendingPackets.add(packet);
+      }
       totalDelCount += packet.totalDelCount;
     }
+    for (FrozenBufferedUpdates packet : pendingPackets) {
+      // now block on all the packets that were concurrently applied to ensure they are done before we continue.
+      packet.forceApply(writer);
+    }
     if (infoStream.isEnabled("BD")) {
       infoStream.message("BD",

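The reworked waitApply above is a two-pass drain: the first pass applies every packet that is uncontended, and the second pass blocks only on packets some other thread was still applying. A simplified sketch of that shape, using a hypothetical Applyable interface in place of FrozenBufferedUpdates:

    import java.util.ArrayList;
    import java.util.List;

    interface Applyable {
      boolean tryApply(); // non-blocking; false if another thread is applying the packet
      void forceApply();  // blocks until the packet has definitely been applied
    }

    final class WaitApplySketch {
      static void waitApply(List<? extends Applyable> waitFor) {
        List<Applyable> pending = new ArrayList<>();
        for (Applyable packet : waitFor) {
          if (packet.tryApply() == false) {
            pending.add(packet); // busy on another thread; revisit below
          }
        }
        // Second pass: block on the stragglers so no packet is left unapplied when we return.
        for (Applyable packet : pending) {
          packet.forceApply();
        }
      }
    }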
lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java

@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.IntConsumer;
 import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
@@ -83,6 +84,7 @@ final class FrozenBufferedUpdates {
   /** Counts down once all deletes/updates have been applied */
   public final CountDownLatch applied = new CountDownLatch(1);
+  private final ReentrantLock applyLock = new ReentrantLock();
   /** How many total documents were deleted/updated. */
   public long totalDelCount;
@@ -214,149 +216,173 @@ final class FrozenBufferedUpdates {
   /** Translates a frozen packet of delete term/query, or doc values
    * updates, into their actual docIDs in the index, and applies the change. This is a heavy
-   * operation and is done concurrently by incoming indexing threads. */
+   * operation and is done concurrently by incoming indexing threads.
+   * This method will return immediately without blocking if another thread is currently
+   * applying the package. In order to ensure the packet has been applied, {@link #forceApply(IndexWriter)}
+   * must be called.
+   * */
   @SuppressWarnings("try")
-  public synchronized void apply(IndexWriter writer) throws IOException {
+  boolean tryApply(IndexWriter writer) throws IOException {
+    if (applyLock.tryLock()) {
+      try {
+        forceApply(writer);
+        return true;
+      } finally {
+        applyLock.unlock();
+      }
+    }
+    return false;
+  }
+
+  /** Translates a frozen packet of delete term/query, or doc values
+   * updates, into their actual docIDs in the index, and applies the change. This is a heavy
+   * operation and is done concurrently by incoming indexing threads.
+   * */
+  void forceApply(IndexWriter writer) throws IOException {
+    applyLock.lock();
+    try {
       if (applied.getCount() == 0) {
         // already done
         return;
       }
       long startNS = System.nanoTime();
       assert any();
       Set<SegmentCommitInfo> seenSegments = new HashSet<>();
       int iter = 0;
       int totalSegmentCount = 0;
       long totalDelCount = 0;
       boolean finished = false;
       // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
       // concurrent merges are running. Once we are done, we check to see if a merge completed while we were running. If so, we must retry
       // resolving against the newly merged segment(s). Eventually no merge finishes while we were running and we are done.
       while (true) {
         String messagePrefix;
         if (iter == 0) {
           messagePrefix = "";
         } else {
           messagePrefix = "iter " + iter;
         }
         long iterStartNS = System.nanoTime();
         long mergeGenStart = writer.mergeFinishedGen.get();
         Set<String> delFiles = new HashSet<>();
         BufferedUpdatesStream.SegmentState[] segStates;
         synchronized (writer) {
           List<SegmentCommitInfo> infos = getInfosToApply(writer);
           if (infos == null) {
             break;
           }
           for (SegmentCommitInfo info : infos) {
             delFiles.addAll(info.files());
           }
           // Must open while holding IW lock so that e.g. segments are not merged
           // away, dropped from 100% deletions, etc., before we can open the readers
           segStates = openSegmentStates(writer, infos, seenSegments, delGen());
           if (segStates.length == 0) {
             if (infoStream.isEnabled("BD")) {
               infoStream.message("BD", "packet matches no segments");
             }
             break;
           }
           if (infoStream.isEnabled("BD")) {
             infoStream.message("BD", String.format(Locale.ROOT,
                 messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
                 this, segStates.length, mergeGenStart));
           }
           totalSegmentCount += segStates.length;
           // Important, else IFD may try to delete our files while we are still using them,
           // if e.g. a merge finishes on some of the segments we are resolving on:
           writer.deleter.incRef(delFiles);
         }
         AtomicBoolean success = new AtomicBoolean();
         long delCount;
         try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
           // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
           delCount = apply(segStates);
           success.set(true);
         }
         // Since we just resolved some more deletes/updates, now is a good time to write them:
         writer.writeSomeDocValuesUpdates();
         // It's OK to add this here, even if the while loop retries, because delCount only includes newly
         // deleted documents, on the segments we didn't already do in previous iterations:
         totalDelCount += delCount;
         if (infoStream.isEnabled("BD")) {
           infoStream.message("BD", String.format(Locale.ROOT,
               messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
               this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
         }
         if (privateSegment != null) {
           // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
           // be applied before it kicks off, so this private segment must already not be in the set of merging segments
           break;
         }
         // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
         // in pulling all our delGens into a merge:
         synchronized (writer) {
           long mergeGenCur = writer.mergeFinishedGen.get();
           if (mergeGenCur == mergeGenStart) {
             // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
             // Record that this packet is finished:
             writer.finished(this);
             finished = true;
             // No merge finished while we were applying, so we are done!
             break;
           }
         }
         if (infoStream.isEnabled("BD")) {
           infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
         }
         // A merge completed while we were running. In this case, that merge may have picked up some of the updates we did, but not
         // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
         iter++;
       }
       if (finished == false) {
         // Record that this packet is finished:
         writer.finished(this);
       }
       if (infoStream.isEnabled("BD")) {
         String message = String.format(Locale.ROOT,
             "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
             this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.);
         if (iter > 0) {
           message += "; " + (iter + 1) + " iters due to concurrent merges";
         }
         message += "; " + writer.getPendingUpdatesCount() + " packets remain";
         infoStream.message("BD", message);
       }
+    } finally {
+      applyLock.unlock();
+    }
   }
@@ -411,6 +437,7 @@ final class FrozenBufferedUpdates {
   private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
                            boolean success, Set<String> delFiles) throws IOException {
+    assert applyLock.isHeldByCurrentThread();
     synchronized (writer) {
       BufferedUpdatesStream.ApplyDeletesResult result;
@@ -441,8 +468,8 @@ final class FrozenBufferedUpdates {
   /** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning
    *  the number of new deleted or updated documents. */
-  private synchronized long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
+  private long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
+    assert applyLock.isHeldByCurrentThread();
     if (delGen == -1) {
       // we were not yet pushed
       throw new IllegalArgumentException("gen is not yet set; call BufferedUpdatesStream.push first");

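forceApply's retry loop is optimistic concurrency control against merges: resolve deletes against the segments visible now, then re-check writer.mergeFinishedGen and redo the work if a merge completed in the meantime. A generic sketch of that check-generation-and-retry shape (mergeFinishedGen and resolveDeletes here are stand-ins, not the real IndexWriter API):

    import java.util.concurrent.atomic.AtomicLong;

    final class OptimisticApplySketch {
      // Bumped by the merge side each time a merge finishes; stands in for IndexWriter.mergeFinishedGen.
      private final AtomicLong mergeFinishedGen = new AtomicLong();

      void forceApply(Runnable resolveDeletes) {
        while (true) {
          long genStart = mergeFinishedGen.get();
          resolveDeletes.run(); // apply against the segments that are visible right now
          synchronized (this) { // same monitor the merge side holds while publishing its gen bump
            if (mergeFinishedGen.get() == genStart) {
              // No merge finished while we worked, so nothing could have missed our updates.
              return;
            }
          }
          // A merge completed concurrently and may not carry all of our updates;
          // loop and re-apply against the newly merged segments.
        }
      }
    }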
lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

@@ -2607,7 +2607,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
       // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
       eventQueue.add(w -> {
         try {
-          packet.apply(w);
+          // we call tryApply here since we don't want to block if a refresh or a flush is already applying the
+          // packet. The flush will retry this packet anyway to ensure all of them are applied
+          packet.tryApply(w);
         } catch (Throwable t) {
           try {
             w.onTragicEvent(t, "applyUpdatesPacket");