mirror of
synced 2025-03-03 14:59:16 +00:00
Consolidate all IW locking inside IndexWriter (#1454)
Today we still have one class that runs some tricky logic that should be in the IndexWriter in the first place since it requires locking on the IndexWriter itself. This change inverts the API and now FrozendBufferedUpdates does not get the IndexWriter passed in, instead the IndexWriter owns most of the logic and executes on a FrozenBufferedUpdates object. This prevent locking on IndexWriter out side of the writer itself and paves the way to simplify some concurrency down the road
This commit is contained in:
@ -229,7 +229,7 @@ final class BufferedUpdatesStream implements Accountable {
// Frozen packets are now resolved, concurrently, by the indexing threads that
// create them, by adding a DocumentsWriter.ResolveUpdatesEvent to the events queue,
// but if we get here and the packet is not yet resolved, we resolve it now ourselves:
if (packet.tryApply(writer) == false) {
if (writer.tryApply(packet) == false) {
// if somebody else is currently applying it - move on to the next one and force apply below
@ -237,7 +237,7 @@ final class BufferedUpdatesStream implements Accountable {
for (FrozenBufferedUpdates packet : pendingPackets) {
// now block on all the packets that were concurrently applied to ensure they are due before we continue.
if (infoStream.isEnabled("BD")) {
@ -345,5 +345,4 @@ final class BufferedUpdatesStream implements Accountable {
@ -16,17 +16,12 @@
package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;
@ -39,7 +34,6 @@ import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;
@ -124,284 +118,39 @@ final class FrozenBufferedUpdates {
/** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null
* if the private segment was already merged away. */
private List<SegmentCommitInfo> getInfosToApply(IndexWriter writer) {
assert Thread.holdsLock(writer);
final List<SegmentCommitInfo> infos;
if (privateSegment != null) {
if (writer.segmentCommitInfoExist(privateSegment)) {
infos = Collections.singletonList(privateSegment);
}else {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "private segment already gone; skip processing updates");
infos = null;
} else {
infos = writer.listOfSegmentCommitInfos();
return infos;
* Tries to lock this buffered update instance
* @return true if the lock was successfully acquired. otherwise false.
boolean tryLock() {
return applyLock.tryLock();
/** Translates a frozen packet of delete term/query, or doc values
* updates, into their actual docIDs in the index, and applies the change. This is a heavy
* operation and is done concurrently by incoming indexing threads.
* This method will return immediately without blocking if another thread is currently
* applying the package. In order to ensure the packet has been applied, {@link #forceApply(IndexWriter)}
* must be called.
* */
boolean tryApply(IndexWriter writer) throws IOException {
if (applyLock.tryLock()) {
try {
return true;
} finally {
return false;
/** Translates a frozen packet of delete term/query, or doc values
* updates, into their actual docIDs in the index, and applies the change. This is a heavy
* operation and is done concurrently by incoming indexing threads.
* */
void forceApply(IndexWriter writer) throws IOException {
* locks this buffered update instance
void lock() {
try {
if (applied.getCount() == 0) {
// already done
long startNS = System.nanoTime();
assert any();
Set<SegmentCommitInfo> seenSegments = new HashSet<>();
int iter = 0;
int totalSegmentCount = 0;
long totalDelCount = 0;
boolean finished = false;
// Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
// concurrent merges are running. Once we are done, we check to see if a merge completed while we were running. If so, we must retry
// resolving against the newly merged segment(s). Eventually no merge finishes while we were running and we are done.
while (true) {
String messagePrefix;
if (iter == 0) {
messagePrefix = "";
} else {
messagePrefix = "iter " + iter;
long iterStartNS = System.nanoTime();
long mergeGenStart = writer.mergeFinishedGen.get();
Set<String> delFiles = new HashSet<>();
BufferedUpdatesStream.SegmentState[] segStates;
synchronized (writer) {
List<SegmentCommitInfo> infos = getInfosToApply(writer);
if (infos == null) {
for (SegmentCommitInfo info : infos) {
// Must open while holding IW lock so that e.g. segments are not merged
// away, dropped from 100% deletions, etc., before we can open the readers
segStates = openSegmentStates(writer, infos, seenSegments, delGen());
if (segStates.length == 0) {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "packet matches no segments");
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
this, segStates.length, mergeGenStart));
totalSegmentCount += segStates.length;
// Important, else IFD may try to delete our files while we are still using them,
// if e.g. a merge finishes on some of the segments we are resolving on:
AtomicBoolean success = new AtomicBoolean();
long delCount;
try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
assert finalizer != null; // access the finalizer to prevent a warning
// don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
delCount = apply(segStates);
// Since we just resolved some more deletes/updates, now is a good time to write them:
// It's OK to add this here, even if the while loop retries, because delCount only includes newly
// deleted documents, on the segments we didn't already do in previous iterations:
totalDelCount += delCount;
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
if (privateSegment != null) {
// No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
// be applied before it kicks off, so this private segment must already not be in the set of merging segments
// Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
// in pulling all our delGens into a merge:
synchronized (writer) {
long mergeGenCur = writer.mergeFinishedGen.get();
if (mergeGenCur == mergeGenStart) {
// Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
// Record that this packet is finished:
finished = true;
// No merge finished while we were applying, so we are done!
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
// A merge completed while we were running. In this case, that merge may have picked up some of the updates we did, but not
// necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
if (finished == false) {
// Record that this packet is finished:
if (infoStream.isEnabled("BD")) {
String message = String.format(Locale.ROOT,
"done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.);
if (iter > 0) {
message += "; " + (iter + 1) + " iters due to concurrent merges";
message += "; " + writer.getPendingUpdatesCount() + " packets remain";
infoStream.message("BD", message);
} finally {
/** Opens SegmentReader and inits SegmentState for each segment. */
private static BufferedUpdatesStream.SegmentState[] openSegmentStates(IndexWriter writer, List<SegmentCommitInfo> infos,
Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
try {
for (SegmentCommitInfo info : infos) {
if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
segStates.add(new BufferedUpdatesStream.SegmentState(writer.getPooledInstance(info, true), writer::release, info));
} catch (Throwable t) {
try {
} catch (Throwable t1) {
throw t;
return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
* Releases the lock of this buffered update instance
void unlock() {
/** Close segment states previously opened with openSegmentStates. */
public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
List<SegmentCommitInfo> allDeleted = null;
long totDelCount = 0;
try {
for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (success) {
totDelCount += segState.rld.getDelCount() - segState.startDelCount;
int fullDelCount = segState.rld.getDelCount();
assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
if (allDeleted == null) {
allDeleted = new ArrayList<>();
} finally {
if (writer.infoStream.isEnabled("BD")) {
writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount()+ " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
boolean success, Set<String> delFiles) throws IOException {
* Returns true iff this buffered updates instance was already applied
boolean isApplied() {
assert applyLock.isHeldByCurrentThread();
synchronized (writer) {
BufferedUpdatesStream.ApplyDeletesResult result;
try {
result = closeSegmentStates(writer, segStates, success);
} finally {
// Matches the incRef we did above, but we must do the decRef after closing segment states else
// IFD can't delete still-open files
if (result.anyDeletes) {
if (result.allDeleted != null) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "drop 100% deleted segments: " + writer.segString(result.allDeleted));
for (SegmentCommitInfo info : result.allDeleted) {
return applied.getCount() == 0;
/** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning
* the number of new deleted or updated documents. */
private long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
assert applyLock.isHeldByCurrentThread();
if (delGen == -1) {
// we were not yet pushed
@ -385,7 +385,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private volatile boolean closed;
private volatile boolean closing;
final AtomicBoolean maybeMerge = new AtomicBoolean();
private final AtomicBoolean maybeMerge = new AtomicBoolean();
private Iterable<Map.Entry<String,String>> commitUserData;
@ -409,9 +409,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private final ReaderPool readerPool;
final BufferedUpdatesStream bufferedUpdatesStream;
/** Counts how many merges have completed; this is used by {@link FrozenBufferedUpdates#forceApply(IndexWriter)}
/** Counts how many merges have completed; this is used by {@link #forceApply(FrozenBufferedUpdates)}
* to handle concurrently apply deletes/updates with merges completing. */
final AtomicLong mergeFinishedGen = new AtomicLong();
private final AtomicLong mergeFinishedGen = new AtomicLong();
// The instance that was passed to the constructor. It is saved only in order
// to allow users to query an IndexWriter settings.
@ -651,13 +651,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
return docWriter.getFlushingBytes();
final long getReaderPoolRamBytesUsed() {
return readerPool.ramBytesUsed();
private final AtomicBoolean writeDocValuesLock = new AtomicBoolean();
void writeSomeDocValuesUpdates() throws IOException {
final void writeSomeDocValuesUpdates() throws IOException {
if (writeDocValuesLock.compareAndSet(false, true)) {
try {
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
@ -665,7 +662,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
long startNS = System.nanoTime();
long ramBytesUsed = getReaderPoolRamBytesUsed();
long ramBytesUsed = readerPool.ramBytesUsed();
if (ramBytesUsed > 0.5 * ramBufferSizeMB * 1024 * 1024) {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT, "now write some pending DV updates: %.2f MB used vs IWC Buffer %.2f MB",
@ -707,7 +704,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT, "done write some DV updates for %d segments: now %.2f MB used vs IWC Buffer %.2f MB; took %.2f sec",
count, getReaderPoolRamBytesUsed()/1024./1024., ramBufferSizeMB, ((System.nanoTime() - startNS)/1000000000.)));
count, readerPool.ramBytesUsed()/1024./1024., ramBufferSizeMB, ((System.nanoTime() - startNS)/1000000000.)));
@ -1547,7 +1544,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
/** Drops a segment that has 100% deleted documents. */
synchronized void dropDeletedSegment(SegmentCommitInfo info) throws IOException {
private synchronized void dropDeletedSegment(SegmentCommitInfo info) throws IOException {
// If a merge has already registered for this
// segment, we leave it in the readerPool; the
// merge will skip merging it and will then drop
@ -1873,7 +1870,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
/** If enabled, information about merges will be printed to this.
final InfoStream infoStream;
private final InfoStream infoStream;
* Forces merge policy to merge segments until there are
@ -2562,7 +2559,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* the index files referenced exist (correctly) in the
* index directory.
synchronized void checkpoint() throws IOException {
private synchronized void checkpoint() throws IOException {
deleter.checkpoint(segmentInfos, false);
@ -2590,7 +2587,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
try {
// we call tryApply here since we don't want to block if a refresh or a flush is already applying the
// packet. The flush will retry this packet anyway to ensure all of them are applied
} catch (Throwable t) {
try {
w.onTragicEvent(t, "applyUpdatesPacket");
@ -5258,12 +5255,280 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
return readerPool.get(info, create);
void finished(FrozenBufferedUpdates packet) {
// FrozenBufferedUpdates
* Translates a frozen packet of delete term/query, or doc values
* updates, into their actual docIDs in the index, and applies the change. This is a heavy
* operation and is done concurrently by incoming indexing threads.
* This method will return immediately without blocking if another thread is currently
* applying the package. In order to ensure the packet has been applied,
* {@link IndexWriter#forceApply(FrozenBufferedUpdates)} must be called.
boolean tryApply(FrozenBufferedUpdates updates) throws IOException {
if (updates.tryLock()) {
try {
return true;
} finally {
return false;
int getPendingUpdatesCount() {
return bufferedUpdatesStream.getPendingUpdatesCount();
* Translates a frozen packet of delete term/query, or doc values
* updates, into their actual docIDs in the index, and applies the change. This is a heavy
* operation and is done concurrently by incoming indexing threads.
void forceApply(FrozenBufferedUpdates updates) throws IOException {
try {
if (updates.isApplied()) {
// already done
long startNS = System.nanoTime();
assert updates.any();
Set<SegmentCommitInfo> seenSegments = new HashSet<>();
int iter = 0;
int totalSegmentCount = 0;
long totalDelCount = 0;
boolean finished = false;
// Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
// concurrent merges are running. Once we are done, we check to see if a merge completed while we were running. If so, we must retry
// resolving against the newly merged segment(s). Eventually no merge finishes while we were running and we are done.
while (true) {
String messagePrefix;
if (iter == 0) {
messagePrefix = "";
} else {
messagePrefix = "iter " + iter;
long iterStartNS = System.nanoTime();
long mergeGenStart = mergeFinishedGen.get();
Set<String> delFiles = new HashSet<>();
BufferedUpdatesStream.SegmentState[] segStates;
synchronized (this) {
List<SegmentCommitInfo> infos = getInfosToApply(updates);
if (infos == null) {
for (SegmentCommitInfo info : infos) {
// Must open while holding IW lock so that e.g. segments are not merged
// away, dropped from 100% deletions, etc., before we can open the readers
segStates = openSegmentStates(infos, seenSegments, updates.delGen());
if (segStates.length == 0) {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "packet matches no segments");
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
this, segStates.length, mergeGenStart));
totalSegmentCount += segStates.length;
// Important, else IFD may try to delete our files while we are still using them,
// if e.g. a merge finishes on some of the segments we are resolving on:
AtomicBoolean success = new AtomicBoolean();
long delCount;
try (Closeable finalizer = () -> finishApply(segStates, success.get(), delFiles)) {
assert finalizer != null; // access the finalizer to prevent a warning
// don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
delCount = updates.apply(segStates);
// Since we just resolved some more deletes/updates, now is a good time to write them:
// It's OK to add this here, even if the while loop retries, because delCount only includes newly
// deleted documents, on the segments we didn't already do in previous iterations:
totalDelCount += delCount;
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT,
messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
if (updates.privateSegment != null) {
// No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
// be applied before it kicks off, so this private segment must already not be in the set of merging segments
// Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
// in pulling all our delGens into a merge:
synchronized (this) {
long mergeGenCur = mergeFinishedGen.get();
if (mergeGenCur == mergeGenStart) {
// Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
// Record that this packet is finished:
finished = true;
// No merge finished while we were applying, so we are done!
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
// A merge completed while we were running. In this case, that merge may have picked up some of the updates we did, but not
// necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
if (finished == false) {
// Record that this packet is finished:
if (infoStream.isEnabled("BD")) {
String message = String.format(Locale.ROOT,
"done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.);
if (iter > 0) {
message += "; " + (iter + 1) + " iters due to concurrent merges";
message += "; " + bufferedUpdatesStream.getPendingUpdatesCount() + " packets remain";
infoStream.message("BD", message);
} finally {
/** Returns the {@link SegmentCommitInfo} that this packet is supposed to apply its deletes to, or null
* if the private segment was already merged away. */
private synchronized List<SegmentCommitInfo> getInfosToApply(FrozenBufferedUpdates updates) {
final List<SegmentCommitInfo> infos;
if (updates.privateSegment != null) {
if (segmentInfos.contains(updates.privateSegment)) {
infos = Collections.singletonList(updates.privateSegment);
}else {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "private segment already gone; skip processing updates");
infos = null;
} else {
infos = listOfSegmentCommitInfos();
return infos;
private void finishApply(BufferedUpdatesStream.SegmentState[] segStates,
boolean success, Set<String> delFiles) throws IOException {
synchronized (this) {
BufferedUpdatesStream.ApplyDeletesResult result;
try {
result = closeSegmentStates(segStates, success);
} finally {
// Matches the incRef we did above, but we must do the decRef after closing segment states else
// IFD can't delete still-open files
if (result.anyDeletes) {
if (result.allDeleted != null) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "drop 100% deleted segments: " + segString(result.allDeleted));
for (SegmentCommitInfo info : result.allDeleted) {
/** Close segment states previously opened with openSegmentStates. */
private BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
List<SegmentCommitInfo> allDeleted = null;
long totDelCount = 0;
try {
for (BufferedUpdatesStream.SegmentState segState : segStates) {
if (success) {
totDelCount += segState.rld.getDelCount() - segState.startDelCount;
int fullDelCount = segState.rld.getDelCount();
assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
if (segState.rld.isFullyDeleted() && getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
if (allDeleted == null) {
allDeleted = new ArrayList<>();
} finally {
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + bufferedUpdatesStream.getPendingUpdatesCount() + " packets; bytesUsed=" + readerPool.ramBytesUsed());
return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
/** Opens SegmentReader and inits SegmentState for each segment. */
private BufferedUpdatesStream.SegmentState[] openSegmentStates(List<SegmentCommitInfo> infos,
Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
try {
for (SegmentCommitInfo info : infos) {
if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
segStates.add(new BufferedUpdatesStream.SegmentState(getPooledInstance(info, true), this::release, info));
} catch (Throwable t) {
try {
} catch (Throwable t1) {
throw t;
return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
@ -5279,11 +5544,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
/** Checks if the provided segment exists in the current segmentInfos */
final synchronized boolean segmentCommitInfoExist(SegmentCommitInfo sci) {
return segmentInfos.contains(sci);
/** Returns an unmodifiable view of the list of all segments of the current segmentInfos */
final synchronized List<SegmentCommitInfo> listOfSegmentCommitInfos() {
return segmentInfos.asList();
Reference in New Issue
Block a user