LUCENE-5006: Simplified DW and DWPT synchronization and concurrent interaction with IW

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1515459 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Simon Willnauer 2013-08-19 15:06:08 +00:00
parent c93e362fd4
commit 75ab79f0aa
12 changed files with 580 additions and 419 deletions

View File

@ -45,7 +45,6 @@ Optimizations
on Windows if NIOFSDirectory is used, mmapped files are still locked.
(Michael Poindexter, Robert Muir, Uwe Schindler)
======================= Lucene 4.5.0 =======================
New features
@ -206,6 +205,13 @@ Optimizations
* LUCENE-5170: Fixed several wrapper analyzers to inherit the reuse strategy
of the wrapped Analyzer. (Uwe Schindler, Robert Muir, Shay Banon)
* LUCENE-5006: Simplified DocumentsWriter and DocumentsWriterPerThread
synchronization and concurrent interaction with IndexWriter. DWPT is now
only setup once and has no reset logic. All segment publishing and state
transition from DWPT into IndexWriter is now done via an Event-Queue
processed from within the IndexWriter in order to prevent suituations
where DWPT or DW calling int IW causing deadlocks. (Simon Willnauer)
Documentation

View File

@ -19,18 +19,18 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.index.FieldInfos.FieldNumbers;
import org.apache.lucene.index.IndexWriter.Event;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;
@ -100,19 +100,15 @@ import org.apache.lucene.util.InfoStream;
*/
final class DocumentsWriter {
Directory directory;
private final Directory directory;
private volatile boolean closed;
final InfoStream infoStream;
Similarity similarity;
private final InfoStream infoStream;
List<String> newFiles;
private final LiveIndexWriterConfig config;
final IndexWriter indexWriter;
final LiveIndexWriterConfig indexWriterConfig;
private AtomicInteger numDocsInRAM = new AtomicInteger(0);
private final AtomicInteger numDocsInRAM = new AtomicInteger(0);
// TODO: cut over to BytesRefHash in BufferedDeletes
volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
@ -125,73 +121,72 @@ final class DocumentsWriter {
*/
private volatile boolean pendingChangesInCurrentFullFlush;
private Collection<String> abortedFiles; // List of files that were written before last abort()
final IndexingChain chain;
final DocumentsWriterPerThreadPool perThreadPool;
final FlushPolicy flushPolicy;
final DocumentsWriterFlushControl flushControl;
final Codec codec;
DocumentsWriter(Codec codec, LiveIndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumbers globalFieldNumbers,
BufferedDeletesStream bufferedDeletesStream) {
this.codec = codec;
this.directory = directory;
this.indexWriter = writer;
this.infoStream = config.getInfoStream();
this.similarity = config.getSimilarity();
this.indexWriterConfig = writer.getConfig();
this.perThreadPool = config.getIndexerThreadPool();
this.chain = config.getIndexingChain();
this.perThreadPool.initialize(this, globalFieldNumbers, config);
flushPolicy = config.getFlushPolicy();
assert flushPolicy != null;
flushPolicy.init(this);
flushControl = new DocumentsWriterFlushControl(this, config);
}
private final IndexWriter writer;
private final Queue<Event> events;
synchronized void deleteQueries(final Query... queries) throws IOException {
DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directory) {
this.directory = directory;
this.config = config;
this.infoStream = config.getInfoStream();
this.perThreadPool = config.getIndexerThreadPool();
flushPolicy = config.getFlushPolicy();
this.writer = writer;
this.events = new ConcurrentLinkedQueue<Event>();
flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedDeletesStream);
}
synchronized boolean deleteQueries(final Query... queries) throws IOException {
// TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(queries);
flushControl.doOnDelete();
if (flushControl.doApplyAllDeletes()) {
applyAllDeletes(deleteQueue);
}
return applyAllDeletes(deleteQueue);
}
// TODO: we could check w/ FreqProxTermsWriter: if the
// term doesn't exist, don't bother buffering into the
// per-DWPT map (but still must go into the global map)
synchronized void deleteTerms(final Term... terms) throws IOException {
synchronized boolean deleteTerms(final Term... terms) throws IOException {
// TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(terms);
flushControl.doOnDelete();
if (flushControl.doApplyAllDeletes()) {
applyAllDeletes(deleteQueue);
}
return applyAllDeletes( deleteQueue);
}
DocumentsWriterDeleteQueue currentDeleteSession() {
return deleteQueue;
}
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null && !flushControl.isFullFlush()) {
ticketQueue.addDeletesAndPurge(this, deleteQueue);
private final boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (flushControl.doApplyAllDeletes()) {
if (deleteQueue != null && !flushControl.isFullFlush()) {
ticketQueue.addDeletes(deleteQueue);
}
putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
return true;
}
indexWriter.applyAllDeletes();
indexWriter.flushCount.incrementAndGet();
return false;
}
final int purgeBuffer(IndexWriter writer, boolean forced) throws IOException {
if (forced) {
return ticketQueue.forcePurge(writer);
} else {
return ticketQueue.tryPurge(writer);
}
}
/** Returns how many docs are currently buffered in RAM. */
int getNumDocs() {
return numDocsInRAM.get();
}
Collection<String> abortedFiles() {
return abortedFiles;
}
private void ensureOpen() throws AlreadyClosedException {
if (closed) {
throw new AlreadyClosedException("this IndexWriter is closed");
@ -202,45 +197,37 @@ final class DocumentsWriter {
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
synchronized void abort() {
synchronized void abort(IndexWriter writer) {
assert !Thread.holdsLock(writer) : "IndexWriter lock should never be hold when aborting";
boolean success = false;
final Set<String> newFilesSet = new HashSet<String>();
try {
deleteQueue.clear();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "abort");
}
final int limit = perThreadPool.getActiveThreadState();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
try {
if (perThread.isActive()) { // we might be closed
try {
perThread.dwpt.abort();
} finally {
perThread.dwpt.checkAndResetHasAborted();
flushControl.doOnAbort(perThread);
}
} else {
assert closed;
}
abortThreadState(perThread, newFilesSet);
} finally {
perThread.unlock();
}
}
flushControl.abortPendingFlushes();
flushControl.abortPendingFlushes(newFilesSet);
putEvent(new DeleteNewFilesEvent(newFilesSet));
flushControl.waitForFlush();
success = true;
} finally {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "done abort; abortedFiles=" + abortedFiles + " success=" + success);
infoStream.message("DW", "done abort; abortedFiles=" + newFilesSet + " success=" + success);
}
}
}
synchronized void lockAndAbortAll() {
synchronized void lockAndAbortAll(IndexWriter indexWriter) {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "lockAndAbortAll");
@ -249,20 +236,15 @@ final class DocumentsWriter {
try {
deleteQueue.clear();
final int limit = perThreadPool.getMaxThreadStates();
final Set<String> newFilesSet = new HashSet<String>();
for (int i = 0; i < limit; i++) {
final ThreadState perThread = perThreadPool.getThreadState(i);
perThread.lock();
if (perThread.isActive()) { // we might be closed or
try {
perThread.dwpt.abort();
} finally {
perThread.dwpt.checkAndResetHasAborted();
flushControl.doOnAbort(perThread);
}
}
abortThreadState(perThread, newFilesSet);
}
deleteQueue.clear();
flushControl.abortPendingFlushes();
flushControl.abortPendingFlushes(newFilesSet);
putEvent(new DeleteNewFilesEvent(newFilesSet));
flushControl.waitForFlush();
success = true;
} finally {
@ -271,12 +253,31 @@ final class DocumentsWriter {
}
if (!success) {
// if something happens here we unlock all states again
unlockAllAfterAbortAll();
unlockAllAfterAbortAll(indexWriter);
}
}
}
private final void abortThreadState(final ThreadState perThread, Set<String> newFiles) {
assert perThread.isHeldByCurrentThread();
if (perThread.isActive()) { // we might be closed
if (perThread.isInitialized()) {
try {
subtractFlushedNumDocs(perThread.dwpt.getNumDocsInRAM());
perThread.dwpt.abort(newFiles);
} finally {
perThread.dwpt.checkAndResetHasAborted();
flushControl.doOnAbort(perThread);
}
} else {
flushControl.doOnAbort(perThread);
}
} else {
assert closed;
}
}
final synchronized void unlockAllAfterAbortAll() {
final synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "unlockAll");
@ -334,7 +335,7 @@ final class DocumentsWriter {
private boolean preUpdate() throws IOException {
ensureOpen();
boolean maybeMerge = false;
boolean hasEvents = false;
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
if (infoStream.isEnabled("DW")) {
@ -345,7 +346,7 @@ final class DocumentsWriter {
DocumentsWriterPerThread flushingDWPT;
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
// Don't push the delete here since the update could fail!
maybeMerge |= doFlush(flushingDWPT);
hasEvents |= doFlush(flushingDWPT);
}
if (infoStream.isEnabled("DW")) {
@ -361,28 +362,35 @@ final class DocumentsWriter {
infoStream.message("DW", "continue indexing after helping out flushing DocumentsWriter is healthy");
}
}
return maybeMerge;
return hasEvents;
}
private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean maybeMerge) throws IOException {
if (flushControl.doApplyAllDeletes()) {
applyAllDeletes(deleteQueue);
}
private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException {
hasEvents |= applyAllDeletes(deleteQueue);
if (flushingDWPT != null) {
maybeMerge |= doFlush(flushingDWPT);
hasEvents |= doFlush(flushingDWPT);
} else {
final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
if (nextPendingFlush != null) {
maybeMerge |= doFlush(nextPendingFlush);
hasEvents |= doFlush(nextPendingFlush);
}
}
return maybeMerge;
return hasEvents;
}
private final void ensureInitialized(ThreadState state) {
if (state.isActive() && state.dwpt == null) {
final FieldInfos.Builder infos = new FieldInfos.Builder(
writer.globalFieldNumberMap);
state.dwpt = new DocumentsWriterPerThread(writer.newSegmentName(),
directory, config, infoStream, deleteQueue, infos);
}
}
boolean updateDocuments(final Iterable<? extends IndexDocument> docs, final Analyzer analyzer,
final Term delTerm) throws IOException {
boolean maybeMerge = preUpdate();
boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
@ -392,13 +400,19 @@ final class DocumentsWriter {
ensureOpen();
assert false: "perThread is not active but we are still open";
}
ensureInitialized(perThread);
assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
numDocsInRAM.addAndGet(docCount);
} finally {
if (dwpt.checkAndResetHasAborted()) {
if (!dwpt.pendingFilesToDelete().isEmpty()) {
putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete()));
}
subtractFlushedNumDocs(dwptNumDocs);
flushControl.doOnAbort(perThread);
}
}
@ -408,31 +422,35 @@ final class DocumentsWriter {
perThread.unlock();
}
return postUpdate(flushingDWPT, maybeMerge);
return postUpdate(flushingDWPT, hasEvents);
}
boolean updateDocument(final IndexDocument doc, final Analyzer analyzer,
final Term delTerm) throws IOException {
boolean maybeMerge = preUpdate();
boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
try {
if (!perThread.isActive()) {
ensureOpen();
throw new IllegalStateException("perThread is not active but we are still open");
assert false: "perThread is not active but we are still open";
}
ensureInitialized(perThread);
assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
dwpt.updateDocument(doc, analyzer, delTerm);
numDocsInRAM.incrementAndGet();
} finally {
if (dwpt.checkAndResetHasAborted()) {
if (!dwpt.pendingFilesToDelete().isEmpty()) {
putEvent(new DeleteNewFilesEvent(dwpt.pendingFilesToDelete()));
}
subtractFlushedNumDocs(dwptNumDocs);
flushControl.doOnAbort(perThread);
}
}
@ -442,13 +460,13 @@ final class DocumentsWriter {
perThread.unlock();
}
return postUpdate(flushingDWPT, maybeMerge);
return postUpdate(flushingDWPT, hasEvents);
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean maybeMerge = false;
boolean hasEvents = false;
while (flushingDWPT != null) {
maybeMerge = true;
hasEvents = true;
boolean success = false;
SegmentFlushTicket ticket = null;
try {
@ -474,9 +492,24 @@ final class DocumentsWriter {
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = ticketQueue.addFlushTicket(flushingDWPT);
// flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush();
ticketQueue.addSegment(ticket, newSegment);
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
boolean dwptSuccess = false;
try {
// flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush();
ticketQueue.addSegment(ticket, newSegment);
dwptSuccess = true;
} finally {
subtractFlushedNumDocs(flushingDocsInRam);
if (!flushingDWPT.pendingFilesToDelete().isEmpty()) {
putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
hasEvents = true;
}
if (!dwptSuccess) {
putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
hasEvents = true;
}
}
// flush was successful once we reached this point - new seg. has been assigned to the ticket!
success = true;
} finally {
@ -496,54 +529,38 @@ final class DocumentsWriter {
// thread in innerPurge can't keep up with all
// other threads flushing segments. In this case
// we forcefully stall the producers.
ticketQueue.forcePurge(this);
} else {
ticketQueue.tryPurge(this);
putEvent(ForcedPurgeEvent.INSTANCE);
break;
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
indexWriter.flushCount.incrementAndGet();
indexWriter.doAfterFlush();
}
flushingDWPT = flushControl.nextPendingFlush();
}
if (hasEvents) {
putEvent(MergePendingEvent.INSTANCE);
}
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
// tiny segments:
final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB));
}
applyAllDeletes(deleteQueue);
hasEvents = true;
if (!this.applyAllDeletes(deleteQueue)) {
putEvent(ApplyDeletesEvent.INSTANCE);
}
}
return maybeMerge;
return hasEvents;
}
void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
// Finish the flushed segment and publish it to IndexWriter
if (newSegment == null) {
assert bufferedDeletes != null;
if (bufferedDeletes != null && bufferedDeletes.any()) {
indexWriter.publishFrozenDeletes(bufferedDeletes);
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
}
}
} else {
publishFlushedSegment(newSegment, bufferedDeletes);
}
}
final void subtractFlushedNumDocs(int numFlushed) {
int oldValue = numDocsInRAM.get();
while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
@ -551,29 +568,6 @@ final class DocumentsWriter {
}
}
/**
* Publishes the flushed segment, segment private deletes (if any) and its
* associated global delete (if present) to IndexWriter. The actual
* publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
* delete generation is always GlobalPacket_deleteGeneration + 1
*/
private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
throws IOException {
assert newSegment != null;
assert newSegment.segmentInfo != null;
final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
//System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
}
if (segmentDeletes != null && infoStream.isEnabled("DW")) {
infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
}
// now publish!
indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
}
// for asserts
private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
@ -588,7 +582,7 @@ final class DocumentsWriter {
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
final boolean flushAllThreads()
final boolean flushAllThreads(final IndexWriter indexWriter)
throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
@ -620,10 +614,9 @@ final class DocumentsWriter {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue);
} else {
ticketQueue.forcePurge(this);
}
ticketQueue.addDeletes(flushingDeleteQueue);
}
ticketQueue.forcePurge(indexWriter);
assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
@ -641,11 +634,94 @@ final class DocumentsWriter {
// Release the flush lock
flushControl.finishFullFlush();
} else {
flushControl.abortFullFlushes();
Set<String> newFilesSet = new HashSet<>();
flushControl.abortFullFlushes(newFilesSet);
putEvent(new DeleteNewFilesEvent(newFilesSet));
}
} finally {
pendingChangesInCurrentFullFlush = false;
}
}
public LiveIndexWriterConfig getIndexWriterConfig() {
return config;
}
private void putEvent(Event event) {
events.add(event);
}
static final class ApplyDeletesEvent implements Event {
static final Event INSTANCE = new ApplyDeletesEvent();
private int instCount = 0;
private ApplyDeletesEvent() {
assert instCount == 0;
instCount++;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.applyDeletesAndPurge(true); // we always purge!
}
}
static final class MergePendingEvent implements Event {
static final Event INSTANCE = new MergePendingEvent();
private int instCount = 0;
private MergePendingEvent() {
assert instCount == 0;
instCount++;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.doAfterSegmentFlushed(triggerMerge, forcePurge);
}
}
static final class ForcedPurgeEvent implements Event {
static final Event INSTANCE = new ForcedPurgeEvent();
private int instCount = 0;
private ForcedPurgeEvent() {
assert instCount == 0;
instCount++;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.purge(true);
}
}
static class FlushFailedEvent implements Event {
private final SegmentInfo info;
public FlushFailedEvent(SegmentInfo info) {
this.info = info;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.flushFailed(info);
}
}
static class DeleteNewFilesEvent implements Event {
private final Collection<String> files;
public DeleteNewFilesEvent(Collection<String> files) {
this.files = files;
}
@Override
public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
writer.deleteNewFiles(files);
}
}
public Queue<Event> eventQueue() {
return events;
}
}

