LUCENE-8271: Remove IndexWriter from DWFlushQueue

This simplifies DocumentsWriterFlushQueue by moving all IW related
code out of it. The DWFQ now only contains logic for taking tickets
off the queue and applying it to a given consumer. The logic now
entirely resides in IW and has private visibility. Locking
also is more contained since IW knows exactly what is called and when.
This commit is contained in:
Simon Willnauer 2018-04-24 15:40:48 +02:00 committed by GitHub
parent d702dc6133
commit d32ce90924
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 159 additions and 208 deletions

View File

@ -31,13 +31,13 @@ import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
/**
@ -190,12 +190,13 @@ final class DocumentsWriter implements Closeable, Accountable {
}
return false;
}
int purgeBuffer(IndexWriter writer, boolean forced) throws IOException {
void purgeFlushTickets(boolean forced, IOUtils.IOConsumer<DocumentsWriterFlushQueue.FlushTicket> consumer)
throws IOException {
if (forced) {
return ticketQueue.forcePurge(writer);
ticketQueue.forcePurge(consumer);
} else {
return ticketQueue.tryPurge(writer);
ticketQueue.tryPurge(consumer);
}
}
@ -206,7 +207,7 @@ final class DocumentsWriter implements Closeable, Accountable {
private void ensureOpen() throws AlreadyClosedException {
if (closed) {
throw new AlreadyClosedException("this IndexWriter is closed");
throw new AlreadyClosedException("this DocumentsWriter is closed");
}
}
@ -214,8 +215,7 @@ final class DocumentsWriter implements Closeable, Accountable {
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
synchronized void abort(IndexWriter writer) throws IOException {
assert !Thread.holdsLock(writer) : "IndexWriter lock should never be hold when aborting";
synchronized void abort() throws IOException {
boolean success = false;
try {
deleteQueue.clear();
@ -260,17 +260,19 @@ final class DocumentsWriter implements Closeable, Accountable {
/** Locks all currently active DWPT and aborts them.
* The returned Closeable should be closed once the locks for the aborted
* DWPTs can be released. */
synchronized Closeable lockAndAbortAll(IndexWriter indexWriter) throws IOException {
assert indexWriter.holdsFullFlushLock();
synchronized Closeable lockAndAbortAll() throws IOException {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "lockAndAbortAll");
}
// Make sure we move all pending tickets into the flush queue:
ticketQueue.forcePurge(indexWriter);
ticketQueue.forcePurge(ticket -> {
if (ticket.getFlushedSegment() != null) {
pendingNumDocs.addAndGet(-ticket.getFlushedSegment().segmentInfo.info.maxDoc());
}
});
List<ThreadState> threadStates = new ArrayList<>();
AtomicBoolean released = new AtomicBoolean(false);
final Closeable release = () -> {
assert indexWriter.holdsFullFlushLock();
if (released.compareAndSet(false, true)) { // only once
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAllAbortedThread");
@ -519,7 +521,7 @@ final class DocumentsWriter implements Closeable, Accountable {
while (flushingDWPT != null) {
hasEvents = true;
boolean success = false;
SegmentFlushTicket ticket = null;
DocumentsWriterFlushQueue.FlushTicket ticket = null;
try {
assert currentFullFlushDelQueue == null
|| flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
@ -618,7 +620,7 @@ final class DocumentsWriter implements Closeable, Accountable {
interface FlushNotifications { // TODO maybe we find a better name for this?
/**
* Called when files were written to disk that are not used anymore. It's the implementations responsibilty
* Called when files were written to disk that are not used anymore. It's the implementation's responsibility
* to clean these files up
*/
void deleteUnusedFiles(Collection<String> files);
@ -648,9 +650,9 @@ final class DocumentsWriter implements Closeable, Accountable {
* that tries to publish flushed segments but can't keep up with the other threads flushing new segments.
* This likely requires other thread to forcefully purge the buffer to help publishing. This
* can't be done in-place since we might hold index writer locks when this is called. The caller must ensure
* that the purge happens without an index writer lock hold
* that the purge happens without an index writer lock being held.
*
* @see DocumentsWriter#purgeBuffer(IndexWriter, boolean)
* @see DocumentsWriter#purgeFlushTickets(boolean, IOUtils.IOConsumer)
*/
void onTicketBacklog();
}
@ -677,7 +679,7 @@ final class DocumentsWriter implements Closeable, Accountable {
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
long flushAllThreads(IndexWriter writer)
long flushAllThreads()
throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
@ -713,7 +715,6 @@ final class DocumentsWriter implements Closeable, Accountable {
}
ticketQueue.addDeletes(flushingDeleteQueue);
}
ticketQueue.forcePurge(writer);
// we can't assert that we don't have any tickets in teh queue since we might add a DocumentsWriterDeleteQueue
// concurrently if we have very small ram buffers this happens quite frequently
assert !flushingDeleteQueue.anyChanges();
@ -727,8 +728,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
void finishFullFlush(IndexWriter indexWriter, boolean success) {
assert indexWriter.holdsFullFlushLock();
void finishFullFlush(boolean success) {
try {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + " finishFullFlush success=" + success);
@ -739,7 +739,6 @@ final class DocumentsWriter implements Closeable, Accountable {
flushControl.finishFullFlush();
} else {
flushControl.abortFullFlushes();
}
} finally {
pendingChangesInCurrentFullFlush = false;

View File

@ -23,29 +23,28 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.util.IOUtils;
/**
* @lucene.internal
*/
class DocumentsWriterFlushQueue {
final class DocumentsWriterFlushQueue {
private final Queue<FlushTicket> queue = new LinkedList<>();
// we track tickets separately since count must be present even before the ticket is
// constructed ie. queue.size would not reflect it.
private final AtomicInteger ticketCount = new AtomicInteger();
private final ReentrantLock purgeLock = new ReentrantLock();
void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
synchronized (this) {
incTickets();// first inc the ticket count - freeze opens
// a window for #anyChanges to fail
boolean success = false;
try {
queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
success = true;
} finally {
if (!success) {
decTickets();
}
synchronized void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
incTickets();// first inc the ticket count - freeze opens
// a window for #anyChanges to fail
boolean success = false;
try {
queue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
success = true;
} finally {
if (!success) {
decTickets();
}
}
}
@ -60,14 +59,14 @@ class DocumentsWriterFlushQueue {
assert numTickets >= 0;
}
synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
// Each flush is assigned a ticket in the order they acquire the ticketQueue
// lock
incTickets();
boolean success = false;
try {
// prepare flush freezes the global deletes - do in synced block!
final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
queue.add(ticket);
success = true;
return ticket;
@ -78,13 +77,15 @@ class DocumentsWriterFlushQueue {
}
}
synchronized void addSegment(SegmentFlushTicket ticket, FlushedSegment segment) {
synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) {
assert ticket.hasSegment;
// the actual flush is done asynchronously and once done the FlushedSegment
// is passed to the flush ticket
ticket.setSegment(segment);
}
synchronized void markTicketFailed(SegmentFlushTicket ticket) {
synchronized void markTicketFailed(FlushTicket ticket) {
assert ticket.hasSegment;
// to free the queue we mark tickets as failed just to clean up the queue.
ticket.setFailed();
}
@ -94,9 +95,8 @@ class DocumentsWriterFlushQueue {
return ticketCount.get() != 0;
}
private int innerPurge(IndexWriter writer) throws IOException {
private void innerPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
assert purgeLock.isHeldByCurrentThread();
int numPurged = 0;
while (true) {
final FlushTicket head;
final boolean canPublish;
@ -105,167 +105,99 @@ class DocumentsWriterFlushQueue {
canPublish = head != null && head.canPublish(); // do this synced
}
if (canPublish) {
numPurged++;
try {
/*
* if we block on publish -> lock IW -> lock BufferedDeletes we don't block
* concurrent segment flushes just because they want to append to the queue.
* the downside is that we need to force a purge on fullFlush since ther could
* the downside is that we need to force a purge on fullFlush since there could
* be a ticket still in the queue.
*/
head.publish(writer);
consumer.accept(head);
} finally {
synchronized (this) {
// finally remove the published ticket from the queue
final FlushTicket poll = queue.poll();
decTickets();
// we hold the purgeLock so no other thread should have polled:
assert poll == head;
ticketCount.decrementAndGet();
assert poll == head;
}
}
} else {
break;
}
}
return numPurged;
}
int forcePurge(IndexWriter writer) throws IOException {
void forcePurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
assert !Thread.holdsLock(this);
assert !Thread.holdsLock(writer);
purgeLock.lock();
try {
return innerPurge(writer);
innerPurge(consumer);
} finally {
purgeLock.unlock();
}
}
int tryPurge(IndexWriter writer) throws IOException {
void tryPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
assert !Thread.holdsLock(this);
assert !Thread.holdsLock(writer);
if (purgeLock.tryLock()) {
try {
return innerPurge(writer);
innerPurge(consumer);
} finally {
purgeLock.unlock();
}
}
return 0;
}
public int getTicketCount() {
int getTicketCount() {
return ticketCount.get();
}
synchronized void clear() {
queue.clear();
ticketCount.set(0);
}
static abstract class FlushTicket {
protected FrozenBufferedUpdates frozenUpdates;
protected boolean published = false;
protected FlushTicket(FrozenBufferedUpdates frozenUpdates) {
this.frozenUpdates = frozenUpdates;
}
protected abstract void publish(IndexWriter writer) throws IOException;
protected abstract boolean canPublish();
/**
* Publishes the flushed segment, segment private deletes (if any) and its
* associated global delete (if present) to IndexWriter. The actual
* publishing operation is synced on {@code IW -> BDS} so that the {@link SegmentInfo}'s
* delete generation is always GlobalPacket_deleteGeneration + 1
*/
protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates globalPacket)
throws IOException {
assert newSegment != null;
SegmentCommitInfo segmentInfo = newSegment.segmentInfo;
assert segmentInfo != null;
final FrozenBufferedUpdates segmentUpdates = newSegment.segmentUpdates;
if (indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private updates=" + segmentUpdates);
}
if (segmentUpdates != null && indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "flush: push buffered seg private updates: " + segmentUpdates);
}
// now publish!
indexWriter.publishFlushedSegment(segmentInfo, newSegment.fieldInfos, segmentUpdates, globalPacket, newSegment.sortMap);
}
protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates bufferedUpdates)
throws IOException {
// Finish the flushed segment and publish it to IndexWriter
if (newSegment == null) {
if (bufferedUpdates != null && bufferedUpdates.any()) {
indexWriter.publishFrozenUpdates(bufferedUpdates);
if (indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "flush: push buffered updates: " + bufferedUpdates);
}
}
} else {
publishFlushedSegment(indexWriter, newSegment, bufferedUpdates);
}
}
}
static final class GlobalDeletesTicket extends FlushTicket {
protected GlobalDeletesTicket(FrozenBufferedUpdates frozenUpdates) {
super(frozenUpdates);
}
@Override
protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already publised - can not publish twice";
published = true;
// it's a global ticket - no segment to publish
finishFlush(writer, null, frozenUpdates);
}
@Override
protected boolean canPublish() {
return true;
}
}
static final class SegmentFlushTicket extends FlushTicket {
static final class FlushTicket {
private final FrozenBufferedUpdates frozenUpdates;
private final boolean hasSegment;
private FlushedSegment segment;
private boolean failed = false;
protected SegmentFlushTicket(FrozenBufferedUpdates frozenDeletes) {
super(frozenDeletes);
private boolean published = false;
FlushTicket(FrozenBufferedUpdates frozenUpdates, boolean hasSegment) {
this.frozenUpdates = frozenUpdates;
this.hasSegment = hasSegment;
}
@Override
protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already publised - can not publish twice";
boolean canPublish() {
return hasSegment == false || segment != null || failed;
}
synchronized void markPublished() {
assert published == false: "ticket was already published - can not publish twice";
published = true;
finishFlush(writer, segment, frozenUpdates);
}
protected void setSegment(FlushedSegment segment) {
private void setSegment(FlushedSegment segment) {
assert !failed;
this.segment = segment;
}
protected void setFailed() {
private void setFailed() {
assert segment == null;
failed = true;
}
@Override
protected boolean canPublish() {
return segment != null || failed;
/**
* Returns the flushed segment or <code>null</code> if this flush ticket doesn't have a segment. This can be the
* case if this ticket represents a flushed global frozen updates package.
*/
FlushedSegment getFlushedSegment() {
return segment;
}
/**
* Returns a frozen global deletes package.
*/
FrozenBufferedUpdates getFrozenUpdates() {
return frozenUpdates;
}
}
}

View File

@ -46,7 +46,7 @@ import org.apache.lucene.util.Version;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
class DocumentsWriterPerThread {
final class DocumentsWriterPerThread {
/**
* The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
@ -102,7 +102,7 @@ class DocumentsWriterPerThread {
}
}
static class FlushedSegment {
static final class FlushedSegment {
final SegmentCommitInfo segmentInfo;
final FieldInfos fieldInfos;
final FrozenBufferedUpdates segmentUpdates;
@ -152,7 +152,6 @@ class DocumentsWriterPerThread {
final DocConsumer consumer;
final Counter bytesUsed;
SegmentWriteState flushState;
// Updates for our still-in-RAM (to be flushed next) segment
final BufferedUpdates pendingUpdates;
final SegmentInfo segmentInfo; // Current segment we are working on

View File

@ -274,7 +274,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final AtomicReference<Throwable> tragedy = new AtomicReference<>(null);
private final Directory directoryOrig; // original user directory
final Directory directory; // wrapped with additional checks
private final Directory directory; // wrapped with additional checks
private final Analyzer analyzer; // how to analyze text
private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed
@ -360,7 +360,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
@Override
public void afterSegmentsFlushed() throws IOException {
try {
purge(false);
publishFlushedSegments(false);
} finally {
if (false) {
maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
@ -377,7 +377,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
public void onDeletesApplied() {
eventQueue.add(w -> {
try {
w.purge(true);
w.publishFlushedSegments(true);
} finally {
flushCount.incrementAndGet();
}
@ -387,7 +387,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
@Override
public void onTicketBacklog() {
eventQueue.add(w -> w.purge(true));
eventQueue.add(w -> w.publishFlushedSegments(true));
}
};
@ -485,7 +485,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized (fullFlushLock) {
try {
// TODO: should we somehow make this available in the returned NRT reader?
long seqNo = docWriter.flushAllThreads(this);
long seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
anyChanges = true;
seqNo = -seqNo;
@ -497,7 +497,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// if we flushed anything.
flushCount.incrementAndGet();
}
publishFlushedSegments(true);
processEvents(false);
if (applyAllDeletes) {
@ -534,7 +534,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
success = true;
} finally {
// Done: finish the full flush!
docWriter.finishFullFlush(this, success);
assert holdsFullFlushLock();
docWriter.finishFullFlush(success);
if (success) {
processEvents(false);
doAfterFlush();
@ -2207,10 +2208,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// set it to false before calling rollbackInternal
mergeScheduler.close();
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
docWriter.abort(this); // don't sync on IW here
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
assert !Thread.holdsLock(this) : "IndexWriter lock should never be hold when aborting";
docWriter.abort(); // don't sync on IW here
docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes
purge(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources
publishFlushedSegments(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources
synchronized (this) {
if (pendingCommit != null) {
@ -2352,7 +2354,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/
try {
synchronized (fullFlushLock) {
try (Closeable finalizer = docWriter.lockAndAbortAll(this)) {
try (Closeable finalizer = docWriter.lockAndAbortAll()) {
processEvents(false);
synchronized (this) {
try {
@ -2499,19 +2501,33 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
segmentInfos.changed();
}
synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) throws IOException {
synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) {
assert packet != null && packet.any();
bufferedUpdatesStream.push(packet);
eventQueue.add(new ResolveUpdatesEvent(packet));
long nextGen = bufferedUpdatesStream.push(packet);
// Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
eventQueue.add(w -> {
try {
packet.apply(w);
} catch (Throwable t) {
try {
w.onTragicEvent(t, "applyUpdatesPacket");
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
w.flushDeletesCount.incrementAndGet();
});
return nextGen;
}
/**
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer.
*/
synchronized void publishFlushedSegment(SegmentCommitInfo newSegment,
FieldInfos fieldInfos, FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
Sorter.DocMap sortMap) throws IOException {
private synchronized void publishFlushedSegment(SegmentCommitInfo newSegment, FieldInfos fieldInfos,
FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
Sorter.DocMap sortMap) throws IOException {
boolean published = false;
try {
// Lock order IW -> BDS
@ -2522,20 +2538,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
if (globalPacket != null && globalPacket.any()) {
// Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
bufferedUpdatesStream.push(globalPacket);
eventQueue.add(new ResolveUpdatesEvent(globalPacket));
publishFrozenUpdates(globalPacket);
}
// Publishing the segment must be sync'd on IW -> BDS to make the sure
// that no merge prunes away the seg. private delete packet
final long nextGen;
if (packet != null && packet.any()) {
nextGen = bufferedUpdatesStream.push(packet);
// Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
eventQueue.add(new ResolveUpdatesEvent(packet));
nextGen = publishFrozenUpdates(packet);
} else {
// Since we don't have a delete packet to apply we can get a new
// generation right away
@ -3105,7 +3115,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean flushSuccess = false;
boolean success = false;
try {
seqNo = docWriter.flushAllThreads(this);
seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
anyChanges = true;
seqNo = -seqNo;
@ -3115,7 +3125,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// if we flushed anything.
flushCount.incrementAndGet();
}
publishFlushedSegments(true);
// cannot pass triggerMerges=true here else it can lead to deadlock:
processEvents(false);
@ -3169,8 +3179,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "hit exception during prepareCommit");
}
}
assert holdsFullFlushLock();
// Done: finish the full flush!
docWriter.finishFullFlush(this, flushSuccess);
docWriter.finishFullFlush(flushSuccess);
doAfterFlush();
}
}
@ -3466,7 +3477,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
long seqNo = docWriter.flushAllThreads(this);
long seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
seqNo = -seqNo;
anyChanges = true;
@ -3477,9 +3488,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// flushCount is incremented in flushAllThreads
flushCount.incrementAndGet();
}
publishFlushedSegments(true);
flushSuccess = true;
} finally {
docWriter.finishFullFlush(this, flushSuccess);
assert holdsFullFlushLock();
docWriter.finishFullFlush(flushSuccess);
processEvents(false);
}
}
@ -4848,8 +4861,40 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
private int purge(boolean forced) throws IOException {
return docWriter.purgeBuffer(this, forced);
/**
* Publishes the flushed segment, segment-private deletes (if any) and its
* associated global delete (if present) to IndexWriter. The actual
* publishing operation is synced on {@code IW -> BDS} so that the {@link SegmentInfo}'s
* delete generation is always GlobalPacket_deleteGeneration + 1
* @param forced if <code>true</code> this call will block on the ticket queue if the lock is held by another thread.
* if <code>false</code> the call will try to acquire the queue lock and exits if it's held by another thread.
*
*/
void publishFlushedSegments(boolean forced) throws IOException {
docWriter.purgeFlushTickets(forced, ticket -> {
DocumentsWriterPerThread.FlushedSegment newSegment = ticket.getFlushedSegment();
FrozenBufferedUpdates bufferedUpdates = ticket.getFrozenUpdates();
ticket.markPublished();
if (newSegment == null) { // this is a flushed global deletes package - not a segments
if (bufferedUpdates != null && bufferedUpdates.any()) { // TODO why can this be null?
publishFrozenUpdates(bufferedUpdates);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush: push buffered updates: " + bufferedUpdates);
}
}
} else {
assert newSegment.segmentInfo != null;
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publishFlushedSegment seg-private updates=" + newSegment.segmentUpdates);
}
if (newSegment.segmentUpdates != null && infoStream.isEnabled("DW")) {
infoStream.message("IW", "flush: push buffered seg private updates: " + newSegment.segmentUpdates);
}
// now publish!
publishFlushedSegment(newSegment.segmentInfo, newSegment.fieldInfos, newSegment.segmentUpdates,
bufferedUpdates, newSegment.sortMap);
}
});
}
/** Record that the files referenced by this {@link SegmentInfos} are still in use.
@ -4998,30 +5043,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return readerPool.get(info, create);
}
private static final class ResolveUpdatesEvent implements Event {
private final FrozenBufferedUpdates packet;
ResolveUpdatesEvent(FrozenBufferedUpdates packet) {
this.packet = packet;
}
@Override
public void process(IndexWriter writer) throws IOException {
try {
packet.apply(writer);
} catch (Throwable t) {
try {
writer.onTragicEvent(t, "applyUpdatesPacket");
} catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
}
writer.flushDeletesCount.incrementAndGet();
}
}
void finished(FrozenBufferedUpdates packet) {
bufferedUpdatesStream.finished(packet);
}