LUCENE-8269: Detach downstream classes from IndexWriter

IndexWriter today is shared with many classes like BufferedUpdatesStream,
DocumentsWriter and DocumentsWriterPerThread. Some of them even acquire locks
on the writer instance or assert that the current thread doesn't hold a lock.
This makes it very difficult to have a manageable threading model.

This change separates the IndexWriter from those classes and makes them all
independent of IW. IW now implements a new interface that DocumentsWriter uses to
communicate failed or successful flushes and tragic events. This allows IW to make its
critical methods private and execute all lock-critical actions on its private event
queue, which ensures that the IW lock is not held. Follow-up changes will try to detach
more code, like publishing flushed segments, to ensure we never call back into IW in an
uncontrolled way.
Simon Willnauer 2018-04-23 17:17:40 +02:00 committed by GitHub
parent e8c36f489e
commit 6f0a884582
9 changed files with 315 additions and 289 deletions
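
To make the new control flow concrete, here is a minimal, self-contained sketch of the callback-plus-event-queue pattern this commit introduces. The Worker/Owner names, the Notifications interface and the Consumer-based queue below are illustrative simplifications, not Lucene's actual API; see DocumentsWriter.FlushNotifications and the IndexWriter event queue in the diff for the real thing.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Sketch only: a worker that reports outcomes through a narrow callback
// interface instead of calling back into its owner directly.
final class Worker {
  interface Notifications {
    void flushFailed(String segmentName);
    void afterFlush();
  }

  private final Notifications notifications;

  Worker(Notifications notifications) {
    this.notifications = notifications;
  }

  void flush(String segmentName, boolean success) {
    // ... the actual flush work would happen here ...
    if (success == false) {
      notifications.flushFailed(segmentName); // report, don't act on the owner
    }
    notifications.afterFlush();
  }
}

// Sketch only: the owner implements the callbacks by enqueueing events and
// processes them later, outside any synchronized(this) section.
final class Owner implements Worker.Notifications {
  private final Queue<Consumer<Owner>> eventQueue = new ConcurrentLinkedQueue<>();
  private final Worker worker = new Worker(this);

  @Override
  public void flushFailed(String segmentName) {
    eventQueue.add(owner -> owner.cleanupFailedFlush(segmentName));
  }

  @Override
  public void afterFlush() {
    eventQueue.add(Owner::publishFlushedSegments);
  }

  void indexSomething() {
    worker.flush("_0", true);
    processEvents(); // drained here, where we know the Owner lock is not held
  }

  private void processEvents() {
    Consumer<Owner> event;
    while ((event = eventQueue.poll()) != null) {
      event.accept(this);
    }
  }

  private synchronized void cleanupFailedFlush(String segmentName) { /* ... */ }

  private synchronized void publishFlushedSegments() { /* ... */ }
}

The point of routing every callback through the queue is that the worker may invoke the callbacks while holding its own locks; the owner only acts on them later, from a context where it knows its monitor lock is not held.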

lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java

@@ -17,9 +17,8 @@
 package org.apache.lucene.index;
+import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -48,7 +47,7 @@ import org.apache.lucene.util.InfoStream;
  * track which BufferedDeletes packets to apply to any given
  * segment. */
-class BufferedUpdatesStream implements Accountable {
+final class BufferedUpdatesStream implements Accountable {
   private final Set<FrozenBufferedUpdates> updates = new HashSet<>();
@@ -56,22 +55,19 @@ class BufferedUpdatesStream implements Accountable {
   // deletes applied (whose bufferedDelGen defaults to 0)
   // will be correct:
   private long nextGen = 1;
   private final FinishedSegments finishedSegments;
   private final InfoStream infoStream;
   private final AtomicLong bytesUsed = new AtomicLong();
   private final AtomicInteger numTerms = new AtomicInteger();
-  private final IndexWriter writer;
-  public BufferedUpdatesStream(IndexWriter writer) {
-    this.writer = writer;
-    this.infoStream = writer.infoStream;
+  BufferedUpdatesStream(InfoStream infoStream) {
+    this.infoStream = infoStream;
     this.finishedSegments = new FinishedSegments(infoStream);
   }
   // Appends a new packet of buffered deletes to the stream,
   // setting its generation:
-  public synchronized long push(FrozenBufferedUpdates packet) {
+  synchronized long push(FrozenBufferedUpdates packet) {
     /*
      * The insert operation must be atomic. If we let threads increment the gen
      * and push the packet afterwards we risk that packets are out of order.
@@ -94,12 +90,12 @@ class BufferedUpdatesStream implements Accountable {
     return packet.delGen();
   }
-  public synchronized int getPendingUpdatesCount() {
+  synchronized int getPendingUpdatesCount() {
     return updates.size();
   }
   /** Only used by IW.rollback */
-  public synchronized void clear() {
+  synchronized void clear() {
     updates.clear();
     nextGen = 1;
     finishedSegments.clear();
@@ -107,11 +103,11 @@ class BufferedUpdatesStream implements Accountable {
     bytesUsed.set(0);
   }
-  public boolean any() {
+  boolean any() {
     return bytesUsed.get() != 0;
   }
-  public int numTerms() {
+  int numTerms() {
     return numTerms.get();
   }
@@ -120,13 +116,13 @@ class BufferedUpdatesStream implements Accountable {
     return bytesUsed.get();
   }
-  public static class ApplyDeletesResult {
+  static class ApplyDeletesResult {
     // True if any actual deletes took place:
-    public final boolean anyDeletes;
+    final boolean anyDeletes;
     // If non-null, contains segments that are 100% deleted
-    public final List<SegmentCommitInfo> allDeleted;
+    final List<SegmentCommitInfo> allDeleted;
     ApplyDeletesResult(boolean anyDeletes, List<SegmentCommitInfo> allDeleted) {
       this.anyDeletes = anyDeletes;
@@ -137,26 +133,22 @@ class BufferedUpdatesStream implements Accountable {
   /** Waits for all in-flight packets, which are already being resolved concurrently
    * by indexing threads, to finish. Returns true if there were any
    * new deletes or updates. This is called for refresh, commit. */
-  public void waitApplyAll() throws IOException {
+  void waitApplyAll(IndexWriter writer) throws IOException {
     assert Thread.holdsLock(writer) == false;
-    final long t0 = System.nanoTime();
     Set<FrozenBufferedUpdates> waitFor;
     synchronized (this) {
       waitFor = new HashSet<>(updates);
     }
-    waitApply(waitFor);
+    waitApply(waitFor, writer);
   }
   /** Returns true if this delGen is still running. */
-  public boolean stillRunning(long delGen) {
+  boolean stillRunning(long delGen) {
     return finishedSegments.stillRunning(delGen);
   }
-  public void finishedSegment(long delGen) {
+  void finishedSegment(long delGen) {
     finishedSegments.finishedSegment(delGen);
   }
@@ -164,7 +156,7 @@ class BufferedUpdatesStream implements Accountable {
    * delGen. We track the completed delGens and record the maximum delGen for which all prior
    * delGens, inclusive, are completed, so that it's safe for doc values updates to apply and write. */
-  public synchronized void finished(FrozenBufferedUpdates packet) {
+  synchronized void finished(FrozenBufferedUpdates packet) {
     // TODO: would be a bit more memory efficient to track this per-segment, so when each segment writes it writes all packets finished for
     // it, rather than only recording here, across all segments. But, more complex code, and more CPU, and maybe not so much impact in
     // practice?
@@ -182,18 +174,14 @@ class BufferedUpdatesStream implements Accountable {
   }
   /** All frozen packets up to and including this del gen are guaranteed to be finished. */
-  public long getCompletedDelGen() {
+  long getCompletedDelGen() {
     return finishedSegments.getCompletedDelGen();
   }
   /** Waits only for those in-flight packets that apply to these merge segments. This is
    * called when a merge needs to finish and must ensure all deletes to the merging
    * segments are resolved. */
-  public void waitApplyForMerge(List<SegmentCommitInfo> mergeInfos) throws IOException {
-    assert Thread.holdsLock(writer) == false;
-    final long t0 = System.nanoTime();
+  void waitApplyForMerge(List<SegmentCommitInfo> mergeInfos, IndexWriter writer) throws IOException {
     long maxDelGen = Long.MIN_VALUE;
     for (SegmentCommitInfo info : mergeInfos) {
       maxDelGen = Math.max(maxDelGen, info.getBufferedDeletesGen());
@@ -214,10 +202,10 @@ class BufferedUpdatesStream implements Accountable {
       infoStream.message("BD", "waitApplyForMerge: " + waitFor.size() + " packets, " + mergeInfos.size() + " merging segments");
     }
-    waitApply(waitFor);
+    waitApply(waitFor, writer);
   }
-  private void waitApply(Set<FrozenBufferedUpdates> waitFor) throws IOException {
+  private void waitApply(Set<FrozenBufferedUpdates> waitFor, IndexWriter writer) throws IOException {
     long startNS = System.nanoTime();
@@ -258,87 +246,34 @@ class BufferedUpdatesStream implements Accountable {
   }
   /** Holds all per-segment internal state used while resolving deletions. */
-  static final class SegmentState {
+  static final class SegmentState implements Closeable {
     final long delGen;
     final ReadersAndUpdates rld;
     final SegmentReader reader;
     final int startDelCount;
+    private final IOUtils.IOConsumer<ReadersAndUpdates> onClose;
     TermsEnum termsEnum;
     PostingsEnum postingsEnum;
     BytesRef term;
-    SegmentState(ReadersAndUpdates rld, SegmentCommitInfo info) throws IOException {
+    SegmentState(ReadersAndUpdates rld, IOUtils.IOConsumer<ReadersAndUpdates> onClose, SegmentCommitInfo info) throws IOException {
      this.rld = rld;
      startDelCount = rld.getPendingDeleteCount();
-      reader = rld.getReader(IOContext.READ);
      delGen = info.getBufferedDeletesGen();
+      this.onClose = onClose;
+      reader = rld.getReader(IOContext.READ);
    }
    @Override
    public String toString() {
      return "SegmentState(" + rld.info + ")";
    }
-  }
-  /** Opens SegmentReader and inits SegmentState for each segment. */
-  public SegmentState[] openSegmentStates(List<SegmentCommitInfo> infos,
-                                          Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
-    List<SegmentState> segStates = new ArrayList<>();
-    try {
-      for (SegmentCommitInfo info : infos) {
-        if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
-          segStates.add(new SegmentState(writer.getPooledInstance(info, true), info));
-          alreadySeenSegments.add(info);
-        }
-      }
-    } catch (Throwable t) {
-      try {
-        finishSegmentStates(segStates);
-      } catch (Throwable t1) {
-        t.addSuppressed(t1);
-      }
-      throw t;
-    }
-    return segStates.toArray(new SegmentState[0]);
-  }
-  private void finishSegmentStates(List<SegmentState> segStates) throws IOException {
-    IOUtils.applyToAll(segStates, s -> {
-      ReadersAndUpdates rld = s.rld;
-      try {
-        rld.release(s.reader);
-      } finally {
-        writer.release(s.rld);
-      }
-    });
-  }
-  /** Close segment states previously opened with openSegmentStates. */
-  public ApplyDeletesResult closeSegmentStates(SegmentState[] segStates, boolean success) throws IOException {
-    List<SegmentCommitInfo> allDeleted = null;
-    long totDelCount = 0;
-    final List<SegmentState> segmentStates = Arrays.asList(segStates);
-    for (SegmentState segState : segmentStates) {
-      if (success) {
-        totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
-        int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
-        assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
-        if (segState.rld.isFullyDeleted() && writer.getConfig().mergePolicy.keepFullyDeletedSegment(() -> segState.reader) == false) {
-          if (allDeleted == null) {
-            allDeleted = new ArrayList<>();
-          }
-          allDeleted.add(segState.reader.getSegmentInfo());
-        }
-      }
-    }
-    finishSegmentStates(segmentStates);
-    if (infoStream.isEnabled("BD")) {
-      infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + updates.size() + " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
-    }
-    return new ApplyDeletesResult(totDelCount > 0, allDeleted);
+    @Override
+    public void close() throws IOException {
+      IOUtils.close(() -> rld.release(reader), () -> onClose.accept(rld));
+    }
   }
   // only for assert
@@ -368,24 +303,24 @@ class BufferedUpdatesStream implements Accountable {
     private final InfoStream infoStream;
-    public FinishedSegments(InfoStream infoStream) {
+    FinishedSegments(InfoStream infoStream) {
      this.infoStream = infoStream;
    }
-    public synchronized void clear() {
+    synchronized void clear() {
      finishedDelGens.clear();
      completedDelGen = 0;
    }
-    public synchronized boolean stillRunning(long delGen) {
+    synchronized boolean stillRunning(long delGen) {
      return delGen > completedDelGen && finishedDelGens.contains(delGen) == false;
    }
-    public synchronized long getCompletedDelGen() {
+    synchronized long getCompletedDelGen() {
      return completedDelGen;
    }
-    public synchronized void finishedSegment(long delGen) {
+    synchronized void finishedSegment(long delGen) {
      finishedDelGens.add(delGen);
      while (true) {
        if (finishedDelGens.contains(completedDelGen + 1)) {

lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java

@@ -23,17 +23,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-import org.apache.lucene.index.IndexWriter.Event;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
@@ -101,6 +101,12 @@ import org.apache.lucene.util.InfoStream;
 final class DocumentsWriter implements Closeable, Accountable {
   private final Directory directoryOrig; // no wrapping, for infos
   private final Directory directory;
+  private final FieldInfos.FieldNumbers globalFieldNumberMap;
+  private final int indexCreatedVersionMajor;
+  private final AtomicLong pendingNumDocs;
+  private final boolean enableTestPoints;
+  private final Supplier<String> segmentNameSupplier;
+  private final FlushNotifications flushNotifications;
   private volatile boolean closed;
@@ -124,11 +130,12 @@ final class DocumentsWriter implements Closeable, Accountable {
   final DocumentsWriterPerThreadPool perThreadPool;
   final FlushPolicy flushPolicy;
   final DocumentsWriterFlushControl flushControl;
-  private final IndexWriter writer;
-  private final Queue<Event> events;
   private long lastSeqNo;
-  DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory) {
+  DocumentsWriter(FlushNotifications flushNotifications, int indexCreatedVersionMajor, AtomicLong pendingNumDocs, boolean enableTestPoints,
+                  Supplier<String> segmentNameSupplier, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory,
+                  FieldInfos.FieldNumbers globalFieldNumberMap) {
+    this.indexCreatedVersionMajor = indexCreatedVersionMajor;
     this.directoryOrig = directoryOrig;
     this.directory = directory;
     this.config = config;
@@ -136,9 +143,12 @@ final class DocumentsWriter implements Closeable, Accountable {
     this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
     this.perThreadPool = config.getIndexerThreadPool();
     flushPolicy = config.getFlushPolicy();
-    this.writer = writer;
-    this.events = new ConcurrentLinkedQueue<>();
-    flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream);
+    this.globalFieldNumberMap = globalFieldNumberMap;
+    this.pendingNumDocs = pendingNumDocs;
+    flushControl = new DocumentsWriterFlushControl(this, config);
+    this.segmentNameSupplier = segmentNameSupplier;
+    this.enableTestPoints = enableTestPoints;
+    this.flushNotifications = flushNotifications;
   }
   long deleteQueries(final Query... queries) throws IOException {
@@ -175,7 +185,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     if (deleteQueue != null) {
       ticketQueue.addDeletes(deleteQueue);
     }
-    putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
+    flushNotifications.onDeletesApplied(); // apply deletes event forces a purge
     return true;
   }
   return false;
@@ -409,10 +419,10 @@ final class DocumentsWriter implements Closeable, Accountable {
   private void ensureInitialized(ThreadState state) throws IOException {
     if (state.dwpt == null) {
-      final FieldInfos.Builder infos = new FieldInfos.Builder(writer.globalFieldNumberMap);
-      state.dwpt = new DocumentsWriterPerThread(writer, writer.newSegmentName(), directoryOrig,
+      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
+      state.dwpt = new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig,
                                                 directory, config, infoStream, deleteQueue, infos,
-                                                writer.pendingNumDocs, writer.enableTestPoints);
+                                                pendingNumDocs, enableTestPoints);
     }
   }
@@ -433,7 +443,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       final DocumentsWriterPerThread dwpt = perThread.dwpt;
       final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
-        seqNo = dwpt.updateDocuments(docs, analyzer, delNode);
+        seqNo = dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications);
       } finally {
         if (dwpt.isAborted()) {
           flushControl.doOnAbort(perThread);
@@ -460,7 +470,7 @@ final class DocumentsWriter implements Closeable, Accountable {
   }
   long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
                       final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
     boolean hasEvents = preUpdate();
@@ -477,7 +487,7 @@ final class DocumentsWriter implements Closeable, Accountable {
       final DocumentsWriterPerThread dwpt = perThread.dwpt;
       final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
-        seqNo = dwpt.updateDocument(doc, analyzer, delNode);
+        seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
       } finally {
         if (dwpt.isAborted()) {
           flushControl.doOnAbort(perThread);
@@ -536,17 +546,18 @@ final class DocumentsWriter implements Closeable, Accountable {
       boolean dwptSuccess = false;
       try {
         // flush concurrently without locking
-        final FlushedSegment newSegment = flushingDWPT.flush();
+        final FlushedSegment newSegment = flushingDWPT.flush(flushNotifications);
         ticketQueue.addSegment(ticket, newSegment);
         dwptSuccess = true;
       } finally {
         subtractFlushedNumDocs(flushingDocsInRam);
         if (flushingDWPT.pendingFilesToDelete().isEmpty() == false) {
-          putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
+          Set<String> files = flushingDWPT.pendingFilesToDelete();
+          flushNotifications.deleteUnusedFiles(files);
           hasEvents = true;
         }
         if (dwptSuccess == false) {
-          putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
+          flushNotifications.flushFailed(flushingDWPT.getSegmentInfo());
           hasEvents = true;
         }
       }
@@ -569,7 +580,7 @@ final class DocumentsWriter implements Closeable, Accountable {
           // thread in innerPurge can't keep up with all
          // other threads flushing segments. In this case
          // we forcefully stall the producers.
-          putEvent(ForcedPurgeEvent.INSTANCE);
+          flushNotifications.onTicketBacklog();
          break;
        }
      } finally {
@@ -580,7 +591,7 @@ final class DocumentsWriter implements Closeable, Accountable {
    }
    if (hasEvents) {
-      writer.doAfterSegmentFlushed(false, false);
+      flushNotifications.afterSegmentsFlushed();
    }
    // If deletes alone are consuming > 1/2 our RAM
@@ -597,12 +608,52 @@ final class DocumentsWriter implements Closeable, Accountable {
            flushControl.getDeleteBytesUsed()/(1024.*1024.),
            ramBufferSizeMB));
      }
-      putEvent(ApplyDeletesEvent.INSTANCE);
+      flushNotifications.onDeletesApplied();
    }
  }
  return hasEvents;
 }
+  interface FlushNotifications { // TODO maybe we find a better name for this?
+    /**
+     * Called when files were written to disk that are not used anymore. It's the implementation's responsibility
+     * to clean these files up
+     */
+    void deleteUnusedFiles(Collection<String> files);
+    /**
+     * Called when a segment failed to flush.
+     */
+    void flushFailed(SegmentInfo info);
+    /**
+     * Called after one or more segments were flushed to disk.
+     */
+    void afterSegmentsFlushed() throws IOException;
+    /**
+     * Should be called if a flush or an indexing operation caused a tragic / unrecoverable event.
+     */
+    void onTragicEvent(Throwable event, String message);
+    /**
+     * Called once deletes have been applied either after a flush or on a deletes call
+     */
+    void onDeletesApplied();
+    /**
+     * Called once the DocumentsWriter ticket queue has a backlog. This means there is an inner thread
+     * that tries to publish flushed segments but can't keep up with the other threads flushing new segments.
+     * This likely requires another thread to forcefully purge the buffer to help publishing. This
+     * can't be done in-place since we might hold index writer locks when this is called. The caller must ensure
+     * that the purge happens without an index writer lock held
+     *
+     * @see DocumentsWriter#purgeBuffer(IndexWriter, boolean)
+     */
+    void onTicketBacklog();
+  }
   void subtractFlushedNumDocs(int numFlushed) {
     int oldValue = numDocsInRAM.get();
@@ -626,7 +677,7 @@ final class DocumentsWriter implements Closeable, Accountable {
    * two stage operation; the caller must ensure (in try/finally) that finishFlush
    * is called after this method, to release the flush lock in DWFlushControl
    */
-  long flushAllThreads()
+  long flushAllThreads(IndexWriter writer)
     throws IOException {
     final DocumentsWriterDeleteQueue flushingDeleteQueue;
     if (infoStream.isEnabled("DW")) {
@@ -695,92 +746,8 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
   }
-  void putEvent(Event event) {
-    events.add(event);
-  }
   @Override
   public long ramBytesUsed() {
     return flushControl.ramBytesUsed();
   }
-  static final class ResolveUpdatesEvent implements Event {
-    private final FrozenBufferedUpdates packet;
-    ResolveUpdatesEvent(FrozenBufferedUpdates packet) {
-      this.packet = packet;
-    }
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      try {
-        packet.apply(writer);
-      } catch (Throwable t) {
-        try {
-          writer.onTragicEvent(t, "applyUpdatesPacket");
-        } catch (Throwable t1) {
-          t.addSuppressed(t1);
-        }
-        throw t;
-      }
-      writer.flushDeletesCount.incrementAndGet();
-    }
-  }
-  static final class ApplyDeletesEvent implements Event {
-    static final Event INSTANCE = new ApplyDeletesEvent();
-    private ApplyDeletesEvent() {
-      // only one instance
-    }
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      writer.applyDeletesAndPurge(true); // we always purge!
-    }
-  }
-  static final class ForcedPurgeEvent implements Event {
-    static final Event INSTANCE = new ForcedPurgeEvent();
-    private ForcedPurgeEvent() {
-      // only one instance
-    }
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      writer.purge(true);
-    }
-  }
-  static class FlushFailedEvent implements Event {
-    private final SegmentInfo info;
-    public FlushFailedEvent(SegmentInfo info) {
-      this.info = info;
-    }
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      writer.flushFailed(info);
-    }
-  }
-  static class DeleteNewFilesEvent implements Event {
-    private final Collection<String> files;
-    public DeleteNewFilesEvent(Collection<String> files) {
-      this.files = files;
-    }
-    @Override
-    public void process(IndexWriter writer) throws IOException {
-      writer.deleteNewFiles(files);
-    }
-  }
-  public Queue<Event> eventQueue() {
-    return events;
-  }
 }

lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java

@@ -70,10 +70,9 @@
   private boolean closed = false;
   private final DocumentsWriter documentsWriter;
   private final LiveIndexWriterConfig config;
-  private final BufferedUpdatesStream bufferedUpdatesStream;
   private final InfoStream infoStream;
-  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedUpdatesStream bufferedUpdatesStream) {
+  DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
     this.infoStream = config.getInfoStream();
     this.stallControl = new DocumentsWriterStallControl();
     this.perThreadPool = documentsWriter.perThreadPool;
@@ -81,7 +80,6 @@ final class DocumentsWriterFlushControl implements Accountable {
     this.config = config;
     this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
     this.documentsWriter = documentsWriter;
-    this.bufferedUpdatesStream = bufferedUpdatesStream;
   }
   public synchronized long activeBytes() {

lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java

@@ -169,11 +169,10 @@ class DocumentsWriterPerThread {
   private final AtomicLong pendingNumDocs;
   private final LiveIndexWriterConfig indexWriterConfig;
   private final boolean enableTestPoints;
-  private final IndexWriter indexWriter;
+  private final int indexVersionCreated;
-  public DocumentsWriterPerThread(IndexWriter writer, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
+  public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
                                   FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
-    this.indexWriter = writer;
     this.directoryOrig = directoryOrig;
     this.directory = new TrackingDirectoryWrapper(directory);
     this.fieldInfos = fieldInfos;
@@ -200,6 +199,7 @@ class DocumentsWriterPerThread {
     // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
     consumer = indexWriterConfig.getIndexingChain().getChain(this);
     this.enableTestPoints = enableTestPoints;
+    this.indexVersionCreated = indexVersionCreated;
   }
   public FieldInfos.Builder getFieldInfosBuilder() {
@@ -207,7 +207,7 @@ class DocumentsWriterPerThread {
   }
   public int getIndexCreatedVersionMajor() {
-    return indexWriter.segmentInfos.getIndexCreatedVersionMajor();
+    return indexVersionCreated;
   }
   final void testPoint(String message) {
@@ -227,7 +227,7 @@ class DocumentsWriterPerThread {
     }
   }
-  public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException {
+  public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     try {
       assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
       testPoint("DocumentsWriterPerThread addDocument start");
@@ -263,11 +263,11 @@ class DocumentsWriterPerThread {
       return finishDocument(deleteNode);
     } finally {
-      maybeAbort("updateDocument");
+      maybeAbort("updateDocument", flushNotifications);
     }
   }
-  public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException {
+  public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     try {
       testPoint("DocumentsWriterPerThread addDocuments start");
       assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
@@ -343,7 +343,7 @@ class DocumentsWriterPerThread {
         docState.clear();
       }
     } finally {
-      maybeAbort("updateDocuments");
+      maybeAbort("updateDocuments", flushNotifications);
     }
   }
@@ -425,7 +425,7 @@ class DocumentsWriterPerThread {
   }
   /** Flush all pending docs to a new segment */
-  FlushedSegment flush() throws IOException {
+  FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     assert numDocsInRAM > 0;
     assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
     segmentInfo.setMaxDoc(numDocsInRAM);
@@ -499,7 +499,7 @@ class DocumentsWriterPerThread {
       FlushedSegment fs = new FlushedSegment(infoStream, segmentInfoPerCommit, flushState.fieldInfos,
                                              segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush,
                                              sortMap);
-      sealFlushedSegment(fs, sortMap);
+      sealFlushedSegment(fs, sortMap, flushNotifications);
       if (infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0) / 1000000.0) + " msec");
       }
@@ -508,18 +508,18 @@ class DocumentsWriterPerThread {
       onAbortingException(t);
       throw t;
     } finally {
-      maybeAbort("flush");
+      maybeAbort("flush", flushNotifications);
     }
   }
-  private void maybeAbort(String location) throws IOException {
+  private void maybeAbort(String location, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     if (hasHitAbortingException() && aborted == false) {
       // if we are already aborted don't do anything here
       try {
         abort();
       } finally {
         // whatever we do here we have to fire this tragic event up.
-        indexWriter.onTragicEvent(abortingException, location);
+        flushNotifications.onTragicEvent(abortingException, location);
       }
     }
   }
@@ -545,7 +545,7 @@ class DocumentsWriterPerThread {
    * Seals the {@link SegmentInfo} for the new flushed segment and persists
    * the deleted documents {@link MutableBits}.
    */
-  void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap) throws IOException {
+  void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
     assert flushedSegment != null;
     SegmentCommitInfo newSegment = flushedSegment.segmentInfo;
@@ -559,7 +559,7 @@ class DocumentsWriterPerThread {
     if (indexWriterConfig.getUseCompoundFile()) {
       Set<String> originalFiles = newSegment.info.files();
       // TODO: like addIndexes, we are relying on createCompoundFile to successfully cleanup...
-      indexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context);
+      IndexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context, flushNotifications::deleteUnusedFiles);
       filesToDelete.addAll(originalFiles);
       newSegment.info.setUseCompoundFile(true);
     }

lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java

@@ -18,6 +18,8 @@ package org.apache.lucene.index;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +44,7 @@ import org.apache.lucene.store.RAMOutputStream;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.RamUsageEstimator;
@@ -51,7 +54,7 @@ import org.apache.lucene.util.RamUsageEstimator;
  * structure to hold them. We don't hold docIDs because these are applied on
  * flush.
  */
-class FrozenBufferedUpdates {
+final class FrozenBufferedUpdates {
   /* NOTE: we now apply this frozen packet immediately on creation, yet this process is heavy, and runs
    * in multiple threads, and this compression is sizable (~8.3% of the original size), so it's important
@@ -297,7 +300,7 @@ class FrozenBufferedUpdates {
       // Must open while holding IW lock so that e.g. segments are not merged
       // away, dropped from 100% deletions, etc., before we can open the readers
-      segStates = writer.bufferedUpdatesStream.openSegmentStates(infos, seenSegments, delGen());
+      segStates = openSegmentStates(writer, infos, seenSegments, delGen());
       if (segStates.length == 0) {
@@ -357,7 +360,7 @@ class FrozenBufferedUpdates {
       // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
       // Record that this packet is finished:
-      writer.bufferedUpdatesStream.finished(this);
+      writer.finished(this);
       finished = true;
@@ -378,7 +381,7 @@ class FrozenBufferedUpdates {
     if (finished == false) {
       // Record that this packet is finished:
-      writer.bufferedUpdatesStream.finished(this);
+      writer.finished(this);
     }
     if (infoStream.isEnabled("BD")) {
@@ -388,18 +391,67 @@ class FrozenBufferedUpdates {
       if (iter > 0) {
         message += "; " + (iter+1) + " iters due to concurrent merges";
       }
-      message += "; " + writer.bufferedUpdatesStream.getPendingUpdatesCount() + " packets remain";
+      message += "; " + writer.getPendingUpdatesCount() + " packets remain";
       infoStream.message("BD", message);
     }
   }
+  /** Opens SegmentReader and inits SegmentState for each segment. */
+  private static BufferedUpdatesStream.SegmentState[] openSegmentStates(IndexWriter writer, List<SegmentCommitInfo> infos,
+                                                                        Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
+    List<BufferedUpdatesStream.SegmentState> segStates = new ArrayList<>();
+    try {
+      for (SegmentCommitInfo info : infos) {
+        if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
+          segStates.add(new BufferedUpdatesStream.SegmentState(writer.getPooledInstance(info, true), writer::release, info));
+          alreadySeenSegments.add(info);
+        }
+      }
+    } catch (Throwable t) {
+      try {
+        IOUtils.close(segStates);
+      } catch (Throwable t1) {
+        t.addSuppressed(t1);
+      }
+      throw t;
+    }
+    return segStates.toArray(new BufferedUpdatesStream.SegmentState[0]);
+  }
+  /** Close segment states previously opened with openSegmentStates. */
+  public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
+    List<SegmentCommitInfo> allDeleted = null;
+    long totDelCount = 0;
+    final List<BufferedUpdatesStream.SegmentState> segmentStates = Arrays.asList(segStates);
+    for (BufferedUpdatesStream.SegmentState segState : segmentStates) {
+      if (success) {
+        totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
+        int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
+        assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
+        if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
+          if (allDeleted == null) {
+            allDeleted = new ArrayList<>();
+          }
+          allDeleted.add(segState.reader.getSegmentInfo());
+        }
+      }
+    }
+    IOUtils.close(segmentStates);
+    if (writer.infoStream.isEnabled("BD")) {
+      writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount() + " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
+    }
+    return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
+  }
   private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
                            boolean success, Set<String> delFiles) throws IOException {
     synchronized (writer) {
       BufferedUpdatesStream.ApplyDeletesResult result;
       try {
-        result = writer.bufferedUpdatesStream.closeSegmentStates(segStates, success);
+        result = closeSegmentStates(writer, segStates, success);
       } finally {
         // Matches the incRef we did above, but we must do the decRef after closing segment states else
         // IFD can't delete still-open files
@@ -407,8 +459,8 @@ class FrozenBufferedUpdates {
       }
       if (result.anyDeletes) {
         writer.maybeMerge.set(true);
         writer.checkpoint();
       }
       if (result.allDeleted != null) {
@@ -857,8 +909,4 @@ class FrozenBufferedUpdates {
   boolean any() {
     return deleteTerms.size() > 0 || deleteQueries.length > 0 || numericDVUpdates.length > 0 || binaryDVUpdates.length > 0;
   }
-  boolean anyDeleteTerms() {
-    return deleteTerms.size() > 0;
-  }
 }

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -236,7 +237,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   }
   /** Used only for testing. */
-  boolean enableTestPoints = false;
+  private final boolean enableTestPoints;
   static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
@@ -291,7 +292,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   final FieldNumbers globalFieldNumberMap;
   final DocumentsWriter docWriter;
-  private final Queue<Event> eventQueue;
+  private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<>();
   final IndexFileDeleter deleter;
   // used by forceMerge to note those needing merging
@@ -345,6 +346,51 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * card to make sure they can later charge you when you check out. */
   final AtomicLong pendingNumDocs = new AtomicLong();
+  private final DocumentsWriter.FlushNotifications flushNotifications = new DocumentsWriter.FlushNotifications() {
+    @Override
+    public void deleteUnusedFiles(Collection<String> files) {
+      eventQueue.add(w -> w.deleteNewFiles(files));
+    }
+    @Override
+    public void flushFailed(SegmentInfo info) {
+      eventQueue.add(w -> w.flushFailed(info));
+    }
+    @Override
+    public void afterSegmentsFlushed() throws IOException {
+      try {
+        purge(false);
+      } finally {
+        if (false) {
+          maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+        }
+      }
+    }
+    @Override
+    public void onTragicEvent(Throwable event, String message) {
+      IndexWriter.this.onTragicEvent(event, message);
+    }
+    @Override
+    public void onDeletesApplied() {
+      eventQueue.add(w -> {
+        try {
+          w.purge(true);
+        } finally {
+          flushCount.incrementAndGet();
+        }
+      }
+      );
+    }
+    @Override
+    public void onTicketBacklog() {
+      eventQueue.add(w -> w.purge(true));
+    }
+  };
   DirectoryReader getReader() throws IOException {
     return getReader(true, false);
   }
@@ -439,7 +485,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       synchronized (fullFlushLock) {
         try {
           // TODO: should we somehow make this available in the returned NRT reader?
-          long seqNo = docWriter.flushAllThreads();
+          long seqNo = docWriter.flushAllThreads(this);
           if (seqNo < 0) {
             anyChanges = true;
             seqNo = -seqNo;
@@ -660,7 +706,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       if (d instanceof FSDirectory && ((FSDirectory) d).checkPendingDeletions()) {
         throw new IllegalArgumentException("Directory " + d + " still has pending deleted files; cannot initialize IndexWriter");
       }
+      enableTestPoints = isEnableTestPoints();
       conf.setIndexWriter(this); // prevent reuse by other instances
       config = conf;
       infoStream = config.getInfoStream();
@@ -678,9 +724,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       mergeScheduler = config.getMergeScheduler();
       mergeScheduler.setInfoStream(infoStream);
       codec = config.getCodec();
-      bufferedUpdatesStream = new BufferedUpdatesStream(this);
       OpenMode mode = config.getOpenMode();
       boolean create;
       if (mode == OpenMode.CREATE) {
@@ -824,8 +867,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       validateIndexSort();
       config.getFlushPolicy().init(config);
-      docWriter = new DocumentsWriter(this, config, directoryOrig, directory);
-      eventQueue = docWriter.eventQueue();
+      bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
+      docWriter = new DocumentsWriter(flushNotifications, segmentInfos.getIndexCreatedVersionMajor(), pendingNumDocs,
+          enableTestPoints, this::newSegmentName,
+          config, directoryOrig, directory, globalFieldNumberMap);
       readerPool = new ReaderPool(directory, directoryOrig, segmentInfos, globalFieldNumberMap,
           bufferedUpdatesStream::getCompletedDelGen, infoStream, conf.getSoftDeletesField(), reader);
       if (config.getReaderPooling()) {
@@ -2457,7 +2502,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) throws IOException {
     assert packet != null && packet.any();
     bufferedUpdatesStream.push(packet);
-    docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(packet));
+    eventQueue.add(new ResolveUpdatesEvent(packet));
   }
   /**
@@ -2479,7 +2524,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       if (globalPacket != null && globalPacket.any()) {
         // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
         bufferedUpdatesStream.push(globalPacket);
-        docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(globalPacket));
+        eventQueue.add(new ResolveUpdatesEvent(globalPacket));
       }
       // Publishing the segment must be sync'd on IW -> BDS to make the sure
@@ -2489,7 +2534,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         nextGen = bufferedUpdatesStream.push(packet);
         // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
-        docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(packet));
+        eventQueue.add(new ResolveUpdatesEvent(packet));
       } else {
         // Since we don't have a delete packet to apply we can get a new
@@ -2877,7 +2922,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       // TODO: unlike merge, on exception we arent sniping any trash cfs files here?
       // createCompoundFile tries to cleanup, but it might not always be able to...
       try {
-        createCompoundFile(infoStream, trackingCFSDir, info, context);
+        createCompoundFile(infoStream, trackingCFSDir, info, context, this::deleteNewFiles);
       } finally {
         // delete new non cfs files directly: they were never
         // registered with IFD
@@ -3060,7 +3105,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       boolean flushSuccess = false;
       boolean success = false;
       try {
-        seqNo = docWriter.flushAllThreads();
+        seqNo = docWriter.flushAllThreads(this);
         if (seqNo < 0) {
           anyChanges = true;
           seqNo = -seqNo;
@@ -3421,7 +3466,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     synchronized (fullFlushLock) {
       boolean flushSuccess = false;
       try {
-        long seqNo = docWriter.flushAllThreads();
+        long seqNo = docWriter.flushAllThreads(this);
         if (seqNo < 0) {
           seqNo = -seqNo;
           anyChanges = true;
@@ -3469,7 +3514,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "now apply all deletes for all segments buffered updates bytesUsed=" + bufferedUpdatesStream.ramBytesUsed() + " reader pool bytesUsed=" + readerPool.ramBytesUsed());
     }
-    bufferedUpdatesStream.waitApplyAll();
+    bufferedUpdatesStream.waitApplyAll(this);
   }
   // for testing only
@@ -3998,9 +4043,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   /** Does initial setup for a merge, which is fast but holds
    *  the synchronized lock on IndexWriter instance. */
   final void mergeInit(MergePolicy.OneMerge merge) throws IOException {
+    assert Thread.holdsLock(this) == false;
     // Make sure any deletes that must be resolved before we commit the merge are complete:
-    bufferedUpdatesStream.waitApplyForMerge(merge.segments);
+    bufferedUpdatesStream.waitApplyForMerge(merge.segments, this);
     boolean success = false;
     try {
@@ -4267,7 +4312,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       Collection<String> filesToRemove = merge.info.files();
       TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory);
       try {
-        createCompoundFile(infoStream, trackingCFSDir, merge.info.info, context);
+        createCompoundFile(infoStream, trackingCFSDir, merge.info.info, context, this::deleteNewFiles);
         success = true;
       } catch (Throwable t) {
         synchronized(this) {
@@ -4751,7 +4796,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * deletion files, this SegmentInfo must not reference such files when this
    * method is called, because they are not allowed within a compound file.
    */
-  final void createCompoundFile(InfoStream infoStream, TrackingDirectoryWrapper directory, final SegmentInfo info, IOContext context) throws IOException {
+  static final void createCompoundFile(InfoStream infoStream, TrackingDirectoryWrapper directory, final SegmentInfo info, IOContext context, IOUtils.IOConsumer<Collection<String>> deleteFiles) throws IOException {
     // maybe this check is not needed, but why take the risk?
     if (!directory.getCreatedFiles().isEmpty()) {
@@ -4769,7 +4814,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     } finally {
       if (!success) {
         // Safe: these files must exist
-        deleteNewFiles(directory.getCreatedFiles());
+        deleteFiles.accept(directory.getCreatedFiles());
       }
     }
@@ -4783,14 +4828,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * @throws IOException if an {@link IOException} occurs
    * @see IndexFileDeleter#deleteNewFiles(Collection)
    */
-  synchronized final void deleteNewFiles(Collection<String> files) throws IOException {
+  private synchronized void deleteNewFiles(Collection<String> files) throws IOException {
     deleter.deleteNewFiles(files);
   }
   /**
    * Cleans up residuals from a segment that could not be entirely flushed due to an error
    */
-  synchronized final void flushFailed(SegmentInfo info) throws IOException {
+  private synchronized final void flushFailed(SegmentInfo info) throws IOException {
     // TODO: this really should be a tragic
     Collection<String> files;
     try {
@@ -4803,29 +4847,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       deleter.deleteNewFiles(files);
     }
   }
-  final int purge(boolean forced) throws IOException {
+  private int purge(boolean forced) throws IOException {
     return docWriter.purgeBuffer(this, forced);
   }
-  final void applyDeletesAndPurge(boolean forcePurge) throws IOException {
-    try {
-      purge(forcePurge);
-    } finally {
-      flushCount.incrementAndGet();
-    }
-  }
-  final void doAfterSegmentFlushed(boolean triggerMerge, boolean forcePurge) throws IOException {
-    try {
-      purge(forcePurge);
-    } finally {
-      if (triggerMerge) {
-        maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
-      }
-    }
-  }
   /** Record that the files referenced by this {@link SegmentInfos} are still in use.
    *
    * @lucene.internal */
@@ -4867,8 +4893,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * encoded inside the {@link #process(IndexWriter)} method.
    *
    */
-  interface Event {
+  @FunctionalInterface
+  private interface Event {
    /**
     * Processes the event. This method is called by the {@link IndexWriter}
     * passed as the first argument.
@@ -4971,4 +4997,43 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     ensureOpen(false);
     return readerPool.get(info, create);
   }
+  private static final class ResolveUpdatesEvent implements Event {
+    private final FrozenBufferedUpdates packet;
+    ResolveUpdatesEvent(FrozenBufferedUpdates packet) {
+      this.packet = packet;
+    }
+    @Override
+    public void process(IndexWriter writer) throws IOException {
+      try {
+        packet.apply(writer);
+      } catch (Throwable t) {
+        try {
+          writer.onTragicEvent(t, "applyUpdatesPacket");
+        } catch (Throwable t1) {
+          t.addSuppressed(t1);
+        }
+        throw t;
+      }
+      writer.flushDeletesCount.incrementAndGet();
+    }
+  }
+  void finished(FrozenBufferedUpdates packet) {
+    bufferedUpdatesStream.finished(packet);
+  }
+  int getPendingUpdatesCount() {
+    return bufferedUpdatesStream.getPendingUpdatesCount();
+  }
+  /**
+   * Tests should override this to enable test points. Default is <code>false</code>.
+   */
+  protected boolean isEnableTestPoints() {
+    return false;
+  }
 }

lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java

@@ -1836,9 +1836,14 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
     Directory dir = newMockDirectory(); // we want to ensure we don't leak any locks or file handles
     IndexWriterConfig iwc = new IndexWriterConfig(null);
     iwc.setInfoStream(evilInfoStream);
-    IndexWriter iw = new IndexWriter(dir, iwc);
     // TODO: cutover to RandomIndexWriter.mockIndexWriter?
-    iw.enableTestPoints = true;
+    IndexWriter iw = new IndexWriter(dir, iwc) {
+      @Override
+      protected boolean isEnableTestPoints() {
+        return true;
+      }
+    };
     Document doc = new Document();
     for (int i = 0; i < 10; i++) {
       iw.addDocument(doc);

lucene/core/src/test/org/apache/lucene/util/TestInfoStream.java

@@ -74,8 +74,12 @@ public class TestInfoStream extends LuceneTestCase {
         return true;
       }
     });
-    IndexWriter iw = new IndexWriter(dir, iwc);
-    iw.enableTestPoints = true;
+    IndexWriter iw = new IndexWriter(dir, iwc) {
+      @Override
+      protected boolean isEnableTestPoints() {
+        return true;
+      }
+    };
     iw.addDocument(new Document());
     iw.close();
     dir.close();

lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java

@@ -80,7 +80,12 @@ public class RandomIndexWriter implements Closeable {
     IndexWriter iw;
     boolean success = false;
     try {
-      iw = new IndexWriter(dir, conf);
+      iw = new IndexWriter(dir, conf) {
+        @Override
+        protected boolean isEnableTestPoints() {
+          return true;
+        }
+      };
       success = true;
     } finally {
       if (reader != null) {
@@ -91,7 +96,6 @@ public class RandomIndexWriter implements Closeable {
         }
       }
     }
-    iw.enableTestPoints = true;
     return iw;
   }
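
A side effect of making enableTestPoints final is that test code can no longer flip the old package-private flag; it overrides the new protected hook instead, as the test changes above show. A hedged, self-contained usage sketch of that pattern (the class name, directory and config setup below are illustrative, not taken from the commit):

import java.nio.file.Files;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class EnableTestPointsExample {
  public static void main(String[] args) throws Exception {
    Directory dir = FSDirectory.open(Files.createTempDirectory("testpoints"));
    IndexWriterConfig iwc = new IndexWriterConfig();
    // After this change, test points are enabled by overriding the protected
    // hook on an IndexWriter subclass instead of setting a package-private flag.
    try (IndexWriter iw = new IndexWriter(dir, iwc) {
      @Override
      protected boolean isEnableTestPoints() {
        return true;
      }
    }) {
      iw.addDocument(new Document());
    }
    dir.close();
  }
}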