View File

@ -23,9 +23,11 @@ import java.util.List;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
/**
@ -66,14 +68,18 @@ final class DocumentsWriterFlushControl {
private boolean closed = false;
private final DocumentsWriter documentsWriter;
private final LiveIndexWriterConfig config;
private final BufferedDeletesStream bufferedDeletesStream;
private final InfoStream infoStream;
DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config) {
DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedDeletesStream bufferedDeletesStream) {
this.infoStream = config.getInfoStream();
this.stallControl = new DocumentsWriterStallControl();
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.config = config;
this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
this.documentsWriter = documentsWriter;
this.bufferedDeletesStream = bufferedDeletesStream;
}
public synchronized long activeBytes() {
@ -240,7 +246,6 @@ final class DocumentsWriterFlushControl {
}
public synchronized void waitForFlush() {
assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be hold when waiting on flush";
while (flushingWriters.size() != 0) {
try {
this.wait();
@ -277,7 +282,7 @@ final class DocumentsWriterFlushControl {
}
assert assertMemory();
// Take it out of the loop this DWPT is stale
perThreadPool.replaceForFlush(state, closed);
perThreadPool.reset(state, closed);
} finally {
updateStallState();
}
@ -295,7 +300,7 @@ final class DocumentsWriterFlushControl {
assert fullFlush : "can not block if fullFlush == false";
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed;
dwpt = perThreadPool.replaceForFlush(perThread, closed);
dwpt = perThreadPool.reset(perThread, closed);
numPending--;
blockedFlushes.add(new BlockedFlush(dwpt, bytes));
}finally {
@ -311,12 +316,12 @@ final class DocumentsWriterFlushControl {
// We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) {
try {
if (perThread.isActive()) {
if (perThread.isInitialized()) {
assert perThread.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed; // do that before
// replace!
dwpt = perThreadPool.replaceForFlush(perThread, closed);
dwpt = perThreadPool.reset(perThread, closed);
assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(dwpt, Long.valueOf(bytes));
@ -413,11 +418,11 @@ final class DocumentsWriterFlushControl {
* Returns the number of delete terms in the global pool
*/
public int getNumGlobalTermDeletes() {
return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
return documentsWriter.deleteQueue.numGlobalTermDeletes() + bufferedDeletesStream.numTerms();
}
public long getDeleteBytesUsed() {
return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
return documentsWriter.deleteQueue.bytesUsed() + bufferedDeletesStream.bytesUsed();
}
synchronized int numFlushingDWPT() {
@ -441,7 +446,7 @@ final class DocumentsWriterFlushControl {
.currentThread(), documentsWriter);
boolean success = false;
try {
if (perThread.isActive()
if (perThread.isInitialized()
&& perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
@ -475,7 +480,10 @@ final class DocumentsWriterFlushControl {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
if (!next.isActive()) {
if (!next.isInitialized()) {
if (closed && next.isActive()) {
perThreadPool.deactivateThreadState(next);
}
continue;
}
assert next.dwpt.deleteQueue == flushingQueue
@ -515,7 +523,7 @@ final class DocumentsWriterFlushControl {
final ThreadState next = perThreadPool.getThreadState(i);
next.lock();
try {
assert !next.isActive() || next.dwpt.deleteQueue == queue;
assert !next.isInitialized() || next.dwpt.deleteQueue == queue : "isInitialized: " + next.isInitialized() + " numDocs: " + (next.isInitialized() ? next.dwpt.getNumDocsInRAM() : 0) ;
} finally {
next.unlock();
}
@ -526,12 +534,12 @@ final class DocumentsWriterFlushControl {
private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
void addFlushableState(ThreadState perThread) {
if (documentsWriter.infoStream.isEnabled("DWFC")) {
documentsWriter.infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
if (infoStream.isEnabled("DWFC")) {
infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
}
final DocumentsWriterPerThread dwpt = perThread.dwpt;
assert perThread.isHeldByCurrentThread();
assert perThread.isActive();
assert perThread.isInitialized();
assert fullFlush;
assert dwpt.deleteQueue != documentsWriter.deleteQueue;
if (dwpt.getNumDocsInRAM() > 0) {
@ -545,11 +553,7 @@ final class DocumentsWriterFlushControl {
fullFlushBuffer.add(flushingDWPT);
}
} else {
if (closed) {
perThreadPool.deactivateThreadState(perThread); // make this state inactive
} else {
perThreadPool.reinitThreadState(perThread);
}
perThreadPool.reset(perThread, closed); // make this state inactive
}
}
@ -594,19 +598,20 @@ final class DocumentsWriterFlushControl {
return true;
}
synchronized void abortFullFlushes() {
synchronized void abortFullFlushes(Set<String> newFiles) {
try {
abortPendingFlushes();
abortPendingFlushes(newFiles);
} finally {
fullFlush = false;
}
}
synchronized void abortPendingFlushes() {
synchronized void abortPendingFlushes(Set<String> newFiles) {
try {
for (DocumentsWriterPerThread dwpt : flushQueue) {
try {
dwpt.abort();
documentsWriter.subtractFlushedNumDocs(dwpt.getNumDocsInRAM());
dwpt.abort(newFiles);
} catch (Throwable ex) {
// ignore - keep on aborting the flush queue
} finally {
@ -617,7 +622,8 @@ final class DocumentsWriterFlushControl {
try {
flushingWriters
.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
blockedFlush.dwpt.abort();
documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
blockedFlush.dwpt.abort(newFiles);
} catch (Throwable ex) {
// ignore - keep on aborting the blocked queue
} finally {
@ -670,8 +676,8 @@ final class DocumentsWriterFlushControl {
* checked out DWPT are available
*/
void waitIfStalled() {
if (documentsWriter.infoStream.isEnabled("DWFC")) {
documentsWriter.infoStream.message("DWFC",
if (infoStream.isEnabled("DWFC")) {
infoStream.message("DWFC",
"waitIfStalled: numFlushesPending: " + flushQueue.size()
+ " netBytes: " + netBytes() + " flushBytes: " + flushBytes()
+ " fullFlush: " + fullFlush);
@ -686,5 +692,12 @@ final class DocumentsWriterFlushControl {
return stallControl.anyStalledThreads();
}
/**
* Returns the {@link IndexWriter} {@link InfoStream}
*/
public InfoStream getInfoStream() {
return infoStream;
}
}

View File

@ -34,8 +34,7 @@ class DocumentsWriterFlushQueue {
private final AtomicInteger ticketCount = new AtomicInteger();
private final ReentrantLock purgeLock = new ReentrantLock();
void addDeletesAndPurge(DocumentsWriter writer,
DocumentsWriterDeleteQueue deleteQueue) throws IOException {
void addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
synchronized (this) {
incTickets();// first inc the ticket count - freeze opens
// a window for #anyChanges to fail
@ -49,9 +48,6 @@ class DocumentsWriterFlushQueue {
}
}
}
// don't hold the lock on the FlushQueue when forcing the purge - this blocks and deadlocks
// if we hold the lock.
forcePurge(writer);
}
private void incTickets() {
@ -98,8 +94,9 @@ class DocumentsWriterFlushQueue {
return ticketCount.get() != 0;
}
private void innerPurge(DocumentsWriter writer) throws IOException {
private int innerPurge(IndexWriter writer) throws IOException {
assert purgeLock.isHeldByCurrentThread();
int numPurged = 0;
while (true) {
final FlushTicket head;
final boolean canPublish;
@ -108,6 +105,7 @@ class DocumentsWriterFlushQueue {
canPublish = head != null && head.canPublish(); // do this synced
}
if (canPublish) {
numPurged++;
try {
/*
* if we block on publish -> lock IW -> lock BufferedDeletes we don't block
@ -116,6 +114,7 @@ class DocumentsWriterFlushQueue {
* be a ticket still in the queue.
*/
head.publish(writer);
} finally {
synchronized (this) {
// finally remove the published ticket from the queue
@ -128,27 +127,31 @@ class DocumentsWriterFlushQueue {
break;
}
}
return numPurged;
}
void forcePurge(DocumentsWriter writer) throws IOException {
int forcePurge(IndexWriter writer) throws IOException {
assert !Thread.holdsLock(this);
assert !Thread.holdsLock(writer);
purgeLock.lock();
try {
innerPurge(writer);
return innerPurge(writer);
} finally {
purgeLock.unlock();
}
}
void tryPurge(DocumentsWriter writer) throws IOException {
int tryPurge(IndexWriter writer) throws IOException {
assert !Thread.holdsLock(this);
assert !Thread.holdsLock(writer);
if (purgeLock.tryLock()) {
try {
innerPurge(writer);
return innerPurge(writer);
} finally {
purgeLock.unlock();
}
}
return 0;
}
public int getTicketCount() {
@ -169,8 +172,47 @@ class DocumentsWriterFlushQueue {
this.frozenDeletes = frozenDeletes;
}
protected abstract void publish(DocumentsWriter writer) throws IOException;
protected abstract void publish(IndexWriter writer) throws IOException;
protected abstract boolean canPublish();
/**
* Publishes the flushed segment, segment private deletes (if any) and its
* associated global delete (if present) to IndexWriter. The actual
* publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
* delete generation is always GlobalPacket_deleteGeneration + 1
*/
protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
throws IOException {
assert newSegment != null;
assert newSegment.segmentInfo != null;
final FrozenBufferedDeletes segmentDeletes = newSegment.segmentDeletes;
//System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
if (indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + segmentDeletes);
}
if (segmentDeletes != null && indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "flush: push buffered seg private deletes: " + segmentDeletes);
}
// now publish!
indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentDeletes, globalPacket);
}
protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
// Finish the flushed segment and publish it to IndexWriter
if (newSegment == null) {
assert bufferedDeletes != null;
if (bufferedDeletes != null && bufferedDeletes.any()) {
indexWriter.publishFrozenDeletes(bufferedDeletes);
if (indexWriter.infoStream.isEnabled("DW")) {
indexWriter.infoStream.message("DW", "flush: push buffered deletes: " + bufferedDeletes);
}
}
} else {
publishFlushedSegment(indexWriter, newSegment, bufferedDeletes);
}
}
}
static final class GlobalDeletesTicket extends FlushTicket {
@ -179,11 +221,11 @@ class DocumentsWriterFlushQueue {
super(frozenDeletes);
}
@Override
protected void publish(DocumentsWriter writer) throws IOException {
protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already publised - can not publish twice";
published = true;
// its a global ticket - no segment to publish
writer.finishFlush(null, frozenDeletes);
finishFlush(writer, null, frozenDeletes);
}
@Override
@ -201,10 +243,10 @@ class DocumentsWriterFlushQueue {
}
@Override
protected void publish(DocumentsWriter writer) throws IOException {
protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already publised - can not publish twice";
published = true;
writer.finishFlush(segment, frozenDeletes);
finishFlush(writer, segment, frozenDeletes);
}
protected void setSegment(FlushedSegment segment) {

View File

@ -22,6 +22,7 @@ import java.text.NumberFormat;
import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
@ -144,7 +145,7 @@ class DocumentsWriterPerThread {
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
void abort() {
void abort(Set<String> createdFiles) {
//System.out.println(Thread.currentThread().getName() + ": now abort seg=" + segmentInfo.name);
hasAborted = aborting = true;
try {
@ -157,10 +158,7 @@ class DocumentsWriterPerThread {
}
pendingDeletes.clear();
deleteSlice = deleteQueue.newSlice();
// Reset all postings data
doAfterFlush();
createdFiles.addAll(directory.getCreatedFiles());
} finally {
aborting = false;
if (infoStream.isEnabled("DWPT")) {
@ -169,83 +167,77 @@ class DocumentsWriterPerThread {
}
}
private final static boolean INFO_VERBOSE = false;
final DocumentsWriter parent;
final Codec codec;
final IndexWriter writer;
final TrackingDirectoryWrapper directory;
final Directory directoryOrig;
final DocState docState;
final DocConsumer consumer;
final Counter bytesUsed;
SegmentWriteState flushState;
//Deletes for our still-in-RAM (to be flushed next) segment
BufferedDeletes pendingDeletes;
SegmentInfo segmentInfo; // Current segment we are working on
final BufferedDeletes pendingDeletes;
private final SegmentInfo segmentInfo; // Current segment we are working on
boolean aborting = false; // True if an abort is pending
boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting
private FieldInfos.Builder fieldInfos;
private final InfoStream infoStream;
private int numDocsInRAM;
private int flushedDocCount;
DocumentsWriterDeleteQueue deleteQueue;
DeleteSlice deleteSlice;
final DocumentsWriterDeleteQueue deleteQueue;
private final DeleteSlice deleteSlice;
private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
final Allocator byteBlockAllocator;
final IntBlockPool.Allocator intBlockAllocator;
private final LiveIndexWriterConfig indexWriterConfig;
public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
FieldInfos.Builder fieldInfos, IndexingChain indexingChain) {
public DocumentsWriterPerThread(String segmentName, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos) {
this.directoryOrig = directory;
this.directory = new TrackingDirectoryWrapper(directory);
this.parent = parent;
this.fieldInfos = fieldInfos;
this.writer = parent.indexWriter;
this.indexWriterConfig = parent.indexWriterConfig;
this.infoStream = parent.infoStream;
this.codec = parent.codec;
this.indexWriterConfig = indexWriterConfig;
this.infoStream = infoStream;
this.codec = indexWriterConfig.getCodec();
this.docState = new DocState(this, infoStream);
this.docState.similarity = parent.indexWriter.getConfig().getSimilarity();
this.docState.similarity = indexWriterConfig.getSimilarity();
bytesUsed = Counter.newCounter();
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
pendingDeletes = new BufferedDeletes();
intBlockAllocator = new IntBlockAllocator(bytesUsed);
initialize();
// this should be the last call in the ctor
// it really sucks that we need to pull this within the ctor and pass this ref to the chain!
consumer = indexingChain.getChain(this);
}
public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos.Builder fieldInfos) {
this(other.directoryOrig, other.parent, fieldInfos, other.parent.chain);
}
void initialize() {
deleteQueue = parent.deleteQueue;
this.deleteQueue = deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
pendingDeletes.clear();
deleteSlice = null;
}
deleteSlice = deleteQueue.newSlice();
segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segmentName, -1,
false, codec, null, null);
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
}
// this should be the last call in the ctor
// it really sucks that we need to pull this within the ctor and pass this ref to the chain!
consumer = indexWriterConfig.getIndexingChain().getChain(this);
}
void setAborting() {
aborting = true;
}
final boolean testPoint(String message) {
if (infoStream.isEnabled("TP")) {
infoStream.message("TP", message);
}
return true;
}
boolean checkAndResetHasAborted() {
final boolean retval = hasAborted;
hasAborted = false;
return retval;
}
final boolean testPoint(String message) {
if (infoStream.isEnabled("TP")) {
infoStream.message("TP", message);
}
return true;
}
public void updateDocument(IndexDocument doc, Analyzer analyzer, Term delTerm) throws IOException {
assert testPoint("DocumentsWriterPerThread addDocument start");
@ -253,9 +245,6 @@ class DocumentsWriterPerThread {
docState.doc = doc;
docState.analyzer = analyzer;
docState.docID = numDocsInRAM;
if (segmentInfo == null) {
initSegmentInfo();
}
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
@ -274,7 +263,7 @@ class DocumentsWriterPerThread {
deleteDocID(docState.docID);
numDocsInRAM++;
} else {
abort();
abort(filesToDelete);
}
}
}
@ -284,29 +273,16 @@ class DocumentsWriterPerThread {
success = true;
} finally {
if (!success) {
abort();
abort(filesToDelete);
}
}
finishDocument(delTerm);
}
private void initSegmentInfo() {
String segment = writer.newSegmentName();
segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segment, -1,
false, codec, null, null);
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
}
}
public int updateDocuments(Iterable<? extends IndexDocument> docs, Analyzer analyzer, Term delTerm) throws IOException {
assert testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null;
docState.analyzer = analyzer;
if (segmentInfo == null) {
initSegmentInfo();
}
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
@ -331,7 +307,7 @@ class DocumentsWriterPerThread {
// be called (because an exc is being thrown):
numDocsInRAM++;
} else {
abort();
abort(filesToDelete);
}
}
}
@ -341,7 +317,7 @@ class DocumentsWriterPerThread {
success = true;
} finally {
if (!success) {
abort();
abort(filesToDelete);
}
}
@ -384,21 +360,18 @@ class DocumentsWriterPerThread {
* the updated slice we get from 1. holds all the deletes that have occurred
* since we updated the slice the last time.
*/
if (deleteSlice == null) {
deleteSlice = deleteQueue.newSlice();
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
deleteSlice.reset();
}
} else {
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingDeletes, numDocsInRAM);
} else if (deleteQueue.updateSlice(deleteSlice)) {
deleteSlice.apply(pendingDeletes, numDocsInRAM);
}
boolean applySlice = numDocsInRAM != 0;
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
applySlice &= deleteQueue.updateSlice(deleteSlice);
}
if (applySlice) {
deleteSlice.apply(pendingDeletes, numDocsInRAM);
} else { // if we don't need to apply we must reset!
deleteSlice.reset();
}
++numDocsInRAM;
}
@ -434,14 +407,6 @@ class DocumentsWriterPerThread {
return numDocsInRAM;
}
/** Reset after a flush */
private void doAfterFlush() {
segmentInfo = null;
directory.getCreatedFiles().clear();
fieldInfos = new FieldInfos.Builder(fieldInfos.globalFieldNumbers);
parent.subtractFlushedNumDocs(numDocsInRAM);
numDocsInRAM = 0;
}
/**
* Prepares this DWPT for flushing. This method will freeze and return the
@ -457,7 +422,7 @@ class DocumentsWriterPerThread {
// apply all deletes before we flush and release the delete slice
deleteSlice.apply(pendingDeletes, numDocsInRAM);
assert deleteSlice.isEmpty();
deleteSlice = null;
deleteSlice.reset();
}
return globalDeletes;
}
@ -465,11 +430,11 @@ class DocumentsWriterPerThread {
/** Flush all pending docs to a new segment */
FlushedSegment flush() throws IOException {
assert numDocsInRAM > 0;
assert deleteSlice == null : "all deletes must be applied in prepareFlush";
assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setDocCount(numDocsInRAM);
flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
pendingDeletes, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
final double startMBUsed = parent.flushControl.netBytes() / 1024. / 1024.;
final double startMBUsed = bytesUsed() / 1024. / 1024.;
// Apply delete-by-docID now (delete-byDocID only
// happens when an exception is hit processing that
@ -515,15 +480,11 @@ class DocumentsWriterPerThread {
infoStream.message("DWPT", "flushed codec=" + codec);
}
flushedDocCount += flushState.segmentInfo.getDocCount();
final BufferedDeletes segmentDeletes;
if (pendingDeletes.queries.isEmpty()) {
pendingDeletes.clear();
segmentDeletes = null;
} else {
segmentDeletes = pendingDeletes;
pendingDeletes = new BufferedDeletes();
}
if (infoStream.isEnabled("DWPT")) {
@ -531,7 +492,7 @@ class DocumentsWriterPerThread {
infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
" ramUsed=" + nf.format(startMBUsed) + " MB" +
" newFlushedSize(includes docstores)=" + nf.format(newSegmentSize) + " MB" +
" docs/MB=" + nf.format(flushedDocCount / newSegmentSize));
" docs/MB=" + nf.format(flushState.segmentInfo.getDocCount() / newSegmentSize));
}
assert segmentInfo != null;
@ -539,20 +500,21 @@ class DocumentsWriterPerThread {
FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
sealFlushedSegment(fs);
doAfterFlush();
success = true;
return fs;
} finally {
if (!success) {
if (segmentInfo != null) {
writer.flushFailed(segmentInfo);
}
abort();
abort(filesToDelete);
}
}
}
private final Set<String> filesToDelete = new HashSet<String>();
public Set<String> pendingFilesToDelete() {
return filesToDelete;
}
/**
* Seals the {@link SegmentInfo} for the new flushed segment and persists
* the deleted documents {@link MutableBits}.
@ -568,12 +530,10 @@ class DocumentsWriterPerThread {
boolean success = false;
try {
if (indexWriterConfig.getUseCompoundFile()) {
// Now build compound file
Collection<String> oldFiles = IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context);
filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context));
newSegment.info.setUseCompoundFile(true);
writer.deleteNewFiles(oldFiles);
}
// Have codec write SegmentInfo. Must do this after
@ -618,7 +578,6 @@ class DocumentsWriterPerThread {
infoStream.message("DWPT", "hit exception " +
"reating compound file for newly flushed segment " + newSegment.info.name);
}
writer.flushFailed(newSegment.info);
}
}
}
@ -671,4 +630,5 @@ class DocumentsWriterPerThread {
+ ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborting=" + aborting + ", numDocsInRAM="
+ numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
}
}

View File

@ -71,12 +71,16 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
* for indexing anymore.
* @see #isActive()
*/
private void resetWriter(DocumentsWriterPerThread dwpt) {
private void deactivate() {
assert this.isHeldByCurrentThread();
if (dwpt == null) {
isActive = false;
}
this.dwpt = dwpt;
isActive = false;
reset();
}
private void reset() {
assert this.isHeldByCurrentThread();
this.dwpt = null;
this.bytesUsed = 0;
this.flushPending = false;
}
@ -91,6 +95,11 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
return isActive;
}
boolean isInitialized() {
assert this.isHeldByCurrentThread();
return isActive() && dwpt != null;
}
/**
* Returns the number of currently active bytes in this ThreadState's
* {@link DocumentsWriterPerThread}
@ -121,9 +130,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
private ThreadState[] threadStates;
private volatile int numThreadStatesActive;
private SetOnce<FieldNumbers> globalFieldMap = new SetOnce<FieldNumbers>();
private SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
/**
* Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
*/
@ -133,14 +140,8 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
}
threadStates = new ThreadState[maxNumThreadStates];
numThreadStatesActive = 0;
}
void initialize(DocumentsWriter documentsWriter, FieldNumbers globalFieldMap, LiveIndexWriterConfig config) {
this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
this.globalFieldMap.set(globalFieldMap);
for (int i = 0; i < threadStates.length; i++) {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap);
threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
threadStates[i] = new ThreadState(null);
}
}
@ -158,9 +159,10 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
// should not happen
throw new RuntimeException(e);
}
clone.documentsWriter = new SetOnce<DocumentsWriter>();
clone.globalFieldMap = new SetOnce<FieldNumbers>();
clone.threadStates = new ThreadState[threadStates.length];
for (int i = 0; i < threadStates.length; i++) {
clone.threadStates[i] = new ThreadState(null);
}
return clone;
}
@ -178,6 +180,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
int getActiveThreadState() {
return numThreadStatesActive;
}
/**
* Returns a new {@link ThreadState} iff any new state is available otherwise
@ -198,8 +201,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
if (threadState.isActive()) {
// unreleased thread states are deactivated during DW#close()
numThreadStatesActive++; // increment will publish the ThreadState
assert threadState.dwpt != null;
threadState.dwpt.initialize();
assert threadState.dwpt == null;
unlock = false;
return threadState;
}
@ -220,7 +222,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
try {
assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
assert !threadStates[i].isInitialized() : "expected unreleased thread state to be inactive";
} finally {
threadStates[i].unlock();
}
@ -236,24 +238,20 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
final ThreadState threadState = threadStates[i];
threadState.lock();
try {
threadState.resetWriter(null);
threadState.deactivate();
} finally {
threadState.unlock();
}
}
}
DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
assert threadState.isHeldByCurrentThread();
assert globalFieldMap.get() != null;
final DocumentsWriterPerThread dwpt = threadState.dwpt;
if (!closed) {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldMap.get());
final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
newDwpt.initialize();
threadState.resetWriter(newDwpt);
threadState.reset();
} else {
threadState.resetWriter(null);
threadState.deactivate();
}
return dwpt;
}
@ -328,18 +326,6 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
*/
void deactivateThreadState(ThreadState threadState) {
assert threadState.isActive();
threadState.resetWriter(null);
}
/**
* Reinitialized an active {@link ThreadState}. A {@link ThreadState} should
* only be reinitialized if it is active without any pending documents.
*
* @param threadState the state to reinitialize
*/
void reinitThreadState(ThreadState threadState) {
assert threadState.isActive;
assert threadState.dwpt.getNumDocsInRAM() == 0;
threadState.dwpt.initialize();
threadState.deactivate();
}
}

View File

@ -68,12 +68,11 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
control.setApplyAllDeletes();
}
}
final DocumentsWriter writer = this.writer.get();
if ((flushOnRAM() &&
control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) {
control.setApplyAllDeletes();
if (writer.infoStream.isEnabled("FP")) {
writer.infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
}
}
}
@ -89,9 +88,8 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
if (totalRam >= limit) {
final DocumentsWriter writer = this.writer.get();
if (writer.infoStream.isEnabled("FP")) {
writer.infoStream.message("FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
}
markLargestWriterPending(control, state, totalRam);
}

View File

@ -20,6 +20,7 @@ import java.util.Iterator;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SetOnce;
/**
@ -52,8 +53,8 @@ import org.apache.lucene.util.SetOnce;
* @see IndexWriterConfig#setFlushPolicy(FlushPolicy)
*/
abstract class FlushPolicy implements Cloneable {
protected SetOnce<DocumentsWriter> writer = new SetOnce<DocumentsWriter>();
protected LiveIndexWriterConfig indexWriterConfig;
protected InfoStream infoStream;
/**
* Called for each delete term. If this is a delete triggered due to an update
@ -93,9 +94,9 @@ abstract class FlushPolicy implements Cloneable {
/**
* Called by DocumentsWriter to initialize the FlushPolicy
*/
protected synchronized void init(DocumentsWriter docsWriter) {
writer.set(docsWriter);
indexWriterConfig = docsWriter.indexWriter.getConfig();
protected synchronized void init(LiveIndexWriterConfig indexWriterConfig) {
this.indexWriterConfig = indexWriterConfig;
infoStream = indexWriterConfig.getInfoStream();
}
/**
@ -127,8 +128,8 @@ abstract class FlushPolicy implements Cloneable {
}
private boolean assertMessage(String s) {
if (writer.get().infoStream.isEnabled("FP")) {
writer.get().infoStream.message("FP", s);
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", s);
}
return true;
}
@ -142,8 +143,8 @@ abstract class FlushPolicy implements Cloneable {
// should not happen
throw new RuntimeException(e);
}
clone.writer = new SetOnce<DocumentsWriter>();
clone.indexWriterConfig = null;
clone.infoStream = null;
return clone;
}
}

View File

@ -30,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -182,7 +183,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
* referenced by the "front" of the index). For this, IndexFileDeleter
* keeps track of the last non commit checkpoint.
*/
public class IndexWriter implements Closeable, TwoPhaseCommit {
public class IndexWriter implements Closeable, TwoPhaseCommit{
private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
@ -227,6 +228,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
final FieldNumbers globalFieldNumberMap;
private DocumentsWriter docWriter;
private final Queue<Event> eventQueue;
final IndexFileDeleter deleter;
// used by forceMerge to note those needing merging
@ -360,7 +362,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized (fullFlushLock) {
boolean success = false;
try {
anySegmentFlushed = docWriter.flushAllThreads();
anySegmentFlushed = docWriter.flushAllThreads(this);
if (!anySegmentFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
@ -730,7 +732,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// start with previous field numbers, but new FieldInfos
globalFieldNumberMap = getFieldNumberMap();
docWriter = new DocumentsWriter(codec, config, directory, this, globalFieldNumberMap, bufferedDeletesStream);
config.getFlushPolicy().init(config);
docWriter = new DocumentsWriter(this, config, directory);
eventQueue = docWriter.eventQueue();
// Default deleter (for backwards compatibility) is
// KeepOnlyLastCommitDeleter:
@ -961,7 +965,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
if (doFlush) {
flush(waitForMerges, true);
} else {
docWriter.abort(); // already closed -- never sync on IW
docWriter.abort(this); // already closed -- never sync on IW
}
} finally {
@ -1033,7 +1037,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized(this) {
closed = true;
}
assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates();
assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates() : "" + oldWriter.perThreadPool.numDeactivatedThreadStates() + " " + oldWriter.perThreadPool.getMaxThreadStates();
} catch (OutOfMemoryError oom) {
handleOOM(oom, "closeInternal");
} finally {
@ -1280,9 +1284,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
ensureOpen();
try {
boolean success = false;
boolean anySegmentFlushed = false;
try {
anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm);
if (docWriter.updateDocuments(docs, analyzer, delTerm)) {
processEvents(true, false);
}
success = true;
} finally {
if (!success) {
@ -1291,9 +1296,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
}
if (anySegmentFlushed) {
maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocuments");
}
@ -1313,7 +1315,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void deleteDocuments(Term term) throws IOException {
ensureOpen();
try {
docWriter.deleteTerms(term);
if (docWriter.deleteTerms(term)) {
processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term)");
}
@ -1412,7 +1416,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void deleteDocuments(Term... terms) throws IOException {
ensureOpen();
try {
docWriter.deleteTerms(terms);
if (docWriter.deleteTerms(terms)) {
processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term..)");
}
@ -1432,7 +1438,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void deleteDocuments(Query query) throws IOException {
ensureOpen();
try {
docWriter.deleteQueries(query);
if (docWriter.deleteQueries(query)) {
processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query)");
}
@ -1454,7 +1462,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
public void deleteDocuments(Query... queries) throws IOException {
ensureOpen();
try {
docWriter.deleteQueries(queries);
if (docWriter.deleteQueries(queries)) {
processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query..)");
}
@ -1505,9 +1515,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
ensureOpen();
try {
boolean success = false;
boolean anySegmentFlushed = false;
try {
anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term);
if (docWriter.updateDocument(doc, analyzer, term)) {
processEvents(true, false);
}
success = true;
} finally {
if (!success) {
@ -1516,10 +1527,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
}
if (anySegmentFlushed) {
maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocument");
}
@ -1730,7 +1737,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// complete
ensureOpen();
}
// NOTE: in the ConcurrentMergeScheduler case, when
// doWait is false, we can return immediately while
// background threads accomplish the merging
@ -2009,8 +2015,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
mergeScheduler.close();
bufferedDeletesStream.clear();
processEvents(false, true);
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
docWriter.abort(); // don't sync on IW here
docWriter.abort(this); // don't sync on IW here
synchronized(this) {
if (pendingCommit != null) {
@ -2102,7 +2109,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
* sure it's just like a fresh index.
*/
try {
docWriter.lockAndAbortAll();
docWriter.lockAndAbortAll(this);
processEvents(false, true);
synchronized (this) {
try {
// Abort any running merges
@ -2135,7 +2143,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
} finally {
docWriter.unlockAllAfterAbortAll();
docWriter.unlockAllAfterAbortAll(this);
}
}
}
@ -2243,33 +2251,40 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer.
*/
synchronized void publishFlushedSegment(SegmentInfoPerCommit newSegment,
void publishFlushedSegment(SegmentInfoPerCommit newSegment,
FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
// Lock order IW -> BDS
synchronized (bufferedDeletesStream) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publishFlushedSegment");
try {
synchronized (this) {
// Lock order IW -> BDS
synchronized (bufferedDeletesStream) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publishFlushedSegment");
}
if (globalPacket != null && globalPacket.any()) {
bufferedDeletesStream.push(globalPacket);
}
// Publishing the segment must be synched on IW -> BDS to make the sure
// that no merge prunes away the seg. private delete packet
final long nextGen;
if (packet != null && packet.any()) {
nextGen = bufferedDeletesStream.push(packet);
} else {
// Since we don't have a delete packet to apply we can get a new
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
}
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
checkpoint();
}
}
if (globalPacket != null && globalPacket.any()) {
bufferedDeletesStream.push(globalPacket);
}
// Publishing the segment must be synched on IW -> BDS to make the sure
// that no merge prunes away the seg. private delete packet
final long nextGen;
if (packet != null && packet.any()) {
nextGen = bufferedDeletesStream.push(packet);
} else {
// Since we don't have a delete packet to apply we can get a new
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
}
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
checkpoint();
} finally {
flushCount.incrementAndGet();
doAfterFlush();
}
}
@ -2705,12 +2720,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
boolean flushSuccess = false;
boolean success = false;
try {
anySegmentsFlushed = docWriter.flushAllThreads();
anySegmentsFlushed = docWriter.flushAllThreads(this);
if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
flushCount.incrementAndGet();
}
processEvents(false, true);
flushSuccess = true;
synchronized(this) {
@ -2750,7 +2766,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
} catch (OutOfMemoryError oom) {
handleOOM(oom, "prepareCommit");
}
boolean success = false;
try {
if (anySegmentsFlushed) {
@ -2765,7 +2781,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
}
}
}
startCommit(toCommit);
}
}
@ -2950,10 +2966,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
anySegmentFlushed = docWriter.flushAllThreads();
anySegmentFlushed = docWriter.flushAllThreads(this);
flushSuccess = true;
} finally {
docWriter.finishFullFlush(flushSuccess);
processEvents(false, true);
}
}
synchronized(this) {
@ -4307,4 +4324,65 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized final void flushFailed(SegmentInfo info) throws IOException {
deleter.refresh(info.name);
}
final int purge(boolean forced) throws IOException {
return docWriter.purgeBuffer(this, forced);
}
final void applyDeletesAndPurge(boolean forcePurge) throws IOException {
try {
purge(forcePurge);
} finally {
applyAllDeletes();
flushCount.incrementAndGet();
}
}
final void doAfterSegmentFlushed(boolean triggerMerge, boolean forcePurge) throws IOException {
try {
purge(forcePurge);
} finally {
if (triggerMerge) {
maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}
}
private boolean processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
return processEvents(eventQueue, triggerMerge, forcePurge);
}
private boolean processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
Event event;
boolean processed = false;
while((event = queue.poll()) != null) {
processed = true;
event.process(this, triggerMerge, forcePurge);
}
return processed;
}
/**
* Interface for internal atomic events. See {@link DocumentsWriter} for details. Events are executed concurrently and no order is guaranteed.
* Each event should only rely on the serializeability within it's process method. All actions that must happen before or after a certain action must be
* encoded inside the {@link #process(IndexWriter, boolean, boolean)} method.
*
*/
static interface Event {
/**
* Processes the event. This method is called by the {@link IndexWriter}
* passed as the first argument.
*
* @param writer
* the {@link IndexWriter} that executes the event.
* @param triggerMerge
* <code>false</code> iff this event should not trigger any segment merges
* @param clearBuffers
* <code>true</code> iff this event should clear all buffers associated with the event.
* @throws IOException
* if an {@link IOException} occurs
*/
void process(IndexWriter writer, boolean triggerMerge, boolean clearBuffers) throws IOException;
}
}

View File

@ -281,7 +281,10 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
long bytesUsed = 0;
while (allActiveThreads.hasNext()) {
bytesUsed += allActiveThreads.next().dwpt.bytesUsed();
ThreadState next = allActiveThreads.next();
if (next.dwpt != null) {
bytesUsed += next.dwpt.bytesUsed();
}
}
assertEquals(bytesUsed, flushControl.activeBytes());
}

View File

@ -1702,7 +1702,6 @@ public class TestIndexWriter extends LuceneTestCase {
w.deleteAll();
w.commit();
// Make sure we accumulate no files except for empty
// segments_N and segments.gen:
assertTrue(d.listAll().length <= 2);

View File

@ -83,7 +83,7 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(random()))
.setMaxBufferedDocs(2).setMergePolicy(ldmp).setMergeScheduler(new ConcurrentMergeScheduler()));
for(int iter=0;iter<10;iter++) {
for(int i=0;i<19;i++)
writer.addDocument(doc);
@ -96,7 +96,6 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
sis.read(dir);
final int segCount = sis.size();
writer.forceMerge(7);
writer.commit();
writer.waitForMerges();
@ -108,7 +107,7 @@ public class TestIndexWriterForceMerge extends LuceneTestCase {
if (segCount < 7)
assertEquals(segCount, optSegCount);
else
assertEquals(7, optSegCount);
assertEquals("seg: " + segCount, 7, optSegCount);
}
writer.close();
dir.close();