diff --git a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java index 3aa00d9d5a0..1a61cfc0f74 100644 --- a/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -1,42 +1,15 @@ package org.apache.lucene.index; -import java.io.IOException; -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.DocumentsWriterThreadPool.ThreadState; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.Weight; -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; -import org.apache.lucene.util.BytesRef; - /** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -44,312 +17,662 @@ import org.apache.lucene.util.BytesRef; * limitations under the License. */ +import java.io.IOException; +import java.io.PrintStream; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.codecs.Codec; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Similarity; +import org.apache.lucene.search.Weight; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMFile; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.Constants; +import org.apache.lucene.util.ThreadInterruptedException; +import org.apache.lucene.util.RamUsageEstimator; + +/** + * This class accepts multiple added documents and directly + * writes a single segment file. It does this more + * efficiently than creating a single segment per document + * (with DocumentWriter) and doing standard merges on those + * segments. 
+ * + * Each added document is passed to the {@link DocConsumer}, + * which in turn processes the document and interacts with + * other consumers in the indexing chain. Certain + * consumers, like {@link StoredFieldsWriter} and {@link + * TermVectorsTermsWriter}, digest a document and + * immediately write bytes to the "doc store" files (ie, + * they do not consume RAM per document, except while they + * are processing the document). + * + * Other consumers, eg {@link FreqProxTermsWriter} and + * {@link NormsWriter}, buffer bytes in RAM and flush only + * when a new segment is produced. + + * Once we have used our allowed RAM buffer, or the number + * of added docs is large enough (in the case we are + * flushing by doc count instead of RAM usage), we create a + * real segment and flush it to the Directory. + * + * Threads: + * + * Multiple threads are allowed into addDocument at once. + * There is an initial synchronized call to getThreadState + * which allocates a ThreadState for this thread. The same + * thread will get the same ThreadState over time (thread + * affinity) so that if there are consistent patterns (for + * example each thread is indexing a different content + * source) then we make better use of RAM. Then + * processDocument is called on that ThreadState without + * synchronization (most of the "heavy lifting" is in this + * call). Finally the synchronized "finishDocument" is + * called to flush changes to the directory. + * + * When flush is called by IndexWriter we forcefully idle + * all threads and flush only once they are all idle. This + * means you can call flush with a given thread even while + * other threads are actively adding/deleting documents. + * + * + * Exceptions: + * + * Because this class directly updates in-memory posting + * lists, and flushes stored fields and term vectors + * directly to files in the directory, there are certain + * limited times when an exception can corrupt this state. + * For example, a disk full while flushing stored fields + * leaves this file in a corrupt state. Or, an OOM + * exception while appending to the in-memory posting lists + * can corrupt that posting list. We call such exceptions + * "aborting exceptions". In these cases we must call + * abort() to discard all docs added since the last flush. + * + * All other exceptions ("non-aborting exceptions") can + * still partially update the index structures. These + * updates are consistent, but, they represent only a part + * of the document seen up until the exception was hit. + * When this happens, we immediately mark the document as + * deleted so that the document is always atomically ("all + * or none") added to the index. 
+ */ + final class DocumentsWriter { - private long sequenceID; - private int numDocumentsWriterPerThreads; - private final BufferedDeletesInRAM deletesInRAM = new BufferedDeletesInRAM(); - private final DocumentsWriterThreadPool threadPool; - private final Lock sequenceIDLock = new ReentrantLock(); + IndexWriter writer; + Directory directory; - private final Directory directory; - final IndexWriter indexWriter; - final IndexWriterConfig config; + String segment; // Current segment we are working on + private String docStoreSegment; // Current doc-store segment we are writing + private int docStoreOffset; // Current starting doc-store offset of current segment - private int maxBufferedDocs; - private double maxBufferSizeMB; - private int maxBufferedDeleteTerms; + private int nextDocID; // Next docID to be added + private int numDocsInRAM; // # docs buffered in RAM + int numDocsInStore; // # docs written to doc stores - private boolean closed; - private AtomicInteger numDocsInRAM = new AtomicInteger(0); - private AtomicLong ramUsed = new AtomicLong(0); + // Max # ThreadState instances; if there are more threads + // than this they share ThreadStates + private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0]; + private final HashMap threadBindings = new HashMap(); - private long flushedSequenceID = -1; - private final PrintStream infoStream; + private int pauseThreads; // Non-zero when we need all threads to + // pause (eg to flush) + boolean flushPending; // True when a thread has decided to flush + boolean bufferIsFull; // True when it's time to write segment + private boolean aborting; // True if an abort is pending - private Map minSequenceIDsPerThread = new HashMap(); + private DocFieldProcessor docFieldProcessor; - public DocumentsWriter(Directory directory, IndexWriter indexWriter, IndexWriterConfig config) { - this.directory = directory; - this.indexWriter = indexWriter; - this.config = config; - this.maxBufferedDocs = config.getMaxBufferedDocs(); - this.threadPool = config.getIndexerThreadPool(); - this.infoStream = indexWriter.getInfoStream(); + PrintStream infoStream; + int maxFieldLength = IndexWriterConfig.UNLIMITED_FIELD_LENGTH; + Similarity similarity; + + // max # simultaneous threads; if there are more than + // this, they wait for others to finish first + private final int maxThreadStates; + + List newFiles; + + static class DocState { + DocumentsWriter docWriter; + Analyzer analyzer; + int maxFieldLength; + PrintStream infoStream; + Similarity similarity; + int docID; + Document doc; + String maxTermPrefix; + + // Only called by asserts + public boolean testPoint(String name) { + return docWriter.writer.testPoint(name); + } + + public void clear() { + // don't hold onto doc nor analyzer, in case it is + // largish: + doc = null; + analyzer = null; + } } - public int getMaxBufferedDocs() { + /** Consumer returns this on each doc. This holds any + * state that must be flushed synchronized "in docID + * order". We gather these and flush them in order. */ + abstract static class DocWriter { + DocWriter next; + int docID; + abstract void finish() throws IOException; + abstract void abort(); + abstract long sizeInBytes(); + + void setNext(DocWriter next) { + this.next = next; + } + } + + /** + * Create and return a new DocWriterBuffer. + */ + PerDocBuffer newPerDocBuffer() { + return new PerDocBuffer(); + } + + /** + * RAMFile buffer for DocWriters. + */ + class PerDocBuffer extends RAMFile { + + /** + * Allocate bytes used from shared pool. 
+ */ + protected byte[] newBuffer(int size) { + assert size == PER_DOC_BLOCK_SIZE; + return perDocAllocator.getByteBlock(); + } + + /** + * Recycle the bytes used. + */ + synchronized void recycle() { + if (buffers.size() > 0) { + setLength(0); + + // Recycle the blocks + perDocAllocator.recycleByteBlocks(buffers); + buffers.clear(); + sizeInBytes = 0; + + assert numBuffers() == 0; + } + } + } + + /** + * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method + * which returns the DocConsumer that the DocumentsWriter calls to process the + * documents. + */ + abstract static class IndexingChain { + abstract DocConsumer getChain(DocumentsWriter documentsWriter); + } + + static final IndexingChain defaultIndexingChain = new IndexingChain() { + + @Override + DocConsumer getChain(DocumentsWriter documentsWriter) { + /* + This is the current indexing chain: + + DocConsumer / DocConsumerPerThread + --> code: DocFieldProcessor / DocFieldProcessorPerThread + --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField + --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField + --> code: DocInverter / DocInverterPerThread / DocInverterPerField + --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField + --> code: TermsHash / TermsHashPerThread / TermsHashPerField + --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField + --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField + --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField + --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField + --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField + --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField + */ + + // Build up indexing chain: + + final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter); + final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter(); + + final InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter, + new TermsHash(documentsWriter, false, termVectorsWriter, null)); + final NormsWriter normsWriter = new NormsWriter(); + final DocInverter docInverter = new DocInverter(termsHash, normsWriter); + return new DocFieldProcessor(documentsWriter, docInverter); + } + }; + + final DocConsumer consumer; + + // Deletes done after the last flush; these are discarded + // on abort + private BufferedDeletes deletesInRAM = new BufferedDeletes(false); + + // Deletes done before the last flush; these are still + // kept on abort + private BufferedDeletes deletesFlushed = new BufferedDeletes(true); + + // The max number of delete terms that can be buffered before + // they must be flushed to disk. + private int maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS; + + // How much RAM we can use before flushing. This is 0 if + // we are flushing by doc count instead. + private long ramBufferSize = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024); + private long waitQueuePauseBytes = (long) (ramBufferSize*0.1); + private long waitQueueResumeBytes = (long) (ramBufferSize*0.05); + + // If we've allocated 5% over our RAM budget, we then + // free down to 95% + private long freeLevel = (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*0.95); + + // Flush @ this number of docs. 
If ramBufferSize is + // non-zero we will flush by RAM usage instead. + private int maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS; + + private int flushedDocCount; // How many docs already flushed to index + + synchronized void updateFlushedDocCount(int n) { + flushedDocCount += n; + } + synchronized int getFlushedDocCount() { + return flushedDocCount; + } + synchronized void setFlushedDocCount(int n) { + flushedDocCount = n; + } + + private boolean closed; + + DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain, int maxThreadStates) throws IOException { + this.directory = directory; + this.writer = writer; + this.similarity = writer.getConfig().getSimilarity(); + this.maxThreadStates = maxThreadStates; + flushedDocCount = writer.maxDoc(); + + consumer = indexingChain.getChain(this); + if (consumer instanceof DocFieldProcessor) { + docFieldProcessor = (DocFieldProcessor) consumer; + } + } + + /** Returns true if any of the fields in the current + * buffered docs have omitTermFreqAndPositions==false */ + boolean hasProx() { + return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx() + : true; + } + + /** If non-null, various details of indexing are printed + * here. */ + synchronized void setInfoStream(PrintStream infoStream) { + this.infoStream = infoStream; + for(int i=0;i openFiles = new ArrayList(); + final List closedFiles = new ArrayList(); + + /* Returns Collection of files in use by this instance, + * including any flushed segments. */ + @SuppressWarnings("unchecked") + synchronized List openFiles() { + return (List) ((ArrayList) openFiles).clone(); + } + + @SuppressWarnings("unchecked") + synchronized List closedFiles() { + return (List) ((ArrayList) closedFiles).clone(); + } + + synchronized void addOpenFile(String name) { + assert !openFiles.contains(name); + openFiles.add(name); + } + + synchronized void removeOpenFile(String name) { + assert openFiles.contains(name); + openFiles.remove(name); + closedFiles.add(name); + } + + synchronized void setAborting() { + aborting = true; + } + + /** Called if we hit an exception at a bad time (when + * updating the index files) and must discard all + * currently buffered docs. This resets our state, + * discarding any docs added since last flush. 
*/ + synchronized void abort() throws IOException { + + try { + if (infoStream != null) { + message("docWriter: now abort"); + } + + // Forcefully remove waiting ThreadStates from line + waitQueue.abort(); + + // Wait for all other threads to finish with + // DocumentsWriter: + pauseAllThreads(); + + try { + + assert 0 == waitQueue.numWaiting; + + waitQueue.waitingBytes = 0; + + try { + abortedFiles = openFiles(); + } catch (Throwable t) { + abortedFiles = null; + } + + deletesInRAM.clear(); + deletesFlushed.clear(); + + openFiles.clear(); + + for(int i=0;i= 0; + if (0 == pauseThreads) + notifyAll(); + } + + private synchronized boolean allThreadsIdle() { + for(int i=0;i 0; - long updateDocument(final Term delTerm, final Document doc, final Analyzer analyzer) - throws CorruptIndexException, IOException { + assert nextDocID == numDocsInRAM; + assert waitQueue.numWaiting == 0; + assert waitQueue.waitingBytes == 0; - return threadPool.executePerThread(this, doc, - new DocumentsWriterThreadPool.PerThreadTask() { - @Override - public Long process(final DocumentsWriterPerThread perThread) throws IOException { - long perThreadRAMUsedBeforeAdd = perThread.numBytesUsed; - perThread.addDocument(doc, analyzer); + initFlushState(false); - final long sequenceID; - sequenceIDLock.lock(); - try { - ensureOpen(); - sequenceID = nextSequenceID(); - if (delTerm != null) { - deletesInRAM.addDeleteTerm(delTerm, sequenceID, numDocumentsWriterPerThreads); - } - perThread.commitDocument(sequenceID); - if (!minSequenceIDsPerThread.containsKey(perThread)) { - minSequenceIDsPerThread.put(perThread, sequenceID); - } - numDocsInRAM.incrementAndGet(); - } finally { - sequenceIDLock.unlock(); - } + docStoreOffset = numDocsInStore; - if (finishAddDocument(perThread, perThreadRAMUsedBeforeAdd)) { - super.clearThreadBindings(); - indexWriter.maybeMerge(); - } - return sequenceID; - } - }); - } - - private final boolean finishAddDocument(DocumentsWriterPerThread perThread, - long perThreadRAMUsedBeforeAdd) throws IOException { - int numDocsPerThread = perThread.getNumDocsInRAM(); - boolean flushed = maybeFlushPerThread(perThread); - if (flushed) { - int oldValue = numDocsInRAM.get(); - while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numDocsPerThread)) { - oldValue = numDocsInRAM.get(); - } - - sequenceIDLock.lock(); - try { - minSequenceIDsPerThread.remove(perThread); - updateFlushedSequenceID(); - } finally { - sequenceIDLock.unlock(); - } - } - - long deltaRAM = perThread.numBytesUsed - perThreadRAMUsedBeforeAdd; - long oldValue = ramUsed.get(); - while (!ramUsed.compareAndSet(oldValue, oldValue + deltaRAM)) { - oldValue = ramUsed.get(); - } - - return flushed; - } - - long bufferDeleteTerms(final Term[] terms) throws IOException { - sequenceIDLock.lock(); - try { - ensureOpen(); - final long sequenceID = nextSequenceID(); - deletesInRAM.addDeleteTerms(terms, sequenceID, numDocumentsWriterPerThreads); - return sequenceID; - } finally { - sequenceIDLock.unlock(); - } - } - - long bufferDeleteTerm(final Term term) throws IOException { - sequenceIDLock.lock(); - try { - ensureOpen(); - final long sequenceID = nextSequenceID(); - deletesInRAM.addDeleteTerm(term, sequenceID, numDocumentsWriterPerThreads); - return sequenceID; - } finally { - sequenceIDLock.unlock(); - } - } - - long bufferDeleteQueries(final Query[] queries) throws IOException { - sequenceIDLock.lock(); - try { - ensureOpen(); - final long sequenceID = nextSequenceID(); - for (Query q : queries) { - deletesInRAM.addDeleteQuery(q, sequenceID, 
numDocumentsWriterPerThreads); - } - return sequenceID; - } finally { - sequenceIDLock.unlock(); - } - } - - long bufferDeleteQuery(final Query query) throws IOException { - sequenceIDLock.lock(); - try { - ensureOpen(); - final long sequenceID = nextSequenceID(); - deletesInRAM.addDeleteQuery(query, sequenceID, numDocumentsWriterPerThreads); - return sequenceID; - } finally { - sequenceIDLock.unlock(); - } - } - - private final void updateFlushedSequenceID() { - long newFlushedID = Long.MAX_VALUE; - for (long minSeqIDPerThread : minSequenceIDsPerThread.values()) { - if (minSeqIDPerThread < newFlushedID) { - newFlushedID = minSeqIDPerThread; - } - } - - this.flushedSequenceID = newFlushedID; - } - - final boolean flushAllThreads(final boolean flushDocStores, final boolean flushDeletes) - throws IOException { - return threadPool.executeAllThreads(new DocumentsWriterThreadPool.AllThreadsTask() { - @Override - public Boolean process(Iterator threadsIterator) throws IOException { - boolean anythingFlushed = false; - - if (flushDeletes) { - synchronized (indexWriter) { - if (applyDeletes(indexWriter.segmentInfos)) { - indexWriter.checkpoint(); - } - } - } - - while (threadsIterator.hasNext()) { - boolean perThreadFlushDocStores = flushDocStores; - DocumentsWriterPerThread perThread = threadsIterator.next(); - final int numDocs = perThread.getNumDocsInRAM(); - - // Always flush docs if there are any - boolean flushDocs = numDocs > 0; - - String docStoreSegment = perThread.getDocStoreSegment(); - if (docStoreSegment == null) { - perThreadFlushDocStores = false; - } - int docStoreOffset = perThread.getDocStoreOffset(); - boolean docStoreIsCompoundFile = false; - if (perThreadFlushDocStores - && (!flushDocs || !perThread.getSegment().equals(perThread.getDocStoreSegment()))) { - // We must separately flush the doc store - if (infoStream != null) { - message(" flush shared docStore segment " + docStoreSegment); - } - docStoreIsCompoundFile = flushDocStores(perThread); - flushDocStores(perThread); - perThreadFlushDocStores = false; - } - - String segment = perThread.getSegment(); - - // If we are flushing docs, segment must not be null: - assert segment != null || !flushDocs; + if (infoStream != null) + message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM); - if (flushDocs) { - SegmentInfo newSegment = perThread.flush(perThreadFlushDocStores); - - if (newSegment != null) { - anythingFlushed = true; - - if (0 == docStoreOffset && perThreadFlushDocStores) { - // This means we are flushing private doc stores - // with this segment, so it will not be shared - // with other segments - assert docStoreSegment != null; - assert docStoreSegment.equals(segment); - docStoreOffset = -1; - docStoreSegment = null; - docStoreIsCompoundFile = false; - } - newSegment.setDocStore(docStoreOffset, docStoreSegment, docStoreIsCompoundFile); - - IndexWriter.setDiagnostics(newSegment, "flush"); - finishFlushedSegment(newSegment, perThread); - } - } - } + boolean success = false; - if (anythingFlushed) { - clearThreadBindings(); + try { - sequenceIDLock.lock(); - try { - flushedSequenceID = sequenceID; - } finally { - sequenceIDLock.unlock(); - } - numDocsInRAM.set(0); - } - - if (flushDeletes) { - deletesInRAM.clear(); - } - - - return anythingFlushed; + if (closeDocStore) { + assert flushState.docStoreSegmentName != null; + assert flushState.docStoreSegmentName.equals(flushState.segmentName); + closeDocStore(); + flushState.numDocsInStore = 0; } - }); + + Collection threads = new 
HashSet(); + for(int i=0;i= maxThreadStates)) { + state = minThreadState; + state.numThreads++; + } else { + // Just create a new "private" thread state + DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length]; + if (threadStates.length > 0) + System.arraycopy(threadStates, 0, newArray, 0, threadStates.length); + state = newArray[threadStates.length] = new DocumentsWriterThreadState(this); + threadStates = newArray; + } + threadBindings.put(Thread.currentThread(), state); + } + + // Next, wait until my thread state is idle (in case + // it's shared with other threads) and for threads to + // not be paused nor a flush pending: + waitReady(state); + + // Allocate segment name if this is the first doc since + // last flush: + initSegmentName(false); + + state.isIdle = false; + + boolean success = false; try { - try { - abortedFiles = openFiles(); - } catch (Throwable t) { - abortedFiles = null; + state.docState.docID = nextDocID; + + assert writer.testPoint("DocumentsWriter.ThreadState.init start"); + + if (delTerm != null) { + addDeleteTerm(delTerm, state.docState.docID); + state.doFlushAfter = timeToFlushDeletes(); } - - deletesInRAM.clear(); - // nocommit - // deletesFlushed.clear(); - - openFiles.clear(); + + assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm"); + + nextDocID++; + numDocsInRAM++; + + // We must at this point commit to flushing to ensure we + // always get N docs when we flush by doc count, even if + // > 1 thread is adding documents: + if (!flushPending && + maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH + && numDocsInRAM >= maxBufferedDocs) { + flushPending = true; + state.doFlushAfter = true; + } + + success = true; } finally { - threadPool.finishAbort(); - } - - } - - final List openFiles = new ArrayList(); - private Collection abortedFiles; // List of files that were written before last abort() - - /* - * Returns Collection of files in use by this instance, - * including any flushed segments. 
- */ - @SuppressWarnings("unchecked") - List openFiles() { - synchronized(openFiles) { - return (List) ((ArrayList) openFiles).clone(); - } - } - - - Collection abortedFiles() { - return abortedFiles; - } - - boolean hasDeletes() { - return deletesInRAM.hasDeletes(); - } - - // nocommit - int getNumDocsInRAM() { - return numDocsInRAM.get(); - } - - // nocommit - long getRAMUsed() { - return ramUsed.get(); - } - - // nocommit - // long getRAMUsed() { - // return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; - // } - - boolean applyDeletes(SegmentInfos infos) throws IOException { - synchronized(indexWriter) { - if (!hasDeletes()) - return false; - - final long t0 = System.currentTimeMillis(); - - if (infoStream != null) { - message("apply " + deletesInRAM.getNumDeletes() + " buffered deletes on " + - +infos.size() + " segments."); - } - - final int infosEnd = infos.size(); - - boolean any = false; - for (int i = 0; i < infosEnd; i++) { - - // Make sure we never attempt to apply deletes to - // segment in external dir - assert infos.info(i).dir == directory; - - SegmentInfo si = infos.info(i); - SegmentReader reader = indexWriter.readerPool.get(si, false); - try { - any |= applyDeletes(reader, si.getMinSequenceID(), si.getMaxSequenceID(), null); - } finally { - indexWriter.readerPool.release(reader); - } - } - - if (infoStream != null) { - message("apply deletes took " + (System.currentTimeMillis() - t0) + " msec"); - } - - return any; - } - } - - // Apply buffered delete terms, queries and docIDs to the - // provided reader - final boolean applyDeletes(IndexReader reader, long minSequenceID, long maxSequenceID, long[] sequenceIDs) - throws CorruptIndexException, IOException { - - assert sequenceIDs == null || sequenceIDs.length >= reader.maxDoc() : "reader.maxDoc=" - + reader.maxDoc() + ",sequenceIDs.length=" + sequenceIDs.length; - - boolean any = false; - - // first: delete the documents that had non-aborting exceptions - if (sequenceIDs != null) { - for (int i = 0; i < reader.maxDoc(); i++) { - if (sequenceIDs[i] == -1) { - reader.deleteDocument(i); - any = true; + if (!success) { + // Forcefully idle this ThreadState: + state.isIdle = true; + notifyAll(); + if (state.doFlushAfter) { + state.doFlushAfter = false; + flushPending = false; } } } + + return state; + } + + /** Returns true if the caller (IndexWriter) should now + * flush. 
*/ + boolean addDocument(Document doc, Analyzer analyzer) + throws CorruptIndexException, IOException { + return updateDocument(doc, analyzer, null); + } + + boolean updateDocument(Term t, Document doc, Analyzer analyzer) + throws CorruptIndexException, IOException { + return updateDocument(doc, analyzer, t); + } + + boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm) + throws CorruptIndexException, IOException { - if (deletesInRAM.hasDeletes()) { - IndexSearcher searcher = new IndexSearcher(reader); + // This call is synchronized but fast + final DocumentsWriterThreadState state = getThreadState(doc, delTerm); - SortedMap deletes = deletesInRAM.deletes.getReadCopy(); - - SortedMap deleteTerms = new TreeMap(); - for (Entry entry : deletes.entrySet()) { - if (minSequenceID < entry.getKey()) { - BufferedDeletesInRAM.Delete delete = entry.getValue(); - if (delete instanceof BufferedDeletesInRAM.DeleteTerm) { - BufferedDeletesInRAM.DeleteTerm deleteTerm = (BufferedDeletesInRAM.DeleteTerm) delete; - deleteTerms.put(deleteTerm.term, entry.getKey()); - } else if (delete instanceof BufferedDeletesInRAM.DeleteTerms) { - BufferedDeletesInRAM.DeleteTerms terms = (BufferedDeletesInRAM.DeleteTerms) delete; - for (Term t : terms.terms) { - deleteTerms.put(t, entry.getKey()); - } + final DocState docState = state.docState; + docState.doc = doc; + docState.analyzer = analyzer; + + boolean success = false; + try { + // This call is not synchronized and does all the + // work + final DocWriter perDoc; + try { + perDoc = state.consumer.processDocument(); + } finally { + docState.clear(); + } + + // This call is synchronized but fast + finishDocument(state, perDoc); + + success = true; + } finally { + if (!success) { + synchronized(this) { + + if (aborting) { + state.isIdle = true; + notifyAll(); + abort(); } else { - // delete query - BufferedDeletesInRAM.DeleteQuery deleteQuery = (BufferedDeletesInRAM.DeleteQuery) delete; - Query query = deleteQuery.query; - Weight weight = query.weight(searcher); - Scorer scorer = weight.scorer(reader, true, false); - if (scorer != null) { - while (true) { - int doc = scorer.nextDoc(); - if (doc == DocsEnum.NO_MORE_DOCS) { - break; - } - if ( (sequenceIDs != null && sequenceIDs[doc] < entry.getKey()) - || (sequenceIDs == null && maxSequenceID < entry.getKey())) { - reader.deleteDocument(doc); - any = true; - } + skipDocWriter.docID = docState.docID; + boolean success2 = false; + try { + waitQueue.add(skipDocWriter); + success2 = true; + } finally { + if (!success2) { + state.isIdle = true; + notifyAll(); + abort(); + return false; } } + + state.isIdle = true; + notifyAll(); + + // If this thread state had decided to flush, we + // must clear it so another thread can flush + if (state.doFlushAfter) { + state.doFlushAfter = false; + flushPending = false; + notifyAll(); + } + + // Immediately mark this document as deleted + // since likely it was partially added. 
This + // keeps indexing as "all or none" (atomic) when + // adding a document: + addDeleteDocID(state.docState.docID); } } } + } - // Delete by term - if (deleteTerms.size() > 0) { - Fields fields = reader.fields(); - if (fields == null) { - // This reader has no postings - return false; - } + return state.doFlushAfter || timeToFlushDeletes(); + } - TermsEnum termsEnum = null; + // for testing + synchronized int getNumBufferedDeleteTerms() { + return deletesInRAM.numTerms; + } - String currentField = null; - BytesRef termRef = new BytesRef(); - DocsEnum docs = null; + // for testing + synchronized Map getBufferedDeleteTerms() { + return deletesInRAM.terms; + } - for (Entry entry : deleteTerms.entrySet()) { - Term term = entry.getKey(); - // Since we visit terms sorted, we gain performance - // by re-using the same TermsEnum and seeking only - // forwards - if (term.field() != currentField) { - assert currentField == null || currentField.compareTo(term.field()) < 0; - currentField = term.field(); - Terms terms = fields.terms(currentField); - if (terms != null) { - termsEnum = terms.iterator(); - } else { - termsEnum = null; - } - } + /** Called whenever a merge has completed and the merged segments had deletions */ + synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) { + if (docMaps == null) + // The merged segments had no deletes so docIDs did not change and we have nothing to do + return; + MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount); + deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); + deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); + flushedDocCount -= mapper.docShift; + } - if (termsEnum == null) { - continue; - } - // assert checkDeleteTerm(term); + synchronized private void waitReady(DocumentsWriterThreadState state) { - termRef.copy(term.text()); - - if (termsEnum.seek(termRef, false) == TermsEnum.SeekStatus.FOUND) { - DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs); - - if (docsEnum != null) { - docs = docsEnum; - // int limit = entry.getValue().getNum(); - while (true) { - final int doc = docs.nextDoc(); - // if (docID == DocsEnum.NO_MORE_DOCS || docIDStart+docID >= limit) { - if (doc == DocsEnum.NO_MORE_DOCS) { - break; - } - if ( (sequenceIDs != null && sequenceIDs[doc] < entry.getValue()) - || (sequenceIDs == null && maxSequenceID < entry.getValue())) { - reader.deleteDocument(doc); - any = true; - } - } - } - } - } + while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) { + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); } } + if (closed) + throw new AlreadyClosedException("this IndexWriter is closed"); + } + + boolean bufferDeleteTerms(Term[] terms) throws IOException { + synchronized(this) { + waitReady(null); + for (int i = 0; i < terms.length; i++) + addDeleteTerm(terms[i], numDocsInRAM); + } + return timeToFlushDeletes(); + } + + boolean bufferDeleteTerm(Term term) throws IOException { + synchronized(this) { + waitReady(null); + addDeleteTerm(term, numDocsInRAM); + } + return timeToFlushDeletes(); + } + + boolean bufferDeleteQueries(Query[] queries) throws IOException { + synchronized(this) { + waitReady(null); + for (int i = 0; i < queries.length; i++) + addDeleteQuery(queries[i], numDocsInRAM); + } + return timeToFlushDeletes(); + } + + boolean 
bufferDeleteQuery(Query query) throws IOException { + synchronized(this) { + waitReady(null); + addDeleteQuery(query, numDocsInRAM); + } + return timeToFlushDeletes(); + } + + synchronized boolean deletesFull() { + return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && + (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) || + (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && + ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms)); + } + + synchronized boolean doApplyDeletes() { + // Very similar to deletesFull(), except we don't count + // numBytesUsed, because we are checking whether + // deletes (alone) are consuming too many resources now + // and thus should be applied. We apply deletes if RAM + // usage is > 1/2 of our allowed RAM buffer, to prevent + // too-frequent flushing of a long tail of tiny segments + // when merges (which always apply deletes) are + // infrequent. + return (ramBufferSize != IndexWriterConfig.DISABLE_AUTO_FLUSH && + (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize/2) || + (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && + ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms)); + } + + private boolean timeToFlushDeletes() { + balanceRAM(); + synchronized(this) { + return (bufferIsFull || deletesFull()) && setFlushPending(); + } + } + + void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { + this.maxBufferedDeleteTerms = maxBufferedDeleteTerms; + } + + int getMaxBufferedDeleteTerms() { + return maxBufferedDeleteTerms; + } + + synchronized boolean hasDeletes() { + return deletesFlushed.any(); + } + + synchronized boolean applyDeletes(SegmentInfos infos) throws IOException { + + if (!hasDeletes()) + return false; + + final long t0 = System.currentTimeMillis(); + + if (infoStream != null) + message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + + deletesFlushed.docIDs.size() + " deleted docIDs and " + + deletesFlushed.queries.size() + " deleted queries on " + + + infos.size() + " segments."); + + final int infosEnd = infos.size(); + + int docStart = 0; + boolean any = false; + for (int i = 0; i < infosEnd; i++) { + + // Make sure we never attempt to apply deletes to + // segment in external dir + assert infos.info(i).dir == directory; + + SegmentReader reader = writer.readerPool.get(infos.info(i), false); + try { + any |= applyDeletes(reader, docStart); + docStart += reader.maxDoc(); + } finally { + writer.readerPool.release(reader); + } + } + + deletesFlushed.clear(); + if (infoStream != null) { + message("apply deletes took " + (System.currentTimeMillis()-t0) + " msec"); + } + return any; } - void message(String message) { - if (infoStream != null) { - indexWriter.message("DW: " + message); + // used only by assert + private Term lastDeleteTerm; + + // used only by assert + private boolean checkDeleteTerm(Term term) { + if (term != null) { + assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term; + } + lastDeleteTerm = term; + return true; + } + + // Apply buffered delete terms, queries and docIDs to the + // provided reader + private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart) + throws CorruptIndexException, IOException { + + final int docEnd = docIDStart + reader.maxDoc(); + boolean any = false; + + assert checkDeleteTerm(null); + + // Delete by term + if (deletesFlushed.terms.size() > 0) { + Fields 
fields = reader.fields(); + if (fields == null) { + // This reader has no postings + return false; + } + + TermsEnum termsEnum = null; + + String currentField = null; + DocsEnum docs = null; + + for (Entry entry: deletesFlushed.terms.entrySet()) { + Term term = entry.getKey(); + // Since we visit terms sorted, we gain performance + // by re-using the same TermsEnum and seeking only + // forwards + if (term.field() != currentField) { + assert currentField == null || currentField.compareTo(term.field()) < 0; + currentField = term.field(); + Terms terms = fields.terms(currentField); + if (terms != null) { + termsEnum = terms.iterator(); + } else { + termsEnum = null; + } + } + + if (termsEnum == null) { + continue; + } + assert checkDeleteTerm(term); + + if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) { + DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs); + + if (docsEnum != null) { + docs = docsEnum; + int limit = entry.getValue().getNum(); + while (true) { + final int docID = docs.nextDoc(); + if (docID == DocsEnum.NO_MORE_DOCS || docIDStart+docID >= limit) { + break; + } + reader.deleteDocument(docID); + any = true; + } + } + } + } + } + + // Delete by docID + for (Integer docIdInt : deletesFlushed.docIDs) { + int docID = docIdInt.intValue(); + if (docID >= docIDStart && docID < docEnd) { + reader.deleteDocument(docID-docIDStart); + any = true; + } + } + + // Delete by query + if (deletesFlushed.queries.size() > 0) { + IndexSearcher searcher = new IndexSearcher(reader); + try { + for (Entry entry : deletesFlushed.queries.entrySet()) { + Query query = entry.getKey(); + int limit = entry.getValue().intValue(); + Weight weight = query.weight(searcher); + Scorer scorer = weight.scorer(reader, true, false); + if (scorer != null) { + while(true) { + int doc = scorer.nextDoc(); + if (((long) docIDStart) + doc >= limit) + break; + reader.deleteDocument(doc); + any = true; + } + } + } + } finally { + searcher.close(); + } + } + return any; + } + + // Buffer a term in bufferedDeleteTerms, which records the + // current number of documents buffered in ram so that the + // delete term will be applied to those documents as well + // as the disk segments. + synchronized private void addDeleteTerm(Term term, int docCount) { + BufferedDeletes.Num num = deletesInRAM.terms.get(term); + final int docIDUpto = flushedDocCount + docCount; + if (num == null) + deletesInRAM.terms.put(term, new BufferedDeletes.Num(docIDUpto)); + else + num.setNum(docIDUpto); + deletesInRAM.numTerms++; + + deletesInRAM.addBytesUsed(BYTES_PER_DEL_TERM + term.bytes.length); + } + + // Buffer a specific docID for deletion. Currently only + // used when we hit a exception when adding a document + synchronized private void addDeleteDocID(int docID) { + deletesInRAM.docIDs.add(Integer.valueOf(flushedDocCount+docID)); + deletesInRAM.addBytesUsed(BYTES_PER_DEL_DOCID); + } + + synchronized private void addDeleteQuery(Query query, int docID) { + deletesInRAM.queries.put(query, Integer.valueOf(flushedDocCount + docID)); + deletesInRAM.addBytesUsed(BYTES_PER_DEL_QUERY); + } + + /** Does the synchronized work to finish/flush the + * inverted document. 
*/ + private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException { + + // Must call this w/o holding synchronized(this) else + // we'll hit deadlock: + balanceRAM(); + + synchronized(this) { + + assert docWriter == null || docWriter.docID == perThread.docState.docID; + + if (aborting) { + + // We are currently aborting, and another thread is + // waiting for me to become idle. We just forcefully + // idle this threadState; it will be fully reset by + // abort() + if (docWriter != null) + try { + docWriter.abort(); + } catch (Throwable t) { + } + + perThread.isIdle = true; + notifyAll(); + return; + } + + final boolean doPause; + + if (docWriter != null) + doPause = waitQueue.add(docWriter); + else { + skipDocWriter.docID = perThread.docState.docID; + doPause = waitQueue.add(skipDocWriter); + } + + if (doPause) + waitForWaitQueue(); + + if (bufferIsFull && !flushPending) { + flushPending = true; + perThread.doFlushAfter = true; + } + + perThread.isIdle = true; + notifyAll(); } } + synchronized void waitForWaitQueue() { + do { + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } while (!waitQueue.doResume()); + } + + private static class SkipDocWriter extends DocWriter { + @Override + void finish() { + } + @Override + void abort() { + } + @Override + long sizeInBytes() { + return 0; + } + } + final SkipDocWriter skipDocWriter = new SkipDocWriter(); + + long getRAMUsed() { + return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed; + } + + long numBytesUsed; + + NumberFormat nf = NumberFormat.getInstance(); + + // Coarse estimates used to measure RAM usage of buffered deletes + final static int OBJECT_HEADER_BYTES = 8; + final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4; + final static int INT_NUM_BYTE = 4; + final static int CHAR_NUM_BYTE = 2; + + /* Rough logic: HashMap has an array[Entry] w/ varying + load factor (say 2 * POINTER). Entry is object w/ Term + key, BufferedDeletes.Num val, int hash, Entry next + (OBJ_HEADER + 3*POINTER + INT). Term is object w/ + String field and String text (OBJ_HEADER + 2*POINTER). + We don't count Term's field since it's interned. + Term's text is String (OBJ_HEADER + 4*INT + POINTER + + OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is + OBJ_HEADER + INT. */ + + final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE; + + /* Rough logic: del docIDs are List. Say list + allocates ~2X size (2*POINTER). Integer is OBJ_HEADER + + int */ + final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE; + + /* Rough logic: HashMap has an array[Entry] w/ varying + load factor (say 2 * POINTER). Entry is object w/ + Query key, Integer val, int hash, Entry next + (OBJ_HEADER + 3*POINTER + INT). Query we often + undercount (say 24 bytes). Integer is OBJ_HEADER + INT. 
*/ + final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24; + + /* Initial chunks size of the shared byte[] blocks used to + store postings data */ + final static int BYTE_BLOCK_SHIFT = 15; + final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT; + final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1; + final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK; + + /* if you increase this, you must fix field cache impl for + * getTerms/getTermsIndex requires <= 32768 */ + final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2; + + private class ByteBlockAllocator extends ByteBlockPool.Allocator { + final int blockSize; + + ByteBlockAllocator(int blockSize) { + this.blockSize = blockSize; + } + + ArrayList freeByteBlocks = new ArrayList(); + + /* Allocate another byte[] from the shared pool */ + @Override + byte[] getByteBlock() { + synchronized(DocumentsWriter.this) { + final int size = freeByteBlocks.size(); + final byte[] b; + if (0 == size) { + b = new byte[blockSize]; + numBytesUsed += blockSize; + } else + b = freeByteBlocks.remove(size-1); + return b; + } + } + + /* Return byte[]'s to the pool */ + + @Override + void recycleByteBlocks(byte[][] blocks, int start, int end) { + synchronized(DocumentsWriter.this) { + for(int i=start;i blocks) { + synchronized(DocumentsWriter.this) { + final int size = blocks.size(); + for(int i=0;i freeIntBlocks = new ArrayList(); + + /* Allocate another int[] from the shared pool */ + synchronized int[] getIntBlock() { + final int size = freeIntBlocks.size(); + final int[] b; + if (0 == size) { + b = new int[INT_BLOCK_SIZE]; + numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE; + } else + b = freeIntBlocks.remove(size-1); + return b; + } + + synchronized void bytesUsed(long numBytes) { + numBytesUsed += numBytes; + } + + /* Return int[]s to the pool */ + synchronized void recycleIntBlocks(int[][] blocks, int start, int end) { + for(int i=start;i= ramBufferSize; + } + + if (doBalance) { + + if (infoStream != null) + message(" RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) + + " vs trigger=" + toMB(ramBufferSize) + + " deletesMB=" + toMB(deletesRAMUsed) + + " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) + + " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE)); + + final long startBytesUsed = numBytesUsed + deletesRAMUsed; + + int iter = 0; + + // We free equally from each pool in 32 KB + // chunks until we are below our threshold + // (freeLevel) + + boolean any = true; + + while(numBytesUsed+deletesRAMUsed > freeLevel) { + + synchronized(this) { + if (0 == perDocAllocator.freeByteBlocks.size() && + 0 == byteBlockAllocator.freeByteBlocks.size() && + 0 == freeIntBlocks.size() && !any) { + // Nothing else to free -- must flush now. 
+ bufferIsFull = numBytesUsed+deletesRAMUsed > ramBufferSize; + if (infoStream != null) { + if (numBytesUsed+deletesRAMUsed > ramBufferSize) + message(" nothing to free; now set bufferIsFull"); + else + message(" nothing to free"); + } + break; + } + + if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) { + byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1); + numBytesUsed -= BYTE_BLOCK_SIZE; + } + + if ((1 == iter % 4) && freeIntBlocks.size() > 0) { + freeIntBlocks.remove(freeIntBlocks.size()-1); + numBytesUsed -= INT_BLOCK_SIZE * INT_NUM_BYTE; + } + + if ((2 == iter % 4) && perDocAllocator.freeByteBlocks.size() > 0) { + // Remove upwards of 32 blocks (each block is 1K) + for (int i = 0; i < 32; ++i) { + perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1); + numBytesUsed -= PER_DOC_BLOCK_SIZE; + if (perDocAllocator.freeByteBlocks.size() == 0) { + break; + } + } + } + } + + if ((3 == iter % 4) && any) + // Ask consumer to free any recycled state + any = consumer.freeRAM(); + + iter++; + } + + if (infoStream != null) + message(" after free: freedMB=" + nf.format((startBytesUsed-numBytesUsed-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((numBytesUsed+deletesRAMUsed)/1024./1024.)); + } + } + + final WaitQueue waitQueue = new WaitQueue(); + + private class WaitQueue { + DocWriter[] waiting; + int nextWriteDocID; + int nextWriteLoc; + int numWaiting; + long waitingBytes; + + public WaitQueue() { + waiting = new DocWriter[10]; + } + + synchronized void reset() { + // NOTE: nextWriteLoc doesn't need to be reset + assert numWaiting == 0; + assert waitingBytes == 0; + nextWriteDocID = 0; + } + + synchronized boolean doResume() { + return waitingBytes <= waitQueueResumeBytes; + } + + synchronized boolean doPause() { + return waitingBytes > waitQueuePauseBytes; + } + + synchronized void abort() { + int count = 0; + for(int i=0;i= nextWriteDocID; + + if (doc.docID == nextWriteDocID) { + writeDocument(doc); + while(true) { + doc = waiting[nextWriteLoc]; + if (doc != null) { + numWaiting--; + waiting[nextWriteLoc] = null; + waitingBytes -= doc.sizeInBytes(); + writeDocument(doc); + } else + break; + } + } else { + + // I finished before documents that were added + // before me. This can easily happen when I am a + // small doc and the docs before me were large, or, + // just due to luck in the thread scheduling. Just + // add myself to the queue and when that large doc + // finishes, it will flush me: + int gap = doc.docID - nextWriteDocID; + if (gap >= waiting.length) { + // Grow queue + DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)]; + assert nextWriteLoc >= 0; + System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc); + System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc); + nextWriteLoc = 0; + waiting = newArray; + gap = doc.docID - nextWriteDocID; + } + + int loc = nextWriteLoc + gap; + if (loc >= waiting.length) + loc -= waiting.length; + + // We should only wrap one time + assert loc < waiting.length; + + // Nobody should be in my spot! 
+ assert waiting[loc] == null; + waiting[loc] = doc; + numWaiting++; + waitingBytes += doc.sizeInBytes(); + } + + return doPause(); + } + } } diff --git a/lucene/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/src/java/org/apache/lucene/index/IndexWriter.java index 2f53492e617..563446499e2 100644 --- a/lucene/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/src/java/org/apache/lucene/index/IndexWriter.java @@ -240,7 +240,7 @@ public class IndexWriter implements Closeable { * printed to infoStream, if set (see {@link * #setInfoStream}). */ - public final static int MAX_TERM_LENGTH = DocumentsWriterRAMAllocator.MAX_TERM_LENGTH_UTF8; + public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH_UTF8; // The normal read buffer size defaults to 1024, but // increasing this during merging seems to yield @@ -271,12 +271,10 @@ public class IndexWriter implements Closeable { volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit()) volatile long pendingCommitChangeCount; - // nocommit - private - SegmentInfos segmentInfos = new SegmentInfos(); // the segments + private SegmentInfos segmentInfos = new SegmentInfos(); // the segments private DocumentsWriter docWriter; - //nocommit - private - IndexFileDeleter deleter; + private IndexFileDeleter deleter; private Set segmentsToOptimize = new HashSet(); // used by optimize to note those needing optimization @@ -291,8 +289,8 @@ public class IndexWriter implements Closeable { // Holds all SegmentInfo instances currently involved in // merges private HashSet mergingSegments = new HashSet(); - // nocommit - private - MergePolicy mergePolicy; + + private MergePolicy mergePolicy; // TODO 4.0: this should be made final once the setter is removed private /*final*/MergeScheduler mergeScheduler; private LinkedList pendingMerges = new LinkedList(); @@ -735,6 +733,113 @@ public class IndexWriter implements Closeable { throw new IllegalArgumentException("this method can only be called when the merge policy is the default LogMergePolicy"); } + /**
Get the current setting of whether newly flushed + * segments will use the compound file format. Note that + * this just returns the value previously set with + * setUseCompoundFile(boolean), or the default value + * (true). You cannot use this to query the status of + * previously flushed segments.
+ * + *
Note that this method is a convenience method: it + * just calls mergePolicy.getUseCompoundFile as long as + * mergePolicy is an instance of {@link LogMergePolicy}. + * Otherwise an IllegalArgumentException is thrown.
+ * + * @see #setUseCompoundFile(boolean) + * @deprecated use {@link LogMergePolicy#getUseCompoundDocStore()} and + * {@link LogMergePolicy#getUseCompoundFile()} directly. + */ + public boolean getUseCompoundFile() { + return getLogMergePolicy().getUseCompoundFile(); + } + + /** + *
+ * Setting to turn on usage of a compound file. When on, multiple files for + * each segment are merged into a single file when a new segment is flushed. + *
+ * + *
+ * Note that this method is a convenience method: it just calls + * mergePolicy.setUseCompoundFile as long as mergePolicy is an instance of + * {@link LogMergePolicy}. Otherwise an IllegalArgumentException is thrown. + *
+ * + * @deprecated use {@link LogMergePolicy#setUseCompoundDocStore(boolean)} and + * {@link LogMergePolicy#setUseCompoundFile(boolean)} directly. + * Note that this method sets the given value on both, therefore + * you should consider doing the same. + */ + public void setUseCompoundFile(boolean value) { + getLogMergePolicy().setUseCompoundFile(value); + getLogMergePolicy().setUseCompoundDocStore(value); + } + + /** Expert: Set the Similarity implementation used by this IndexWriter. + * + * @see Similarity#setDefault(Similarity) + * @deprecated use {@link IndexWriterConfig#setSimilarity(Similarity)} instead + */ + public void setSimilarity(Similarity similarity) { + ensureOpen(); + this.similarity = similarity; + docWriter.setSimilarity(similarity); + // Required so config.getSimilarity returns the right value. But this will + // go away together with the method in 4.0. + config.setSimilarity(similarity); + } + + /** Expert: Return the Similarity implementation used by this IndexWriter. + * + *
This defaults to the current value of {@link Similarity#getDefault()}. + * @deprecated use {@link IndexWriterConfig#getSimilarity()} instead + */ + public Similarity getSimilarity() { + ensureOpen(); + return similarity; + } + + /** Expert: Set the interval between indexed terms. Large values cause less + * memory to be used by IndexReader, but slow random-access to terms. Small + * values cause more memory to be used by an IndexReader, and speed + * random-access to terms. + * + * This parameter determines the amount of computation required per query + * term, regardless of the number of documents that contain that term. In + * particular, it is the maximum number of other terms that must be + * scanned before a term is located and its frequency and position information + * may be processed. In a large index with user-entered query terms, query + * processing time is likely to be dominated not by term lookup but rather + * by the processing of frequency and positional data. In a small index + * or when many uncommon query terms are generated (e.g., by wildcard + * queries) term lookup may become a dominant cost. + * + * In particular, numUniqueTerms/interval terms are read into + * memory by an IndexReader, and, on average, interval/2 terms + * must be scanned for each random term access. + * + * @see #DEFAULT_TERM_INDEX_INTERVAL + * @deprecated use {@link IndexWriterConfig#setTermIndexInterval(int)} + */ + public void setTermIndexInterval(int interval) { + ensureOpen(); + this.termIndexInterval = interval; + // Required so config.getTermIndexInterval returns the right value. But this + // will go away together with the method in 4.0. + config.setTermIndexInterval(interval); + } + + /** Expert: Return the interval between indexed terms. + * + * @see #setTermIndexInterval(int) + * @deprecated use {@link IndexWriterConfig#getTermIndexInterval()} + */ + public int getTermIndexInterval() { + // We pass false because this method is called by SegmentMerger while we are in the process of closing + ensureOpen(false); + return termIndexInterval; + } + /** * Constructs an IndexWriter for the index in d. * Text will be analyzed with a. If create @@ -923,6 +1028,7 @@ public class IndexWriter implements Closeable { directory = d; analyzer = conf.getAnalyzer(); setMessageID(defaultInfoStream); + maxFieldLength = conf.getMaxFieldLength(); termIndexInterval = conf.getTermIndexInterval(); writeLockTimeout = conf.getWriteLockTimeout(); similarity = conf.getSimilarity(); @@ -996,10 +1102,9 @@ public class IndexWriter implements Closeable { setRollbackSegmentInfos(segmentInfos); - docWriter = new DocumentsWriter(directory, this, conf); - // nocommit - //docWriter.setInfoStream(infoStream); - //docWriter.setMaxFieldLength(maxFieldLength); + docWriter = new DocumentsWriter(directory, this, conf.getIndexingChain(), conf.getMaxThreadStates()); + docWriter.setInfoStream(infoStream); + docWriter.setMaxFieldLength(maxFieldLength); // Default deleter (for backwards compatibility) is // KeepOnlyLastCommitDeleter: @@ -1062,6 +1167,149 @@ public class IndexWriter implements Closeable { return config; } + /** + * Expert: set the merge policy used by this writer. + * + * @deprecated use {@link IndexWriterConfig#setMergePolicy(MergePolicy)} instead. 
+ */ + public void setMergePolicy(MergePolicy mp) { + ensureOpen(); + if (mp == null) + throw new NullPointerException("MergePolicy must be non-null"); + + if (mergePolicy != mp) + mergePolicy.close(); + mergePolicy = mp; + mergePolicy.setIndexWriter(this); + pushMaxBufferedDocs(); + if (infoStream != null) + message("setMergePolicy " + mp); + // Required so config.getMergePolicy returns the right value. But this will + // go away together with the method in 4.0. + config.setMergePolicy(mp); + } + + /** + * Expert: returns the current MergePolicy in use by this writer. + * @see #setMergePolicy + * + * @deprecated use {@link IndexWriterConfig#getMergePolicy()} instead + */ + public MergePolicy getMergePolicy() { + ensureOpen(); + return mergePolicy; + } + + /** + * Expert: set the merge scheduler used by this writer. + * @deprecated use {@link IndexWriterConfig#setMergeScheduler(MergeScheduler)} instead + */ + synchronized public void setMergeScheduler(MergeScheduler mergeScheduler) throws CorruptIndexException, IOException { + ensureOpen(); + if (mergeScheduler == null) + throw new NullPointerException("MergeScheduler must be non-null"); + + if (this.mergeScheduler != mergeScheduler) { + finishMerges(true); + this.mergeScheduler.close(); + } + this.mergeScheduler = mergeScheduler; + if (infoStream != null) + message("setMergeScheduler " + mergeScheduler); + // Required so config.getMergeScheduler returns the right value. But this will + // go away together with the method in 4.0. + config.setMergeScheduler(mergeScheduler); + } + + /** + * Expert: returns the current MergeScheduler in use by this + * writer. + * @see #setMergeScheduler(MergeScheduler) + * @deprecated use {@link IndexWriterConfig#getMergeScheduler()} instead + */ + public MergeScheduler getMergeScheduler() { + ensureOpen(); + return mergeScheduler; + } + + /**
Determines the largest segment (measured by + * document count) that may be merged with other segments. + * Small values (e.g., less than 10,000) are best for + * interactive indexing, as this limits the length of + * pauses while indexing to a few seconds. Larger values + * are best for batched indexing and speedier + * searches. + * + * The default value is {@link Integer#MAX_VALUE}. + * + * Note that this method is a convenience method: it + * just calls mergePolicy.setMaxMergeDocs as long as + * mergePolicy is an instance of {@link LogMergePolicy}. + * Otherwise an IllegalArgumentException is thrown. + * + * The default merge policy ({@link + * LogByteSizeMergePolicy}) also allows you to set this + * limit by net size (in MB) of the segment, using {@link + * LogByteSizeMergePolicy#setMaxMergeMB}.
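A sketch of setting these limits directly on the merge policy, as the deprecation notes here recommend, while also keeping the two compound-file flags in sync as the setUseCompoundFile note above advises; the no-argument LogByteSizeMergePolicy constructor and the concrete numbers are assumptions for illustration:

import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;

public class MergePolicySetup {
  public static void applyMergeLimits(IndexWriterConfig conf) {
    LogByteSizeMergePolicy mp = new LogByteSizeMergePolicy();
    mp.setMaxMergeDocs(10000);        // cap merged segments by document count
    mp.setMaxMergeMB(512.0);          // or cap them by net size in MB
    mp.setUseCompoundFile(true);      // keep both compound-file settings
    mp.setUseCompoundDocStore(true);  // in sync, as noted earlier
    conf.setMergePolicy(mp);          // replaces IndexWriter.setMergePolicy
  }
}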

+ * @deprecated use {@link LogMergePolicy#setMaxMergeDocs(int)} directly. + */ + public void setMaxMergeDocs(int maxMergeDocs) { + getLogMergePolicy().setMaxMergeDocs(maxMergeDocs); + } + + /** + * Returns the largest segment (measured by document + * count) that may be merged with other segments. + * + * Note that this method is a convenience method: it + * just calls mergePolicy.getMaxMergeDocs as long as + * mergePolicy is an instance of {@link LogMergePolicy}. + * Otherwise an IllegalArgumentException is thrown.
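The getLogMergePolicy() helper these convenience methods call presumably reduces to an instanceof check and a cast, per the description above; a sketch inferred from that javadoc, not copied from this patch:

import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;

final class LogMergePolicyHelper {
  // Mirrors what IndexWriter.getLogMergePolicy() presumably does: return the
  // cast policy, or fail the way the convenience methods document.
  static LogMergePolicy asLogMergePolicy(MergePolicy mergePolicy) {
    if (mergePolicy instanceof LogMergePolicy) {
      return (LogMergePolicy) mergePolicy;
    }
    throw new IllegalArgumentException(
        "this method can only be called when the merge policy is a LogMergePolicy");
  }
}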

+ * + * @see #setMaxMergeDocs + * @deprecated use {@link LogMergePolicy#getMaxMergeDocs()} directly. + */ + public int getMaxMergeDocs() { + return getLogMergePolicy().getMaxMergeDocs(); + } + + /** + * The maximum number of terms that will be indexed for a single field in a + * document. This limits the amount of memory required for indexing, so that + * collections with very large files will not crash the indexing process by + * running out of memory. This setting refers to the number of running terms, + * not to the number of different terms. + * Note: this silently truncates large documents, excluding from the + * index all terms that occur further in the document. If you know your source + * documents are large, be sure to set this value high enough to accommodate + * the expected size. If you set it to Integer.MAX_VALUE, then the only limit + * is your memory, but you should anticipate an OutOfMemoryError.
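A short sketch of capping the per-field term count through IndexWriterConfig, which the deprecation note below points to; the 1,000-term cap and the 5,000-token document in the comment are illustrative numbers only:

import org.apache.lucene.index.IndexWriterConfig;

public class FieldLengthSetup {
  public static void capFieldLength(IndexWriterConfig conf) {
    conf.setMaxFieldLength(1000);
    // A document whose body field yields 5,000 tokens now gets only its first
    // 1,000 tokens indexed; the remaining 4,000 are dropped silently, so a
    // search for a term that occurs only in the dropped tail cannot match.
  }
}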

+ * By default, no more than {@link #DEFAULT_MAX_FIELD_LENGTH} terms + * will be indexed for a field. + * @deprecated use {@link IndexWriterConfig#setMaxFieldLength(int)} instead + */ + public void setMaxFieldLength(int maxFieldLength) { + ensureOpen(); + this.maxFieldLength = maxFieldLength; + docWriter.setMaxFieldLength(maxFieldLength); + if (infoStream != null) + message("setMaxFieldLength " + maxFieldLength); + // Required so config.getMaxFieldLength returns the right value. But this + // will go away together with the method in 4.0. + config.setMaxFieldLength(maxFieldLength); + } + + /** + * Returns the maximum number of terms that will be + * indexed for a single field in a document. + * @see #setMaxFieldLength + * @deprecated use {@link IndexWriterConfig#getMaxFieldLength()} instead + */ + public int getMaxFieldLength() { + ensureOpen(); + return maxFieldLength; + } + /** Determines the minimal number of documents required * before the buffered in-memory documents are flushed as * a new Segment. Large values generally gives faster @@ -1295,8 +1543,7 @@ public class IndexWriter implements Closeable { public void setInfoStream(PrintStream infoStream) { ensureOpen(); setMessageID(infoStream); - // nocommit - //docWriter.setInfoStream(infoStream); + docWriter.setInfoStream(infoStream); deleter.setInfoStream(infoStream); if (infoStream != null) messageState(); @@ -1324,6 +1571,48 @@ public class IndexWriter implements Closeable { return infoStream != null; } + /** + * Sets the maximum time to wait for a write lock (in milliseconds) for this instance of IndexWriter. @see + * @see #setDefaultWriteLockTimeout to change the default value for all instances of IndexWriter. + * @deprecated use {@link IndexWriterConfig#setWriteLockTimeout(long)} instead + */ + public void setWriteLockTimeout(long writeLockTimeout) { + ensureOpen(); + this.writeLockTimeout = writeLockTimeout; + // Required so config.getWriteLockTimeout returns the right value. But this + // will go away together with the method in 4.0. + config.setWriteLockTimeout(writeLockTimeout); + } + + /** + * Returns allowed timeout when acquiring the write lock. + * @see #setWriteLockTimeout + * @deprecated use {@link IndexWriterConfig#getWriteLockTimeout()} + */ + public long getWriteLockTimeout() { + ensureOpen(); + return writeLockTimeout; + } + + /** + * Sets the default (for any instance of IndexWriter) maximum time to wait for a write lock (in + * milliseconds). + * @deprecated use {@link IndexWriterConfig#setDefaultWriteLockTimeout(long)} instead + */ + public static void setDefaultWriteLockTimeout(long writeLockTimeout) { + IndexWriterConfig.setDefaultWriteLockTimeout(writeLockTimeout); + } + + /** + * Returns default write lock timeout for newly + * instantiated IndexWriters. + * @see #setDefaultWriteLockTimeout + * @deprecated use {@link IndexWriterConfig#getDefaultWriteLockTimeout()} instead + */ + public static long getDefaultWriteLockTimeout() { + return IndexWriterConfig.getDefaultWriteLockTimeout(); + } + /** * Commits all changes to an index and closes all * associated files. 
Note that this may be a costly @@ -1485,9 +1774,8 @@ public class IndexWriter implements Closeable { closing = false; notifyAll(); if (!closed) { - if (docWriter != null) { + if (docWriter != null) docWriter.resumeAllThreads(); - } if (infoStream != null) message("hit exception while closing"); } @@ -1495,6 +1783,76 @@ public class IndexWriter implements Closeable { } } + /** Tells the docWriter to close its currently open shared + * doc stores (stored fields & vectors files). + * Return value specifices whether new doc store files are compound or not. + */ + private synchronized boolean flushDocStores() throws IOException { + + boolean useCompoundDocStore = false; + + String docStoreSegment; + + boolean success = false; + try { + docStoreSegment = docWriter.closeDocStore(); + success = true; + } finally { + if (!success && infoStream != null) { + message("hit exception closing doc store segment"); + } + } + + useCompoundDocStore = mergePolicy.useCompoundDocStore(segmentInfos); + + if (useCompoundDocStore && docStoreSegment != null && docWriter.closedFiles().size() != 0) { + // Now build compound doc store file + + if (infoStream != null) { + message("create compound file " + IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION)); + } + + success = false; + + final int numSegments = segmentInfos.size(); + final String compoundFileName = IndexFileNames.segmentFileName(docStoreSegment, "", IndexFileNames.COMPOUND_FILE_STORE_EXTENSION); + + try { + CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName); + for (final String file : docWriter.closedFiles() ) { + cfsWriter.addFile(file); + } + + // Perform the merge + cfsWriter.close(); + success = true; + + } finally { + if (!success) { + if (infoStream != null) + message("hit exception building compound file doc store for segment " + docStoreSegment); + deleter.deleteFile(compoundFileName); + docWriter.abort(); + } + } + + for(int i=0;i + * Note that this effectively truncates large documents, excluding from the + * index terms that occur further in the document. If you know your source + * documents are large, be sure to set this value high enough to accommodate + * the expected size. If you set it to Integer.MAX_VALUE, then the only limit + * is your memory, but you should anticipate an OutOfMemoryError.

+ * By default, no more than 10,000 terms will be indexed for a field. + * + * @see MaxFieldLength + */ + private int maxFieldLength; + /** * Adds a document to this index. If the document contains more than * {@link #setMaxFieldLength(int)} terms for a given field, the remainder are @@ -1598,8 +1972,8 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public long addDocument(Document doc) throws CorruptIndexException, IOException { - return addDocument(doc, analyzer); + public void addDocument(Document doc) throws CorruptIndexException, IOException { + addDocument(doc, analyzer); } /** @@ -1619,36 +1993,36 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public long addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { + public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); + boolean doFlush = false; boolean success = false; try { try { - long sequenceID = docWriter.addDocument(doc, analyzer); + doFlush = docWriter.addDocument(doc, analyzer); success = true; - return sequenceID; } finally { if (!success) { - if (infoStream != null) { + + if (infoStream != null) message("hit exception adding document"); - } + synchronized (this) { // If docWriter has some aborted files that were // never incref'd, then we clean them up here if (docWriter != null) { final Collection files = docWriter.abortedFiles(); - if (files != null) { + if (files != null) deleter.deleteNewFiles(files); - } } } } } + if (doFlush) + flush(true, false, false); } catch (OutOfMemoryError oom) { handleOOM(oom, "addDocument"); } - - return -1; } /** @@ -1662,14 +2036,15 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public long deleteDocuments(Term term) throws CorruptIndexException, IOException { + public void deleteDocuments(Term term) throws CorruptIndexException, IOException { ensureOpen(); try { - return docWriter.bufferDeleteTerm(term); + boolean doFlush = docWriter.bufferDeleteTerm(term); + if (doFlush) + flush(true, false, false); } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term)"); } - return -1; } /** @@ -1685,14 +2060,15 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public long deleteDocuments(Term... terms) throws CorruptIndexException, IOException { + public void deleteDocuments(Term... 
terms) throws CorruptIndexException, IOException { ensureOpen(); try { - return docWriter.bufferDeleteTerms(terms); + boolean doFlush = docWriter.bufferDeleteTerms(terms); + if (doFlush) + flush(true, false, false); } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term..)"); } - return -1; } /** @@ -1706,9 +2082,11 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public long deleteDocuments(Query query) throws CorruptIndexException, IOException { + public void deleteDocuments(Query query) throws CorruptIndexException, IOException { ensureOpen(); - return docWriter.bufferDeleteQuery(query); + boolean doFlush = docWriter.bufferDeleteQuery(query); + if (doFlush) + flush(true, false, false); } /** @@ -1724,9 +2102,11 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public long deleteDocuments(Query... queries) throws CorruptIndexException, IOException { + public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException { ensureOpen(); - return docWriter.bufferDeleteQueries(queries); + boolean doFlush = docWriter.bufferDeleteQueries(queries); + if (doFlush) + flush(true, false, false); } /** @@ -1769,37 +2149,35 @@ public class IndexWriter implements Closeable { * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - public long updateDocument(Term term, Document doc, Analyzer analyzer) + public void updateDocument(Term term, Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { ensureOpen(); try { + boolean doFlush = false; boolean success = false; try { - long sequenceID = docWriter.updateDocument(term, doc, analyzer); + doFlush = docWriter.updateDocument(term, doc, analyzer); success = true; - return sequenceID; } finally { if (!success) { - if (infoStream != null) { + if (infoStream != null) message("hit exception updating document"); - } synchronized (this) { // If docWriter has some aborted files that were // never incref'd, then we clean them up here final Collection files = docWriter.abortedFiles(); - if (files != null) { + if (files != null) deleter.deleteNewFiles(files); - } } } } + if (doFlush) + flush(true, false, false); } catch (OutOfMemoryError oom) { handleOOM(oom, "updateDocument"); } - - return -1; } // for test purpose @@ -2319,8 +2697,7 @@ public class IndexWriter implements Closeable { // Remove any buffered docs docWriter.abort(); - // nocommit - //docWriter.setFlushedDocCount(0); + docWriter.setFlushedDocCount(0); // Remove all segments segmentInfos.clear(); @@ -2413,8 +2790,7 @@ public class IndexWriter implements Closeable { * the index files referenced exist (correctly) in the * index directory. 
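Taken together, the addDocument/updateDocument/deleteDocuments hunks above change these methods to return void and to flush internally whenever the DocumentsWriter asks for it. A usage sketch of the resulting API, assuming the usual two-argument updateDocument overload and an un-analyzed "id" field as the delete/update key; field names and values are illustrative:

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

public class IndexingExample {
  public static void indexAndUpdate(IndexWriter writer) throws Exception {
    Document doc = new Document();
    doc.add(new Field("id", "42", Field.Store.YES, Field.Index.NOT_ANALYZED));
    doc.add(new Field("body", "some content", Field.Store.NO, Field.Index.ANALYZED));
    writer.addDocument(doc);                          // may trigger an internal flush

    writer.updateDocument(new Term("id", "42"), doc); // atomically deletes then re-adds
    writer.deleteDocuments(new Term("id", "41"));     // buffered; applied at flush time
    writer.close();                                   // commits changes and closes files
  }
}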
*/ - // nocommit - private - synchronized void checkpoint() throws IOException { + private synchronized void checkpoint() throws IOException { changeCount++; deleter.checkpoint(segmentInfos, false); } @@ -2549,8 +2925,7 @@ public class IndexWriter implements Closeable { ensureOpen(); segmentInfos.addAll(infos); // Notify DocumentsWriter that the flushed count just increased - // nocommit - //docWriter.updateFlushedDocCount(docCount); + docWriter.updateFlushedDocCount(docCount); checkpoint(); } @@ -2602,12 +2977,11 @@ public class IndexWriter implements Closeable { checkpoint(); // Notify DocumentsWriter that the flushed count just increased - // nocommit - //docWriter.updateFlushedDocCount(docCount); + docWriter.updateFlushedDocCount(docCount); } // Now create the compound file if needed - if (mergePolicy instanceof LogMergePolicy && getLogMergePolicy().getUseCompoundFile()) { + if (mergePolicy instanceof LogMergePolicy && getUseCompoundFile()) { List files = null; @@ -2837,17 +3211,183 @@ public class IndexWriter implements Closeable { // synchronized, ie, merges should be allowed to commit // even while a flush is happening private synchronized final boolean doFlush(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { - return docWriter.flushAllThreads(flushDocStores, flushDeletes); - // nocommit -// try { -// try { -// return doFlushInternal(flushDocStores, flushDeletes); -// } finally { -// docWriter.balanceRAM(); -// } -// } finally { -// docWriter.clearFlushPending(); -// } + try { + try { + return doFlushInternal(flushDocStores, flushDeletes); + } finally { + docWriter.balanceRAM(); + } + } finally { + docWriter.clearFlushPending(); + } + } + + // TODO: this method should not have to be entirely + // synchronized, ie, merges should be allowed to commit + // even while a flush is happening + private synchronized final boolean doFlushInternal(boolean flushDocStores, boolean flushDeletes) throws CorruptIndexException, IOException { + + if (hitOOM) { + throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush"); + } + + ensureOpen(false); + + assert testPoint("startDoFlush"); + + doBeforeFlush(); + + flushCount++; + + // If we are flushing because too many deletes + // accumulated, then we should apply the deletes to free + // RAM: + flushDeletes |= docWriter.doApplyDeletes(); + + // Make sure no threads are actively adding a document. 
+ // Returns true if docWriter is currently aborting, in + // which case we skip flushing this segment + if (infoStream != null) { + message("flush: now pause all indexing threads"); + } + if (docWriter.pauseAllThreads()) { + docWriter.resumeAllThreads(); + return false; + } + + try { + + SegmentInfo newSegment = null; + + final int numDocs = docWriter.getNumDocsInRAM(); + + // Always flush docs if there are any + boolean flushDocs = numDocs > 0; + + String docStoreSegment = docWriter.getDocStoreSegment(); + + assert docStoreSegment != null || numDocs == 0: "dss=" + docStoreSegment + " numDocs=" + numDocs; + + if (docStoreSegment == null) + flushDocStores = false; + + int docStoreOffset = docWriter.getDocStoreOffset(); + + boolean docStoreIsCompoundFile = false; + + if (infoStream != null) { + message(" flush: segment=" + docWriter.getSegment() + + " docStoreSegment=" + docWriter.getDocStoreSegment() + + " docStoreOffset=" + docStoreOffset + + " flushDocs=" + flushDocs + + " flushDeletes=" + flushDeletes + + " flushDocStores=" + flushDocStores + + " numDocs=" + numDocs + + " numBufDelTerms=" + docWriter.getNumBufferedDeleteTerms()); + message(" index before flush " + segString()); + } + + // Check if the doc stores must be separately flushed + // because other segments, besides the one we are about + // to flush, reference it + if (flushDocStores && (!flushDocs || !docWriter.getSegment().equals(docWriter.getDocStoreSegment()))) { + // We must separately flush the doc store + if (infoStream != null) + message(" flush shared docStore segment " + docStoreSegment); + + docStoreIsCompoundFile = flushDocStores(); + flushDocStores = false; + } + + String segment = docWriter.getSegment(); + + // If we are flushing docs, segment must not be null: + assert segment != null || !flushDocs; + + if (flushDocs) { + + boolean success = false; + final int flushedDocCount; + + try { + flushedDocCount = docWriter.flush(flushDocStores); + success = true; + } finally { + if (!success) { + if (infoStream != null) + message("hit exception flushing segment " + segment); + deleter.refresh(segment); + } + } + + if (0 == docStoreOffset && flushDocStores) { + // This means we are flushing private doc stores + // with this segment, so it will not be shared + // with other segments + assert docStoreSegment != null; + assert docStoreSegment.equals(segment); + docStoreOffset = -1; + docStoreIsCompoundFile = false; + docStoreSegment = null; + } + + // Create new SegmentInfo, but do not add to our + // segmentInfos until deletes are flushed + // successfully. 
+ newSegment = new SegmentInfo(segment, + flushedDocCount, + directory, false, docStoreOffset, + docStoreSegment, docStoreIsCompoundFile, + docWriter.hasProx(), + docWriter.getCodec()); + + setDiagnostics(newSegment, "flush"); + } + + docWriter.pushDeletes(); + + if (flushDocs) { + segmentInfos.add(newSegment); + checkpoint(); + } + + if (flushDocs && mergePolicy.useCompoundFile(segmentInfos, newSegment)) { + // Now build compound file + boolean success = false; + try { + docWriter.createCompoundFile(segment); + success = true; + } finally { + if (!success) { + if (infoStream != null) + message("hit exception creating compound file for newly flushed segment " + segment); + deleter.deleteFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION)); + } + } + + newSegment.setUseCompoundFile(true); + checkpoint(); + } + + if (flushDeletes) { + applyDeletes(); + } + + if (flushDocs) + checkpoint(); + + doAfterFlush(); + + return flushDocs; + + } catch (OutOfMemoryError oom) { + handleOOM(oom, "doFlush"); + // never hit + return false; + } finally { + docWriter.clearFlushPending(); + docWriter.resumeAllThreads(); + } } /** Expert: Return the total size of all index files currently cached in memory. @@ -2995,8 +3535,7 @@ public class IndexWriter implements Closeable { final int start = ensureContiguousMerge(merge); commitMergedDeletes(merge, mergedReader); - // nocommit - //docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount); + docWriter.remapDeletes(segmentInfos, merger.getDocMaps(), merger.getDelCounts(), merge, mergedDocCount); setMergeDocStoreIsCompoundFile(merge); merge.info.setHasProx(merger.hasProx()); @@ -3210,8 +3749,7 @@ public class IndexWriter implements Closeable { boolean mergeDocStores = false; boolean doFlushDocStore = false; - // nocommit - //final String currentDocStoreSegment = docWriter.getDocStoreSegment(); + final String currentDocStoreSegment = docWriter.getDocStoreSegment(); // Test each segment to be merged: check if we need to // flush/merge doc stores @@ -3255,10 +3793,9 @@ public class IndexWriter implements Closeable { // If the segment is referencing the current "live" // doc store outputs then we must merge - // nocommit -// if (si.getDocStoreOffset() != -1 && currentDocStoreSegment != null && si.getDocStoreSegment().equals(currentDocStoreSegment)) { -// doFlushDocStore = true; -// } + if (si.getDocStoreOffset() != -1 && currentDocStoreSegment != null && si.getDocStoreSegment().equals(currentDocStoreSegment)) { + doFlushDocStore = true; + } } final int docStoreOffset; @@ -3317,13 +3854,12 @@ public class IndexWriter implements Closeable { // CFS: mergingSegments.add(merge.info); } - - // nocommit - private - static void setDiagnostics(SegmentInfo info, String source) { + + private void setDiagnostics(SegmentInfo info, String source) { setDiagnostics(info, source, null); } - - private static void setDiagnostics(SegmentInfo info, String source, Map details) { + + private void setDiagnostics(SegmentInfo info, String source, Map details) { Map diagnostics = new HashMap(); diagnostics.put("source", source); diagnostics.put("lucene.version", Constants.LUCENE_VERSION); @@ -3467,12 +4003,11 @@ public class IndexWriter implements Closeable { // readers will attempt to open an IndexInput // on files that have still-open IndexOutputs // against them: - // nocommit -// if (dss.contains(docWriter.getDocStoreSegment())) { -// if (infoStream != null) -// message("now flush at mergeMiddle"); -// 
doFlush(true, false); -// } + if (dss.contains(docWriter.getDocStoreSegment())) { + if (infoStream != null) + message("now flush at mergeMiddle"); + doFlush(true, false); + } } for(int i=0;i